Merge branch 'kafka_face_sample'

2025-09-24 05:03:04 +08:00
9 changed files with 557 additions and 0 deletions

View File

@@ -266,6 +266,12 @@
            <artifactId>mts20140618</artifactId>
            <version>5.0.0</version>
        </dependency>
        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>

View File

@@ -0,0 +1,83 @@
package com.ycwl.basic.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer and consumer configuration.
 * Active only when kafka.enabled=true (disabled by default).
 */
@Configuration
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = false)
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers:100.64.0.12:39092}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id:liuying-microservice}")
    private String consumerGroupId;

    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;

    @Value("${kafka.producer.acks:all}")
    private String acks;

    @Value("${kafka.producer.retries:3}")
    private Integer retries;

    @Value("${kafka.producer.batch-size:16384}")
    private Integer batchSize;

    @Value("${kafka.producer.linger-ms:1}")
    private Integer lingerMs;

    @Value("${kafka.producer.buffer-memory:33554432}")
    private Integer bufferMemory;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, acks);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
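
For reference, any @KafkaListener bean resolves the kafkaListenerContainerFactory above by bean name. A minimal consumer sketch (the TestTopicListener class and topic name are hypothetical, not part of this commit):

package com.ycwl.basic.integration.kafka.example;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener for illustration only: the String-typed consumer
// factory above delivers records as plain strings.
@Component
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class TestTopicListener {

    @KafkaListener(topics = "test-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(String message) {
        // JSON payloads would be deserialized by hand here (see FaceProcessingKafkaService)
        System.out.println("received: " + message);
    }
}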

View File

@@ -0,0 +1,35 @@
package com.ycwl.basic.integration.kafka.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaIntegrationProperties {

    private boolean enabled = false;
    private String bootstrapServers = "100.64.0.12:39092";
    private Consumer consumer = new Consumer();
    private Producer producer = new Producer();

    @Data
    public static class Consumer {
        private String groupId = "liuying-microservice";
        private String autoOffsetReset = "earliest";
        private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
    }

    @Data
    public static class Producer {
        private String acks = "all";
        private Integer retries = 3;
        private Integer batchSize = 16384;
        private Integer lingerMs = 1;
        private Integer bufferMemory = 33554432;
        private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
        private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
    }
}

View File

@@ -0,0 +1,83 @@
package com.ycwl.basic.integration.kafka.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * Message structure for the zt-face topic.
 * Used for asynchronous delivery of face-processing tasks.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FaceProcessingMessage {

    /** Face sample ID (supplied by the external system) */
    private Long faceSampleId;

    /** Scenic area ID */
    private Long scenicId;

    /** Device ID */
    private Long deviceId;

    /** Face image URL */
    private String faceUrl;

    // faceUniqueId is no longer needed; faceSampleId serves as the unique identifier

    /** Capture time */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime shotTime;

    // The status field was removed; state is managed internally by the system
    // The deviceConfig field was removed; the system looks it up via deviceRepository

    /** Message creation time */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Message source */
    private String source;

    // The retryCount field was removed; retries are managed internally by the system
    // The DeviceConfig inner class was removed; device config is no longer carried in the message

    /**
     * Factory method for creating a face-processing message (using the externally supplied faceId).
     */
    public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId,
                                               String faceUrl, LocalDateTime shotTime) {
        return FaceProcessingMessage.builder()
                .faceSampleId(externalFaceId) // the externally supplied ID is the unique identifier
                .scenicId(scenicId)
                .deviceId(deviceId)
                .faceUrl(faceUrl)
                .shotTime(shotTime)
                .createTime(LocalDateTime.now())
                .source("external-system")
                .build();
    }
}
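
A message built by the create factory above serializes to JSON along these lines. A minimal sketch (the FaceMessageShapeDemo class and all values are illustrative; it assumes jackson-datatype-jsr310 is on the classpath, since LocalDateTime fields need the JavaTimeModule):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;

import java.time.LocalDateTime;

public class FaceMessageShapeDemo { // hypothetical, for illustration only
    public static void main(String[] args) throws Exception {
        // The JavaTimeModule is required for the LocalDateTime fields
        ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
        FaceProcessingMessage msg = FaceProcessingMessage.create(
                12345L, 100L, 200L, "https://example.com/face.jpg", LocalDateTime.now());
        System.out.println(mapper.writeValueAsString(msg));
        // ≈ {"faceSampleId":12345,"scenicId":100,"deviceId":200,
        //    "faceUrl":"https://example.com/face.jpg",
        //    "shotTime":"2025-09-24 05:03:04",
        //    "createTime":"2025-09-24 05:03:04",
        //    "source":"external-system"}
    }
}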

View File

@@ -0,0 +1,38 @@
package com.ycwl.basic.integration.kafka.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage<T> {

    private String messageId; // left null by the of() factory below; callers may set it
    private String topic;
    private String eventType;
    private T payload;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    private String source;
    private String version;

    public static <T> KafkaMessage<T> of(String topic, String eventType, T payload) {
        return KafkaMessage.<T>builder()
                .topic(topic)
                .eventType(eventType)
                .payload(payload)
                .timestamp(LocalDateTime.now())
                .source("liuying-microservice")
                .version("1.0")
                .build();
    }
}
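
One caveat when reading these envelopes back: type erasure means Jackson needs an explicit TypeReference to recover the payload type. A minimal round-trip sketch (the demo class is hypothetical; it assumes Jackson with the JavaTimeModule, as above):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.ycwl.basic.integration.kafka.dto.KafkaMessage;

public class KafkaMessageRoundTripDemo { // hypothetical, for illustration only
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
        KafkaMessage<String> out = KafkaMessage.of("test-topic", "TEST_EVENT", "hello");
        String json = mapper.writeValueAsString(out);
        // Type erasure: a TypeReference is needed to get the payload back as a String
        KafkaMessage<String> in = mapper.readValue(json, new TypeReference<KafkaMessage<String>>() {});
        System.out.println(in.getPayload()); // "hello"
    }
}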

View File

@@ -0,0 +1,57 @@
package com.ycwl.basic.integration.kafka.example;

import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
import com.ycwl.basic.integration.kafka.service.KafkaIntegrationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 * Kafka integration usage example (disabled for now; enable during later development).
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.example.enabled", havingValue = "true", matchIfMissing = false)
public class KafkaIntegrationExample {

    private final KafkaIntegrationService kafkaService;

    /**
     * Demonstrates the Kafka connection test.
     */
    public void demonstrateConnectionTest() {
        log.info("=== Kafka Integration Example ===");
        // Test the connection
        boolean connected = kafkaService.testConnection();
        log.info("Kafka connection status: {}", connected ? "SUCCESS" : "FAILED");
        // Print the configuration
        var properties = kafkaService.getKafkaProperties();
        log.info("Kafka Bootstrap Servers: {}", properties.getBootstrapServers());
        log.info("Consumer Group ID: {}", properties.getConsumer().getGroupId());
    }

    /**
     * Demonstrates message sending (placeholder example).
     */
    public void demonstrateMessageSending() {
        log.info("=== Message Sending Example (Not Implemented) ===");
        // Build a sample message
        KafkaMessage<String> message = KafkaMessage.of(
                "test-topic",
                "TEST_EVENT",
                "Hello Kafka from liuying-microservice!"
        );
        // Send the message (not implemented yet)
        kafkaService.sendMessage("test-topic", "test-key", message);
        log.info("Message sending demonstration completed");
    }

    // TODO: add a consumer example later
    // public void demonstrateMessageConsuming() { ... }
}

View File

@@ -0,0 +1,186 @@
package com.ycwl.basic.integration.kafka.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
import com.ycwl.basic.facebody.entity.AddFaceResp;
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
import com.ycwl.basic.mapper.FaceSampleMapper;
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
import com.ycwl.basic.repository.DeviceRepository;
import com.ycwl.basic.service.pc.ScenicService;
import com.ycwl.basic.service.task.TaskFaceService;
import com.ycwl.basic.task.DynamicTaskGenerator;
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
// SnowFlakeUtil is no longer needed; the externally supplied ID is used instead
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.time.ZoneId;
import java.util.Date;

/**
 * Kafka consumer service for face processing.
 * Consumes messages sent by external systems to the zt-face topic.
 */
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class FaceProcessingKafkaService {

    private static final String ZT_FACE_TOPIC = "zt-face";

    private final ObjectMapper objectMapper;
    private final FaceSampleMapper faceSampleMapper;
    private final TaskFaceService taskFaceService;
    private final ScenicService scenicService;
    private final DeviceRepository deviceRepository;

    /**
     * Consumes face-processing messages sent by external systems.
     * Persists the face sample first, then runs asynchronous face recognition.
     */
    @KafkaListener(topics = ZT_FACE_TOPIC, groupId = "face-processing-group")
    public void processFaceMessage(String message) {
        try {
            FaceProcessingMessage faceMessage = objectMapper.readValue(message, FaceProcessingMessage.class);
            log.info("Received external face-processing message, scenicId: {}, deviceId: {}, faceUrl: {}",
                    faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
            // Use the externally supplied faceSampleId
            Long externalFaceId = faceMessage.getFaceSampleId();
            if (externalFaceId == null) {
                log.error("External message is missing faceSampleId");
                return;
            }
            // Persist the face sample first
            boolean saved = saveFaceSample(faceMessage, externalFaceId);
            // Then run face recognition
            if (saved) {
                processFaceRecognition(faceMessage);
            }
        } catch (Exception e) {
            log.error("Failed to process external face message: {}", e.getMessage(), e);
            // TODO: consider a retry mechanism or a dead-letter queue
        }
    }

    /**
     * Persists the face sample to the database.
     * @param faceMessage face-processing message
     * @param externalFaceId externally supplied face ID
     * @return whether the save succeeded
     */
    private boolean saveFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) {
        try {
            FaceSampleEntity faceSample = new FaceSampleEntity();
            faceSample.setId(externalFaceId); // use the externally supplied ID
            faceSample.setScenicId(faceMessage.getScenicId());
            faceSample.setDeviceId(faceMessage.getDeviceId());
            faceSample.setStatus(0); // initial state
            faceSample.setFaceUrl(faceMessage.getFaceUrl());
            // Convert the timestamp
            if (faceMessage.getShotTime() != null) {
                Date shotTime = Date.from(faceMessage.getShotTime().atZone(ZoneId.systemDefault()).toInstant());
                faceSample.setCreateAt(shotTime);
            } else {
                faceSample.setCreateAt(new Date());
            }
            // Save to the database
            faceSampleMapper.add(faceSample);
            log.info("Face sample saved with external faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}",
                    externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
            return true;
        } catch (Exception e) {
            log.error("Failed to save face sample, external faceId: {}, scenicId: {}, deviceId: {}",
                    externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e);
            return false;
        }
    }

    /**
     * Runs the face-recognition logic for a persisted face sample.
     */
    private void processFaceRecognition(FaceProcessingMessage message) {
        Long faceSampleId = message.getFaceSampleId();
        Long scenicId = message.getScenicId();
        String faceUrl = message.getFaceUrl();
        // faceSampleId doubles as the unique identifier
        String faceUniqueId = faceSampleId.toString();
        // Look up the face-recognition adapter
        IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId);
        if (faceBodyAdapter == null) {
            log.error("No face-recognition adapter found, scenicId: {}", scenicId);
            updateFaceSampleStatus(faceSampleId, -1);
            return;
        }
        try {
            // Mark as processing
            updateFaceSampleStatus(faceSampleId, 1);
            // Make sure the face database exists
            taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
            // Register the face with the recognition service (faceSampleId is the unique identifier)
            AddFaceResp addFaceResp = faceBodyAdapter.addFace(
                    scenicId.toString(),
                    faceSampleId.toString(),
                    faceUrl,
                    faceUniqueId // i.e. faceSampleId.toString()
            );
            if (addFaceResp != null) {
                // Update the sample's score and status
                faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
                updateFaceSampleStatus(faceSampleId, 2); // success
                log.info("Face recognition completed, faceSampleId: {}, score: {}",
                        faceSampleId, addFaceResp.getScore());
                // Check the device config to see whether pre-booking is enabled
                Long deviceId = message.getDeviceId();
                DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
                if (deviceConfig != null &&
                        Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
                    DynamicTaskGenerator.addTask(faceSampleId);
                    log.info("Added to the pre-booking task queue, faceSampleId: {}", faceSampleId);
                }
            } else {
                log.warn("addFace returned an empty result, faceSampleId: {}", faceSampleId);
                updateFaceSampleStatus(faceSampleId, -1);
            }
        } catch (Exception e) {
            log.error("Face recognition failed, faceSampleId: {}, error: {}",
                    faceSampleId, e.getMessage(), e);
            // Mark the sample as failed
            updateFaceSampleStatus(faceSampleId, -1);
        }
    }

    /**
     * Updates the face sample's status.
     */
    private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
        try {
            // TODO: add an updateStatus method to FaceSampleMapper
            // faceSampleMapper.updateStatus(faceSampleId, status);
            log.info("Face sample status updated, faceSampleId: {}, status: {} (0: initial, 1: processing, 2: success, -1: failed)", faceSampleId, status);
        } catch (Exception e) {
            log.error("Failed to update face sample status, faceSampleId: {}", faceSampleId, e);
        }
    }
}
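
updateFaceSampleStatus above only logs for now; the TODO notes that FaceSampleMapper lacks an updateStatus method. A minimal sketch of that method as it might be added to the existing FaceSampleMapper, assuming MyBatis annotations and a face_sample table with id and status columns (table and column names are assumptions):

import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

// Sketch of the missing mapper method, to be added to the existing FaceSampleMapper.
// The table/column names are assumptions; adjust to the real schema.
@Update("UPDATE face_sample SET status = #{status} WHERE id = #{id}")
int updateStatus(@Param("id") Long faceSampleId, @Param("status") Integer status);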

View File

@@ -0,0 +1,51 @@
package com.ycwl.basic.integration.kafka.service;

import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class KafkaIntegrationService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final KafkaIntegrationProperties kafkaProperties;

    /**
     * Tests the Kafka connection.
     */
    public boolean testConnection() {
        try {
            log.info("Testing Kafka connection to: {}", kafkaProperties.getBootstrapServers());
            // Fetch topic metadata to verify the connection; try-with-resources
            // releases the producer instead of leaking it
            try (var producer = kafkaTemplate.getProducerFactory().createProducer()) {
                producer.partitionsFor("test-topic");
            }
            log.info("Kafka connection test successful");
            return true;
        } catch (Exception e) {
            log.error("Kafka connection test failed", e);
            return false;
        }
    }

    /**
     * Sends a message (placeholder; the actual send logic is not implemented yet).
     */
    public void sendMessage(String topic, String key, KafkaMessage<?> message) {
        log.info("Kafka message sending is not implemented yet. Topic: {}, Key: {}", topic, key);
        // TODO: implement the actual send logic
    }

    /**
     * Exposes the Kafka configuration.
     */
    public KafkaIntegrationProperties getKafkaProperties() {
        return kafkaProperties;
    }
}
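
Probing connectivity by creating a producer works, but Kafka's AdminClient is a more direct fit for a health check. A sketch of an alternative method that could live in this service (an assumption, not part of this commit; kafka-clients is available transitively via spring-kafka):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Map;
import java.util.concurrent.TimeUnit;

// Sketch: probe the cluster with a short-lived AdminClient instead of a producer.
public boolean testConnectionViaAdmin() {
    Map<String, Object> conf = Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    try (AdminClient admin = AdminClient.create(conf)) {
        // describeCluster() round-trips to the brokers; get() fails fast if unreachable
        admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
        return true;
    } catch (Exception e) {
        log.error("Kafka connection test failed", e);
        return false;
    }
}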

View File

@@ -17,6 +17,24 @@ feign:
  okhttp:
    enabled: true

# Kafka configuration (custom kafka.* prefix bound by KafkaConfig and
# KafkaIntegrationProperties, not Spring Boot's spring.kafka)
kafka:
  enabled: false # off by default; enable manually when needed
  bootstrap-servers: 100.64.0.12:39092
  consumer:
    group-id: liuying-microservice-dev
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producer:
    acks: all
    retries: 3
    batch-size: 16384
    linger-ms: 1
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

# Dev-environment logging configuration
logging:
  level: