diff --git a/pom.xml b/pom.xml index de66305a..070e7e3a 100644 --- a/pom.xml +++ b/pom.xml @@ -266,6 +266,12 @@ mts20140618 5.0.0 + + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/ycwl/basic/config/KafkaConfig.java b/src/main/java/com/ycwl/basic/config/KafkaConfig.java new file mode 100644 index 00000000..7dec01eb --- /dev/null +++ b/src/main/java/com/ycwl/basic/config/KafkaConfig.java @@ -0,0 +1,83 @@ +package com.ycwl.basic.config; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = false) +public class KafkaConfig { + + @Value("${kafka.bootstrap-servers:100.64.0.12:39092}") + private String bootstrapServers; + + @Value("${kafka.consumer.group-id:liuying-microservice}") + private String consumerGroupId; + + @Value("${kafka.consumer.auto-offset-reset:earliest}") + private String autoOffsetReset; + + @Value("${kafka.producer.acks:all}") + private String acks; + + @Value("${kafka.producer.retries:3}") + private Integer retries; + + @Value("${kafka.producer.batch-size:16384}") + private Integer batchSize; + + @Value("${kafka.producer.linger-ms:1}") + private Integer lingerMs; + + @Value("${kafka.producer.buffer-memory:33554432}") + private Integer bufferMemory; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.ACKS_CONFIG, acks); + configProps.put(ProducerConfig.RETRIES_CONFIG, retries); + configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); + configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java new file mode 100644 index 00000000..cf4842aa --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java @@ -0,0 +1,35 @@ +package com.ycwl.basic.integration.kafka.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "kafka") +public class KafkaIntegrationProperties { + + private boolean enabled = false; + private String bootstrapServers = "100.64.0.12:39092"; + private Consumer consumer = new Consumer(); + private Producer producer = new Producer(); + + @Data + public static class Consumer { + private String groupId = "liuying-microservice"; + private String autoOffsetReset = "earliest"; + private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; + private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"; + } + + @Data + public static class Producer { + private String acks = "all"; + private Integer retries = 3; + private Integer batchSize = 16384; + private Integer lingerMs = 1; + private Integer bufferMemory = 33554432; + private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer"; + private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer"; + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java b/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java new file mode 100644 index 00000000..78f542ec --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java @@ -0,0 +1,83 @@ +package com.ycwl.basic.integration.kafka.dto; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * zt-face topic消息结构 + * 用于人脸处理任务的异步消息传递 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FaceProcessingMessage { + + /** + * 人脸样本ID(外部系统传入) + */ + private Long faceSampleId; + + /** + * 景区ID + */ + private Long scenicId; + + /** + * 设备ID + */ + private Long deviceId; + + /** + * 人脸图片URL + */ + private String faceUrl; + + // 不再需要faceUniqueId,直接使用faceSampleId作为唯一标识 + + /** + * 拍摄时间 + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime shotTime; + + // status字段已移除,由系统内部管理状态 + + // deviceConfig字段已移除,由系统内部通过deviceRepository查询 + + /** + * 消息创建时间 + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime createTime; + + /** + * 消息来源 + */ + private String source; + + // retryCount字段已移除,重试机制由系统内部管理 + + // DeviceConfig内部类已移除,不再需要在消息中传递设备配置 + + /** + * 创建人脸处理消息的工厂方法(使用外部传入的faceId) + */ + public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId, + String faceUrl, LocalDateTime shotTime) { + return FaceProcessingMessage.builder() + .faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识 + .scenicId(scenicId) + .deviceId(deviceId) + .faceUrl(faceUrl) + .shotTime(shotTime) + .createTime(LocalDateTime.now()) + .source("external-system") + .build(); + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java b/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java new file mode 100644 index 00000000..3e07716a --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java @@ -0,0 +1,38 @@ +package com.ycwl.basic.integration.kafka.dto; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class KafkaMessage { + + private String messageId; + private String topic; + private String eventType; + private T payload; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime timestamp; + + private String source; + private String version; + + public static KafkaMessage of(String topic, String eventType, T payload) { + return KafkaMessage.builder() + .topic(topic) + .eventType(eventType) + .payload(payload) + .timestamp(LocalDateTime.now()) + .source("liuying-microservice") + .version("1.0") + .build(); + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java b/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java new file mode 100644 index 00000000..72441c9b --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java @@ -0,0 +1,57 @@ +package com.ycwl.basic.integration.kafka.example; + +import com.ycwl.basic.integration.kafka.dto.KafkaMessage; +import com.ycwl.basic.integration.kafka.service.KafkaIntegrationService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * Kafka集成使用示例(暂时注释,后续开发时启用) + */ +@Slf4j +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(name = "kafka.example.enabled", havingValue = "true", matchIfMissing = false) +public class KafkaIntegrationExample { + + private final KafkaIntegrationService kafkaService; + + /** + * 演示Kafka连接测试 + */ + public void demonstrateConnectionTest() { + log.info("=== Kafka Integration Example ==="); + + // 测试连接 + boolean connected = kafkaService.testConnection(); + log.info("Kafka connection status: {}", connected ? "SUCCESS" : "FAILED"); + + // 显示配置信息 + var properties = kafkaService.getKafkaProperties(); + log.info("Kafka Bootstrap Servers: {}", properties.getBootstrapServers()); + log.info("Consumer Group ID: {}", properties.getConsumer().getGroupId()); + } + + /** + * 演示消息发送(预留示例) + */ + public void demonstrateMessageSending() { + log.info("=== Message Sending Example (Not Implemented) ==="); + + // 创建示例消息 + KafkaMessage message = KafkaMessage.of( + "test-topic", + "TEST_EVENT", + "Hello Kafka from liuying-microservice!" + ); + + // 发送消息(暂不实现) + kafkaService.sendMessage("test-topic", "test-key", message); + log.info("Message sending demonstration completed"); + } + + // TODO: 后续添加消费者示例 + // public void demonstrateMessageConsuming() { ... } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java new file mode 100644 index 00000000..42d98542 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -0,0 +1,186 @@ +package com.ycwl.basic.integration.kafka.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter; +import com.ycwl.basic.facebody.entity.AddFaceResp; +import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage; +import com.ycwl.basic.mapper.FaceSampleMapper; +import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity; +import com.ycwl.basic.repository.DeviceRepository; +import com.ycwl.basic.service.pc.ScenicService; +import com.ycwl.basic.service.task.TaskFaceService; +import com.ycwl.basic.task.DynamicTaskGenerator; +import com.ycwl.basic.integration.common.manager.DeviceConfigManager; +// 不再需要SnowFlakeUtil,使用外部传入的ID +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.time.ZoneId; +import java.util.Date; + +/** + * 人脸处理Kafka消费服务 + * 消费外部系统发送到zt-face topic的消息 + */ +@Slf4j +@Service +@RequiredArgsConstructor +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") +public class FaceProcessingKafkaService { + + private static final String ZT_FACE_TOPIC = "zt-face"; + + private final ObjectMapper objectMapper; + private final FaceSampleMapper faceSampleMapper; + private final TaskFaceService taskFaceService; + private final ScenicService scenicService; + private final DeviceRepository deviceRepository; + + /** + * 消费外部系统发送的人脸处理消息 + * 先保存人脸样本数据,再进行异步人脸识别处理 + */ + @KafkaListener(topics = ZT_FACE_TOPIC, groupId = "face-processing-group") + public void processFaceMessage(String message) { + try { + FaceProcessingMessage faceMessage = objectMapper.readValue(message, FaceProcessingMessage.class); + log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}", + faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl()); + + // 使用外部传入的faceSampleId + Long externalFaceId = faceMessage.getFaceSampleId(); + if (externalFaceId == null) { + log.error("外部消息中未包含faceSampleId"); + return; + } + + // 先保存人脸样本数据 + boolean saved = saveFaceSample(faceMessage, externalFaceId); + + // 然后进行人脸识别处理 + if (saved) { + processFaceRecognition(faceMessage); + } + + } catch (Exception e) { + log.error("处理外部人脸消息失败: {}", e.getMessage(), e); + // TODO: 考虑错误重试机制或死信队列 + } + } + + /** + * 保存人脸样本数据到数据库 + * @param faceMessage 人脸处理消息 + * @param externalFaceId 外部传入的人脸ID + * @return 是否保存成功 + */ + private boolean saveFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) { + try { + FaceSampleEntity faceSample = new FaceSampleEntity(); + faceSample.setId(externalFaceId); // 使用外部传入的ID + faceSample.setScenicId(faceMessage.getScenicId()); + faceSample.setDeviceId(faceMessage.getDeviceId()); + faceSample.setStatus(0); // 初始状态 + faceSample.setFaceUrl(faceMessage.getFaceUrl()); + + // 转换时间格式 + if (faceMessage.getShotTime() != null) { + Date shotTime = Date.from(faceMessage.getShotTime().atZone(ZoneId.systemDefault()).toInstant()); + faceSample.setCreateAt(shotTime); + } else { + faceSample.setCreateAt(new Date()); + } + + // 保存到数据库 + faceSampleMapper.add(faceSample); + log.info("人脸样本数据已保存, 使用外部faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}", + externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl()); + + return true; + } catch (Exception e) { + log.error("保存人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}", + externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e); + return false; + } + } + + /** + * 执行人脸识别处理逻辑 + * 对已保存的人脸样本进行识别处理 + */ + private void processFaceRecognition(FaceProcessingMessage message) { + Long faceSampleId = message.getFaceSampleId(); + Long scenicId = message.getScenicId(); + String faceUrl = message.getFaceUrl(); + + // 直接使用faceSampleId作为唯一标识 + String faceUniqueId = faceSampleId.toString(); + + // 获取人脸识别适配器 + IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId); + if (faceBodyAdapter == null) { + log.error("人脸识别适配器不存在, scenicId: {}", scenicId); + updateFaceSampleStatus(faceSampleId, -1); + return; + } + + try { + // 更新状态为处理中 + updateFaceSampleStatus(faceSampleId, 1); + + // 确保人脸数据库存在 + taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString()); + + // 添加人脸到识别服务(使用faceSampleId作为唯一标识) + AddFaceResp addFaceResp = faceBodyAdapter.addFace( + scenicId.toString(), + faceSampleId.toString(), + faceUrl, + faceUniqueId // 即faceSampleId.toString() + ); + + if (addFaceResp != null) { + // 更新人脸样本得分和状态 + faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore()); + updateFaceSampleStatus(faceSampleId, 2); // 成功状态 + log.info("人脸识别处理完成, faceSampleId: {}, score: {}", + faceSampleId, addFaceResp.getScore()); + + // 查询设备配置,判断是否启用预订功能 + Long deviceId = message.getDeviceId(); + DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); + if (deviceConfig != null && + Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { + DynamicTaskGenerator.addTask(faceSampleId); + log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId); + } + } else { + log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); + updateFaceSampleStatus(faceSampleId, -1); + } + + } catch (Exception e) { + log.error("人脸识别处理失败, faceSampleId: {}, error: {}", + faceSampleId, e.getMessage(), e); + + // 标记人脸样本为处理失败状态 + updateFaceSampleStatus(faceSampleId, -1); + } + } + + /** + * 更新人脸样本状态 + */ + private void updateFaceSampleStatus(Long faceSampleId, Integer status) { + try { + // TODO: 需要在FaceSampleMapper中添加updateStatus方法 + // faceSampleMapper.updateStatus(faceSampleId, status); + log.info("人脸样本状态已更新, faceSampleId: {}, status: {} (0:初始,1:处理中,2:成功,-1:失败)", faceSampleId, status); + } catch (Exception e) { + log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java new file mode 100644 index 00000000..9cb81b09 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java @@ -0,0 +1,51 @@ +package com.ycwl.basic.integration.kafka.service; + +import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties; +import com.ycwl.basic.integration.kafka.dto.KafkaMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") +public class KafkaIntegrationService { + + private final KafkaTemplate kafkaTemplate; + private final KafkaIntegrationProperties kafkaProperties; + + /** + * 测试Kafka连接 + */ + public boolean testConnection() { + try { + log.info("Testing Kafka connection to: {}", kafkaProperties.getBootstrapServers()); + + // 尝试获取元数据以测试连接 + var metadata = kafkaTemplate.getProducerFactory().createProducer().partitionsFor("test-topic"); + log.info("Kafka connection test successful"); + return true; + } catch (Exception e) { + log.error("Kafka connection test failed", e); + return false; + } + } + + /** + * 发送消息(预留接口,暂不实现具体逻辑) + */ + public void sendMessage(String topic, String key, KafkaMessage message) { + log.info("Kafka message sending is not implemented yet. Topic: {}, Key: {}", topic, key); + // TODO: 后续实现具体的消息发送逻辑 + } + + /** + * 获取Kafka配置信息 + */ + public KafkaIntegrationProperties getKafkaProperties() { + return kafkaProperties; + } +} \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 1ba11371..c0143d4c 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -17,6 +17,24 @@ feign: okhttp: enabled: true +# Kafka配置 +kafka: + enabled: false # 默认关闭,需要时手动开启 + bootstrap-servers: 100.64.0.12:39092 + consumer: + group-id: liuying-microservice-dev + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + acks: all + retries: 3 + batch-size: 16384 + linger-ms: 1 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # 开发环境日志配置 logging: level: