From 132a539bb677bcbad90343bc229c9578ef95731f Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sun, 28 Sep 2025 11:26:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(kafka):=20=E8=B0=83=E6=95=B4=E4=BA=BA?= =?UTF-8?q?=E8=84=B8=E8=AF=86=E5=88=AB=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A1=AE=E4=BF=9D=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A7=8B=E7=BB=88=E8=A2=AB=E6=B6=88=E8=B4=B9-=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E7=A1=AE=E8=AE=A4=E6=9C=BA=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E6=B6=88=E6=81=AF=E5=A0=86=E7=A7=AF=20-=20?= =?UTF-8?q?=E5=8D=B3=E4=BD=BF=E4=BA=BA=E8=84=B8=E6=A0=B7=E6=9C=AC=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E6=88=96=E8=AF=86=E5=88=AB=E5=A4=84=E7=90=86=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=EF=BC=8C=E4=B9=9F=E6=B6=88=E8=B4=B9=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E9=87=8D=E5=A4=8D=E5=A4=84=E7=90=86=20-=20?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5=E4=B8=8B=E5=90=8C=E6=A0=B7?= =?UTF-8?q?=E7=A1=AE=E8=AE=A4=E6=B6=88=E6=81=AF=E6=B6=88=E8=B4=B9=EF=BC=8C?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E9=94=99=E8=AF=AF=E6=97=A5=E5=BF=97=E8=80=8C?= =?UTF-8?q?=E9=9D=9E=E9=98=BB=E5=A1=9E=E6=B5=81=E7=A8=8B-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=EF=BC=8C=E6=98=8E?= =?UTF-8?q?=E7=A1=AE=E5=8C=BA=E5=88=86=E5=A4=84=E7=90=86=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E4=B8=8E=E6=B6=88=E6=81=AF=E7=A1=AE=E8=AE=A4=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/FaceProcessingKafkaService.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index 4adfefaf..2d578740 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -55,6 +55,8 @@ public class FaceProcessingKafkaService { Long externalFaceId = faceMessage.getFaceSampleId(); if (externalFaceId == null) { log.error("外部消息中未包含faceSampleId"); + // 即使消息格式错误,也消费消息避免重复处理 + ack.acknowledge(); return; } @@ -63,21 +65,29 @@ public class FaceProcessingKafkaService { // 然后进行人脸识别处理 if (saved) { - boolean processed = processFaceRecognition(faceMessage); - if (processed) { - // 只有在所有处理都成功后才手动提交 - ack.acknowledge(); - log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId); - } else { - log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId); + try { + boolean processed = processFaceRecognition(faceMessage); + if (processed) { + log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId); + } else { + log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); + } + } catch (Exception e) { + log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e); } } else { - log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId); + log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); } + // 无论处理是否成功,都消费消息 + ack.acknowledge(); + log.info("消息已消费, faceSampleId: {}", externalFaceId); + } catch (Exception e) { - log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e); - // 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费 + log.error("处理外部人脸消息失败: {}", e.getMessage(), e); + // 即使发生异常也消费消息,避免消息堆积 + ack.acknowledge(); + log.info("异常情况下消息已消费,避免重复处理"); } }