feat(kafka): add manual commit mode to improve message processing reliability

- Add a manualCommitKafkaListenerContainerFactory bean to KafkaConfig
- Enable manual commit mode and set AckMode to MANUAL_IMMEDIATE
- Switch FaceProcessingKafkaService to the new container factory
- Add an Acknowledgment parameter to control when messages are committed
- Acknowledge a message manually only after the face sample has been saved and recognition has fully succeeded
- Stop calling ack.acknowledge() on failure so the message can be consumed again
- Update processFaceRecognition to return the processing result
- Harden the exception handling so that a failure never commits the message (the resulting consumer pattern is sketched below, after the commit metadata)
commit ab3208c9df (parent 09e376e089)
Date: 2025-09-25 18:46:15 +08:00
2 changed files with 39 additions and 8 deletions
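Taken together, the changes move the consumer from auto-commit to an acknowledge-on-success pattern. The following is a condensed sketch of that pattern, reusing the factory name from the diffs below; the class name, the topic literal, and handle(...) are placeholders for the project's service, its ZT_FACE_TOPIC constant, and its save-plus-recognition pipeline, not code from this commit.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class ManualAckSketch {

    // The referenced factory disables auto-commit and uses AckMode.MANUAL_IMMEDIATE,
    // so an offset is committed only when ack.acknowledge() is called.
    @KafkaListener(topics = "zt-face-topic", // placeholder for the ZT_FACE_TOPIC constant
            containerFactory = "manualCommitKafkaListenerContainerFactory")
    public void onMessage(String message, Acknowledgment ack) {
        boolean processed = handle(message); // stands in for "save sample, then recognize"
        if (processed) {
            ack.acknowledge(); // commit this record's offset immediately
        }
        // If handle(...) returns false or throws, acknowledge() is never reached:
        // the offset stays uncommitted and the record will be consumed again later.
    }

    private boolean handle(String message) {
        return true; // placeholder for the real processing pipeline
    }
}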

KafkaConfig.java

@@ -10,6 +10,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.ContainerProperties;
 import java.util.HashMap;
 import java.util.Map;
@@ -80,4 +81,21 @@ public class KafkaConfig {
         factory.setConsumerFactory(consumerFactory());
         return factory;
     }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitKafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        return factory;
+    }
 }
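A side note on the design: this factory only controls commit timing, while the commit also drops the old TODO about retries and a dead-letter queue (see the service diff below). If that idea is revisited, one common approach in Spring Kafka 2.8+ is to attach a DefaultErrorHandler to this factory. The sketch below is a hypothetical follow-up under those assumptions, not part of this commit; the configuration class name and the KafkaTemplate<String, String> bean are assumed to exist.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ManualCommitErrorHandling {

    // Hypothetical follow-up, not in this commit: retry a failed record twice with a 1s pause,
    // then publish it to the default "<original topic>.DLT" dead-letter topic.
    @Bean
    public DefaultErrorHandler manualCommitErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}

To take effect it would be registered via factory.setCommonErrorHandler(...) on manualCommitKafkaListenerContainerFactory, and the listener would have to rethrow rather than swallow exceptions, since an error handler only runs when the listener method throws.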

FaceProcessingKafkaService.java

@@ -17,6 +17,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Service;
 import java.time.ZoneId;
@@ -43,8 +44,8 @@ public class FaceProcessingKafkaService {
      * Consumes face processing messages sent by external systems.
      * Saves the face sample data first, then runs asynchronous face recognition.
      */
-    @KafkaListener(topics = ZT_FACE_TOPIC)
-    public void processFaceMessage(String message) {
+    @KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
+    public void processFaceMessage(String message, Acknowledgment ack) {
         try {
             FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
             log.info("Received external face processing message, scenicId: {}, deviceId: {}, faceUrl: {}",
@@ -62,12 +63,21 @@ public class FaceProcessingKafkaService {
             // Then run face recognition
             if (saved) {
-                processFaceRecognition(faceMessage);
+                boolean processed = processFaceRecognition(faceMessage);
+                if (processed) {
+                    // Commit manually only after every step has succeeded
+                    ack.acknowledge();
+                    log.info("Message processed successfully and committed, faceSampleId: {}", externalFaceId);
+                } else {
+                    log.warn("Face recognition failed, message will not be committed, faceSampleId: {}", externalFaceId);
+                }
+            } else {
+                log.warn("Failed to save face sample, message will not be committed, faceSampleId: {}", externalFaceId);
             }
         } catch (Exception e) {
-            log.error("Failed to process external face message: {}", e.getMessage(), e);
-            // TODO: consider a retry mechanism or a dead-letter queue
+            log.error("Failed to process external face message, it will not be committed: {}", e.getMessage(), e);
+            // Do not call ack.acknowledge(); the offset stays uncommitted, so the message can be consumed again
         }
     }
@@ -111,7 +121,7 @@ public class FaceProcessingKafkaService {
      * Runs the face recognition logic
      * on a face sample that has already been saved.
      */
-    private void processFaceRecognition(FaceProcessingMessage message) {
+    private boolean processFaceRecognition(FaceProcessingMessage message) {
         Long faceSampleId = message.getFaceSampleId();
         Long scenicId = message.getScenicId();
         String faceUrl = message.getFaceUrl();
@@ -124,7 +134,7 @@ public class FaceProcessingKafkaService {
         if (faceBodyAdapter == null) {
             log.error("Face recognition adapter not found, scenicId: {}", scenicId);
             updateFaceSampleStatus(faceSampleId, -1);
-            return;
+            return false;
         }
         try {
@@ -157,9 +167,11 @@ public class FaceProcessingKafkaService {
                 DynamicTaskGenerator.addTask(faceSampleId);
                 log.info("Added to the reservation task queue, faceSampleId: {}", faceSampleId);
             }
+            return true;
         } else {
             log.warn("Face add returned an empty result, faceSampleId: {}", faceSampleId);
             updateFaceSampleStatus(faceSampleId, -1);
+            return false;
         }
     } catch (Exception e) {
@@ -168,6 +180,7 @@ public class FaceProcessingKafkaService {
             // Mark the face sample as failed
             updateFaceSampleStatus(faceSampleId, -1);
+            return false;
         }
     }
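One caveat behind "the message can be consumed again": under MANUAL_IMMEDIATE, a listener that returns without acknowledging does not get the record redelivered right away; the container simply moves on, and the uncommitted offset is only re-read after a rebalance or a restart. Where prompt redelivery matters, the failure branches above could call Acknowledgment.nack() instead of doing nothing. The fragment below sketches that variant; it is not what this commit does, and the Duration overload assumes Spring Kafka 2.8.7 or later.

            } else {
                // Hypothetical alternative to silently skipping acknowledge():
                // re-seek the partition so this record is redelivered after ~5 seconds.
                // nack() is only allowed with MANUAL / MANUAL_IMMEDIATE ack modes, on the listener thread.
                ack.nack(java.time.Duration.ofSeconds(5));
            }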