You've already forked FrameTour-BE
fix(kafka): 调整人脸识别消息处理逻辑,确保消息始终被消费- 修改消息处理失败时的确认机制,避免消息堆积
- 即使人脸样本保存或识别处理失败,也消费消息防止重复处理 - 异常情况下同样确认消息消费,记录错误日志而非阻塞流程- 优化日志记录,明确区分处理结果与消息确认状态
This commit is contained in:
@@ -55,6 +55,8 @@ public class FaceProcessingKafkaService {
|
||||
Long externalFaceId = faceMessage.getFaceSampleId();
|
||||
if (externalFaceId == null) {
|
||||
log.error("外部消息中未包含faceSampleId");
|
||||
// 即使消息格式错误,也消费消息避免重复处理
|
||||
ack.acknowledge();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -63,21 +65,29 @@ public class FaceProcessingKafkaService {
|
||||
|
||||
// 然后进行人脸识别处理
|
||||
if (saved) {
|
||||
boolean processed = processFaceRecognition(faceMessage);
|
||||
if (processed) {
|
||||
// 只有在所有处理都成功后才手动提交
|
||||
ack.acknowledge();
|
||||
log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId);
|
||||
} else {
|
||||
log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId);
|
||||
try {
|
||||
boolean processed = processFaceRecognition(faceMessage);
|
||||
if (processed) {
|
||||
log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId);
|
||||
} else {
|
||||
log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId);
|
||||
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
||||
}
|
||||
|
||||
// 无论处理是否成功,都消费消息
|
||||
ack.acknowledge();
|
||||
log.info("消息已消费, faceSampleId: {}", externalFaceId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e);
|
||||
// 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费
|
||||
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
||||
// 即使发生异常也消费消息,避免消息堆积
|
||||
ack.acknowledge();
|
||||
log.info("异常情况下消息已消费,避免重复处理");
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user