diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index 4adfefaf..2d578740 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -55,6 +55,8 @@ public class FaceProcessingKafkaService { Long externalFaceId = faceMessage.getFaceSampleId(); if (externalFaceId == null) { log.error("外部消息中未包含faceSampleId"); + // 即使消息格式错误,也消费消息避免重复处理 + ack.acknowledge(); return; } @@ -63,21 +65,29 @@ public class FaceProcessingKafkaService { // 然后进行人脸识别处理 if (saved) { - boolean processed = processFaceRecognition(faceMessage); - if (processed) { - // 只有在所有处理都成功后才手动提交 - ack.acknowledge(); - log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId); - } else { - log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId); + try { + boolean processed = processFaceRecognition(faceMessage); + if (processed) { + log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId); + } else { + log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); + } + } catch (Exception e) { + log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e); } } else { - log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId); + log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); } + // 无论处理是否成功,都消费消息 + ack.acknowledge(); + log.info("消息已消费, faceSampleId: {}", externalFaceId); + } catch (Exception e) { - log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e); - // 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费 + log.error("处理外部人脸消息失败: {}", e.getMessage(), e); + // 即使发生异常也消费消息,避免消息堆积 + ack.acknowledge(); + log.info("异常情况下消息已消费,避免重复处理"); } }