From ab3208c9df9112ce8e2358caa7b36e7bcd818563 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Thu, 25 Sep 2025 18:46:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(kafka):=20=E6=B7=BB=E5=8A=A0=E6=89=8B?= =?UTF-8?q?=E5=8A=A8=E6=8F=90=E4=BA=A4=E6=A8=A1=E5=BC=8F=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E4=BB=A5=E5=A2=9E=E5=BC=BA=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=8F=AF=E9=9D=A0=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 KafkaConfig 中新增 manualCommitKafkaListenerContainerFactory 配置 - 启用手动提交模式并设置 AckMode 为 MANUAL_IMMEDIATE - 修改 FaceProcessingKafkaService 使用新的容器工厂- 添加 Acknowledgment 参数以控制消息提交时机 -仅在人脸样本保存和识别全部成功后才手动确认消息 - 处理失败时不再调用 ack.acknowledge()使消息可重新消费 - 更新 processFaceRecognition 方法返回处理结果状态 - 增强异常处理逻辑,确保失败情况下不提交消息 --- .../com/ycwl/basic/config/KafkaConfig.java | 18 ++++++++++++ .../service/FaceProcessingKafkaService.java | 29 ++++++++++++++----- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/ycwl/basic/config/KafkaConfig.java b/src/main/java/com/ycwl/basic/config/KafkaConfig.java index 7dec01eb..ba4e609b 100644 --- a/src/main/java/com/ycwl/basic/config/KafkaConfig.java +++ b/src/main/java/com/ycwl/basic/config/KafkaConfig.java @@ -10,6 +10,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ContainerProperties; import java.util.HashMap; import java.util.Map; @@ -80,4 +81,21 @@ public class KafkaConfig { factory.setConsumerFactory(consumerFactory()); return factory; } + + @Bean + public ConcurrentKafkaListenerContainerFactory manualCommitKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + return factory; + } } \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index bab12bd5..4adfefaf 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -17,6 +17,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import java.time.ZoneId; @@ -43,8 +44,8 @@ public class FaceProcessingKafkaService { * 消费外部系统发送的人脸处理消息 * 先保存人脸样本数据,再进行异步人脸识别处理 */ - @KafkaListener(topics = ZT_FACE_TOPIC) - public void processFaceMessage(String message) { + @KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory") + public void processFaceMessage(String message, Acknowledgment ack) { try { FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class); log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}", @@ -59,15 +60,24 @@ public class FaceProcessingKafkaService { // 先保存人脸样本数据 boolean saved = saveFaceSample(faceMessage, externalFaceId); - + // 然后进行人脸识别处理 if (saved) { - processFaceRecognition(faceMessage); + boolean processed = processFaceRecognition(faceMessage); + if (processed) { + // 只有在所有处理都成功后才手动提交 + ack.acknowledge(); + log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId); + } else { + log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId); + } + } else { + log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId); } } catch (Exception e) { - log.error("处理外部人脸消息失败: {}", e.getMessage(), e); - // TODO: 考虑错误重试机制或死信队列 + log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e); + // 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费 } } @@ -111,7 +121,7 @@ public class FaceProcessingKafkaService { * 执行人脸识别处理逻辑 * 对已保存的人脸样本进行识别处理 */ - private void processFaceRecognition(FaceProcessingMessage message) { + private boolean processFaceRecognition(FaceProcessingMessage message) { Long faceSampleId = message.getFaceSampleId(); Long scenicId = message.getScenicId(); String faceUrl = message.getFaceUrl(); @@ -124,7 +134,7 @@ public class FaceProcessingKafkaService { if (faceBodyAdapter == null) { log.error("人脸识别适配器不存在, scenicId: {}", scenicId); updateFaceSampleStatus(faceSampleId, -1); - return; + return false; } try { @@ -157,9 +167,11 @@ public class FaceProcessingKafkaService { DynamicTaskGenerator.addTask(faceSampleId); log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId); } + return true; } else { log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); updateFaceSampleStatus(faceSampleId, -1); + return false; } } catch (Exception e) { @@ -168,6 +180,7 @@ public class FaceProcessingKafkaService { // 标记人脸样本为处理失败状态 updateFaceSampleStatus(faceSampleId, -1); + return false; } }