You've already forked FrameTour-BE
Compare commits
3 Commits
a361b59d74
...
becbe5f6ab
Author | SHA1 | Date | |
---|---|---|---|
becbe5f6ab | |||
dc3a46362b | |||
dc2154c020 |
6
pom.xml
6
pom.xml
@@ -266,6 +266,12 @@
|
||||
<artifactId>mts20140618</artifactId>
|
||||
<version>5.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Spring Kafka -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@@ -132,6 +132,7 @@ public class TemplateBiz {
|
||||
log.info("filterTaskParams: templateId:{} has no placeholders", templateId);
|
||||
return Map.of();
|
||||
}
|
||||
TemplateConfigEntity templateConfig = templateRepository.getTemplateConfig(templateId);
|
||||
|
||||
// 统计每个 placeholder 在模板中出现的次数
|
||||
Map<String, Long> placeholderCounts = templatePlaceholders.stream()
|
||||
@@ -142,6 +143,9 @@ public class TemplateBiz {
|
||||
|
||||
Map<String, List<SourceEntity>> filteredParams = new HashMap<>();
|
||||
|
||||
// 判断是否允许片段重复
|
||||
boolean allowDuplicate = templateConfig != null && Integer.valueOf(1).equals(templateConfig.getDuplicateEnable());
|
||||
|
||||
for (Map.Entry<String, Long> entry : placeholderCounts.entrySet()) {
|
||||
String placeholder = entry.getKey();
|
||||
Long requiredCount = entry.getValue();
|
||||
@@ -151,28 +155,53 @@ public class TemplateBiz {
|
||||
String imageKey = placeholder;
|
||||
if (allTaskParams.containsKey(imageKey)) {
|
||||
List<SourceEntity> allSources = allTaskParams.get(imageKey);
|
||||
int actualCount = Math.min(requiredCount.intValue(), allSources.size());
|
||||
List<SourceEntity> selectedSources = allSources.subList(0, actualCount);
|
||||
filteredParams.put(imageKey, new ArrayList<>(selectedSources));
|
||||
List<SourceEntity> selectedSources = selectSources(allSources, requiredCount.intValue(), allowDuplicate);
|
||||
if (!selectedSources.isEmpty()) {
|
||||
filteredParams.put(imageKey, selectedSources);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 视频源:占位符直接对应设备ID
|
||||
String videoKey = placeholder;
|
||||
if (allTaskParams.containsKey(videoKey)) {
|
||||
List<SourceEntity> allSources = allTaskParams.get(videoKey);
|
||||
int actualCount = Math.min(requiredCount.intValue(), allSources.size());
|
||||
List<SourceEntity> selectedSources = allSources.subList(0, actualCount);
|
||||
filteredParams.put(videoKey, new ArrayList<>(selectedSources));
|
||||
List<SourceEntity> selectedSources = selectSources(allSources, requiredCount.intValue(), allowDuplicate);
|
||||
if (!selectedSources.isEmpty()) {
|
||||
filteredParams.put(videoKey, selectedSources);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("filterTaskParams: templateId:{}, original keys:{}, filtered keys:{}, placeholder counts:{}",
|
||||
templateId, allTaskParams.keySet().size(), filteredParams.keySet().size(), placeholderCounts);
|
||||
log.debug("filterTaskParams: templateId:{}, original keys:{}, filtered keys:{}, placeholder counts:{}, allowDuplicate:{}",
|
||||
templateId, allTaskParams.keySet().size(), filteredParams.keySet().size(), placeholderCounts, allowDuplicate);
|
||||
|
||||
return filteredParams;
|
||||
}
|
||||
|
||||
private List<SourceEntity> selectSources(List<SourceEntity> allSources, int requiredCount, boolean allowDuplicate) {
|
||||
if (allSources == null || allSources.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
if (!allowDuplicate) {
|
||||
// 不允许重复,使用原有逻辑
|
||||
int actualCount = Math.min(requiredCount, allSources.size());
|
||||
return new ArrayList<>(allSources.subList(0, actualCount));
|
||||
}
|
||||
|
||||
// 允许重复,循环填充到所需数量
|
||||
List<SourceEntity> selectedSources = new ArrayList<>();
|
||||
int sourceIndex = 0;
|
||||
|
||||
for (int i = 0; i < requiredCount; i++) {
|
||||
selectedSources.add(allSources.get(sourceIndex));
|
||||
sourceIndex = (sourceIndex + 1) % allSources.size();
|
||||
}
|
||||
|
||||
return selectedSources;
|
||||
}
|
||||
|
||||
public Long findFirstAvailableTemplate(List<Long> templateIds, Long faceId, boolean scanSource) {
|
||||
if (templateIds == null || templateIds.isEmpty() || faceId == null) {
|
||||
return null;
|
||||
|
83
src/main/java/com/ycwl/basic/config/KafkaConfig.java
Normal file
83
src/main/java/com/ycwl/basic/config/KafkaConfig.java
Normal file
@@ -0,0 +1,83 @@
|
||||
package com.ycwl.basic.config;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.*;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class KafkaConfig {
|
||||
|
||||
@Value("${kafka.bootstrap-servers:100.64.0.12:39092}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value("${kafka.consumer.group-id:liuying-microservice}")
|
||||
private String consumerGroupId;
|
||||
|
||||
@Value("${kafka.consumer.auto-offset-reset:earliest}")
|
||||
private String autoOffsetReset;
|
||||
|
||||
@Value("${kafka.producer.acks:all}")
|
||||
private String acks;
|
||||
|
||||
@Value("${kafka.producer.retries:3}")
|
||||
private Integer retries;
|
||||
|
||||
@Value("${kafka.producer.batch-size:16384}")
|
||||
private Integer batchSize;
|
||||
|
||||
@Value("${kafka.producer.linger-ms:1}")
|
||||
private Integer lingerMs;
|
||||
|
||||
@Value("${kafka.producer.buffer-memory:33554432}")
|
||||
private Integer bufferMemory;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
configProps.put(ProducerConfig.ACKS_CONFIG, acks);
|
||||
configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
|
||||
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
|
||||
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
||||
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, String> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
}
|
@@ -0,0 +1,35 @@
|
||||
package com.ycwl.basic.integration.kafka.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "kafka")
|
||||
public class KafkaIntegrationProperties {
|
||||
|
||||
private boolean enabled = false;
|
||||
private String bootstrapServers = "100.64.0.12:39092";
|
||||
private Consumer consumer = new Consumer();
|
||||
private Producer producer = new Producer();
|
||||
|
||||
@Data
|
||||
public static class Consumer {
|
||||
private String groupId = "liuying-microservice";
|
||||
private String autoOffsetReset = "earliest";
|
||||
private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
|
||||
private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Producer {
|
||||
private String acks = "all";
|
||||
private Integer retries = 3;
|
||||
private Integer batchSize = 16384;
|
||||
private Integer lingerMs = 1;
|
||||
private Integer bufferMemory = 33554432;
|
||||
private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
|
||||
private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
|
||||
}
|
||||
}
|
@@ -0,0 +1,83 @@
|
||||
package com.ycwl.basic.integration.kafka.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* zt-face topic消息结构
|
||||
* 用于人脸处理任务的异步消息传递
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class FaceProcessingMessage {
|
||||
|
||||
/**
|
||||
* 人脸样本ID(外部系统传入)
|
||||
*/
|
||||
private Long faceSampleId;
|
||||
|
||||
/**
|
||||
* 景区ID
|
||||
*/
|
||||
private Long scenicId;
|
||||
|
||||
/**
|
||||
* 设备ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 人脸图片URL
|
||||
*/
|
||||
private String faceUrl;
|
||||
|
||||
// 不再需要faceUniqueId,直接使用faceSampleId作为唯一标识
|
||||
|
||||
/**
|
||||
* 拍摄时间
|
||||
*/
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||
private LocalDateTime shotTime;
|
||||
|
||||
// status字段已移除,由系统内部管理状态
|
||||
|
||||
// deviceConfig字段已移除,由系统内部通过deviceRepository查询
|
||||
|
||||
/**
|
||||
* 消息创建时间
|
||||
*/
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||
private LocalDateTime createTime;
|
||||
|
||||
/**
|
||||
* 消息来源
|
||||
*/
|
||||
private String source;
|
||||
|
||||
// retryCount字段已移除,重试机制由系统内部管理
|
||||
|
||||
// DeviceConfig内部类已移除,不再需要在消息中传递设备配置
|
||||
|
||||
/**
|
||||
* 创建人脸处理消息的工厂方法(使用外部传入的faceId)
|
||||
*/
|
||||
public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId,
|
||||
String faceUrl, LocalDateTime shotTime) {
|
||||
return FaceProcessingMessage.builder()
|
||||
.faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识
|
||||
.scenicId(scenicId)
|
||||
.deviceId(deviceId)
|
||||
.faceUrl(faceUrl)
|
||||
.shotTime(shotTime)
|
||||
.createTime(LocalDateTime.now())
|
||||
.source("external-system")
|
||||
.build();
|
||||
}
|
||||
}
|
@@ -0,0 +1,38 @@
|
||||
package com.ycwl.basic.integration.kafka.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class KafkaMessage<T> {
|
||||
|
||||
private String messageId;
|
||||
private String topic;
|
||||
private String eventType;
|
||||
private T payload;
|
||||
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||
private LocalDateTime timestamp;
|
||||
|
||||
private String source;
|
||||
private String version;
|
||||
|
||||
public static <T> KafkaMessage<T> of(String topic, String eventType, T payload) {
|
||||
return KafkaMessage.<T>builder()
|
||||
.topic(topic)
|
||||
.eventType(eventType)
|
||||
.payload(payload)
|
||||
.timestamp(LocalDateTime.now())
|
||||
.source("liuying-microservice")
|
||||
.version("1.0")
|
||||
.build();
|
||||
}
|
||||
}
|
@@ -0,0 +1,57 @@
|
||||
package com.ycwl.basic.integration.kafka.example;
|
||||
|
||||
import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
|
||||
import com.ycwl.basic.integration.kafka.service.KafkaIntegrationService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Kafka集成使用示例(暂时注释,后续开发时启用)
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "kafka.example.enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class KafkaIntegrationExample {
|
||||
|
||||
private final KafkaIntegrationService kafkaService;
|
||||
|
||||
/**
|
||||
* 演示Kafka连接测试
|
||||
*/
|
||||
public void demonstrateConnectionTest() {
|
||||
log.info("=== Kafka Integration Example ===");
|
||||
|
||||
// 测试连接
|
||||
boolean connected = kafkaService.testConnection();
|
||||
log.info("Kafka connection status: {}", connected ? "SUCCESS" : "FAILED");
|
||||
|
||||
// 显示配置信息
|
||||
var properties = kafkaService.getKafkaProperties();
|
||||
log.info("Kafka Bootstrap Servers: {}", properties.getBootstrapServers());
|
||||
log.info("Consumer Group ID: {}", properties.getConsumer().getGroupId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 演示消息发送(预留示例)
|
||||
*/
|
||||
public void demonstrateMessageSending() {
|
||||
log.info("=== Message Sending Example (Not Implemented) ===");
|
||||
|
||||
// 创建示例消息
|
||||
KafkaMessage<String> message = KafkaMessage.of(
|
||||
"test-topic",
|
||||
"TEST_EVENT",
|
||||
"Hello Kafka from liuying-microservice!"
|
||||
);
|
||||
|
||||
// 发送消息(暂不实现)
|
||||
kafkaService.sendMessage("test-topic", "test-key", message);
|
||||
log.info("Message sending demonstration completed");
|
||||
}
|
||||
|
||||
// TODO: 后续添加消费者示例
|
||||
// public void demonstrateMessageConsuming() { ... }
|
||||
}
|
@@ -0,0 +1,186 @@
|
||||
package com.ycwl.basic.integration.kafka.service;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
|
||||
import com.ycwl.basic.facebody.entity.AddFaceResp;
|
||||
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
|
||||
import com.ycwl.basic.mapper.FaceSampleMapper;
|
||||
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
|
||||
import com.ycwl.basic.repository.DeviceRepository;
|
||||
import com.ycwl.basic.service.pc.ScenicService;
|
||||
import com.ycwl.basic.service.task.TaskFaceService;
|
||||
import com.ycwl.basic.task.DynamicTaskGenerator;
|
||||
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
|
||||
// 不再需要SnowFlakeUtil,使用外部传入的ID
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 人脸处理Kafka消费服务
|
||||
* 消费外部系统发送到zt-face topic的消息
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||
public class FaceProcessingKafkaService {
|
||||
|
||||
private static final String ZT_FACE_TOPIC = "zt-face";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final FaceSampleMapper faceSampleMapper;
|
||||
private final TaskFaceService taskFaceService;
|
||||
private final ScenicService scenicService;
|
||||
private final DeviceRepository deviceRepository;
|
||||
|
||||
/**
|
||||
* 消费外部系统发送的人脸处理消息
|
||||
* 先保存人脸样本数据,再进行异步人脸识别处理
|
||||
*/
|
||||
@KafkaListener(topics = ZT_FACE_TOPIC, groupId = "face-processing-group")
|
||||
public void processFaceMessage(String message) {
|
||||
try {
|
||||
FaceProcessingMessage faceMessage = objectMapper.readValue(message, FaceProcessingMessage.class);
|
||||
log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
|
||||
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
||||
|
||||
// 使用外部传入的faceSampleId
|
||||
Long externalFaceId = faceMessage.getFaceSampleId();
|
||||
if (externalFaceId == null) {
|
||||
log.error("外部消息中未包含faceSampleId");
|
||||
return;
|
||||
}
|
||||
|
||||
// 先保存人脸样本数据
|
||||
boolean saved = saveFaceSample(faceMessage, externalFaceId);
|
||||
|
||||
// 然后进行人脸识别处理
|
||||
if (saved) {
|
||||
processFaceRecognition(faceMessage);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
||||
// TODO: 考虑错误重试机制或死信队列
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存人脸样本数据到数据库
|
||||
* @param faceMessage 人脸处理消息
|
||||
* @param externalFaceId 外部传入的人脸ID
|
||||
* @return 是否保存成功
|
||||
*/
|
||||
private boolean saveFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) {
|
||||
try {
|
||||
FaceSampleEntity faceSample = new FaceSampleEntity();
|
||||
faceSample.setId(externalFaceId); // 使用外部传入的ID
|
||||
faceSample.setScenicId(faceMessage.getScenicId());
|
||||
faceSample.setDeviceId(faceMessage.getDeviceId());
|
||||
faceSample.setStatus(0); // 初始状态
|
||||
faceSample.setFaceUrl(faceMessage.getFaceUrl());
|
||||
|
||||
// 转换时间格式
|
||||
if (faceMessage.getShotTime() != null) {
|
||||
Date shotTime = Date.from(faceMessage.getShotTime().atZone(ZoneId.systemDefault()).toInstant());
|
||||
faceSample.setCreateAt(shotTime);
|
||||
} else {
|
||||
faceSample.setCreateAt(new Date());
|
||||
}
|
||||
|
||||
// 保存到数据库
|
||||
faceSampleMapper.add(faceSample);
|
||||
log.info("人脸样本数据已保存, 使用外部faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}",
|
||||
externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
||||
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("保存人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}",
|
||||
externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行人脸识别处理逻辑
|
||||
* 对已保存的人脸样本进行识别处理
|
||||
*/
|
||||
private void processFaceRecognition(FaceProcessingMessage message) {
|
||||
Long faceSampleId = message.getFaceSampleId();
|
||||
Long scenicId = message.getScenicId();
|
||||
String faceUrl = message.getFaceUrl();
|
||||
|
||||
// 直接使用faceSampleId作为唯一标识
|
||||
String faceUniqueId = faceSampleId.toString();
|
||||
|
||||
// 获取人脸识别适配器
|
||||
IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId);
|
||||
if (faceBodyAdapter == null) {
|
||||
log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
|
||||
updateFaceSampleStatus(faceSampleId, -1);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 更新状态为处理中
|
||||
updateFaceSampleStatus(faceSampleId, 1);
|
||||
|
||||
// 确保人脸数据库存在
|
||||
taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
|
||||
|
||||
// 添加人脸到识别服务(使用faceSampleId作为唯一标识)
|
||||
AddFaceResp addFaceResp = faceBodyAdapter.addFace(
|
||||
scenicId.toString(),
|
||||
faceSampleId.toString(),
|
||||
faceUrl,
|
||||
faceUniqueId // 即faceSampleId.toString()
|
||||
);
|
||||
|
||||
if (addFaceResp != null) {
|
||||
// 更新人脸样本得分和状态
|
||||
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
|
||||
updateFaceSampleStatus(faceSampleId, 2); // 成功状态
|
||||
log.info("人脸识别处理完成, faceSampleId: {}, score: {}",
|
||||
faceSampleId, addFaceResp.getScore());
|
||||
|
||||
// 查询设备配置,判断是否启用预订功能
|
||||
Long deviceId = message.getDeviceId();
|
||||
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
||||
if (deviceConfig != null &&
|
||||
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
|
||||
DynamicTaskGenerator.addTask(faceSampleId);
|
||||
log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId);
|
||||
}
|
||||
} else {
|
||||
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
|
||||
updateFaceSampleStatus(faceSampleId, -1);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("人脸识别处理失败, faceSampleId: {}, error: {}",
|
||||
faceSampleId, e.getMessage(), e);
|
||||
|
||||
// 标记人脸样本为处理失败状态
|
||||
updateFaceSampleStatus(faceSampleId, -1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新人脸样本状态
|
||||
*/
|
||||
private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
|
||||
try {
|
||||
// TODO: 需要在FaceSampleMapper中添加updateStatus方法
|
||||
// faceSampleMapper.updateStatus(faceSampleId, status);
|
||||
log.info("人脸样本状态已更新, faceSampleId: {}, status: {} (0:初始,1:处理中,2:成功,-1:失败)", faceSampleId, status);
|
||||
} catch (Exception e) {
|
||||
log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,51 @@
|
||||
package com.ycwl.basic.integration.kafka.service;
|
||||
|
||||
import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
|
||||
import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||
public class KafkaIntegrationService {
|
||||
|
||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||
private final KafkaIntegrationProperties kafkaProperties;
|
||||
|
||||
/**
|
||||
* 测试Kafka连接
|
||||
*/
|
||||
public boolean testConnection() {
|
||||
try {
|
||||
log.info("Testing Kafka connection to: {}", kafkaProperties.getBootstrapServers());
|
||||
|
||||
// 尝试获取元数据以测试连接
|
||||
var metadata = kafkaTemplate.getProducerFactory().createProducer().partitionsFor("test-topic");
|
||||
log.info("Kafka connection test successful");
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("Kafka connection test failed", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息(预留接口,暂不实现具体逻辑)
|
||||
*/
|
||||
public void sendMessage(String topic, String key, KafkaMessage<?> message) {
|
||||
log.info("Kafka message sending is not implemented yet. Topic: {}, Key: {}", topic, key);
|
||||
// TODO: 后续实现具体的消息发送逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取Kafka配置信息
|
||||
*/
|
||||
public KafkaIntegrationProperties getKafkaProperties() {
|
||||
return kafkaProperties;
|
||||
}
|
||||
}
|
@@ -17,6 +17,24 @@ feign:
|
||||
okhttp:
|
||||
enabled: true
|
||||
|
||||
# Kafka配置
|
||||
kafka:
|
||||
enabled: false # 默认关闭,需要时手动开启
|
||||
bootstrap-servers: 100.64.0.12:39092
|
||||
consumer:
|
||||
group-id: liuying-microservice-dev
|
||||
auto-offset-reset: earliest
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
producer:
|
||||
acks: all
|
||||
retries: 3
|
||||
batch-size: 16384
|
||||
linger-ms: 1
|
||||
buffer-memory: 33554432
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
|
||||
# 开发环境日志配置
|
||||
logging:
|
||||
level:
|
||||
|
Reference in New Issue
Block a user