From 1727619b2925c418603400e22c6bc8bdbf7d8414 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Fri, 3 Oct 2025 14:30:08 +0800 Subject: [PATCH] =?UTF-8?q?refactor(kafka):=20=E5=B0=86=E4=BA=BA=E8=84=B8?= =?UTF-8?q?=E8=AF=86=E5=88=AB=E5=A4=84=E7=90=86=E6=94=B9=E4=B8=BA=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E6=89=A7=E8=A1=8C-=20=E5=BC=95=E5=85=A5CompletableFut?= =?UTF-8?q?ure=E5=AE=9E=E7=8E=B0=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86=20-?= =?UTF-8?q?=20=E4=BF=AE=E6=94=B9processFaceRecognition=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E4=B8=BA=E5=BC=82=E6=AD=A5=E7=89=88=E6=9C=AC=20-=20=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E5=8E=9F=E5=90=8C=E6=AD=A5=E5=A4=84=E7=90=86=E4=B8=AD?= =?UTF-8?q?=E7=9A=84try-catch=E5=9D=97=20-=20=E6=9B=B4=E6=96=B0=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E8=BF=94=E5=9B=9E=E7=B1=BB=E5=9E=8B=E4=BB=8Eboolean?= =?UTF-8?q?=E6=94=B9=E4=B8=BAvoid-=E4=BF=9D=E7=95=99=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=88=90=E5=8A=9F=E5=92=8C=E5=A4=B1=E8=B4=A5=E7=9A=84=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=9B=B4=E6=96=B0=E9=80=BB=E8=BE=91-=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86=E6=88=90=E5=8A=9F?= =?UTF-8?q?=E5=90=8E=E7=9A=84=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FaceRecognitionThreadPoolConfig.java | 49 +++++++++++++++++++ .../service/FaceProcessingKafkaService.java | 44 +++++++---------- 2 files changed, 68 insertions(+), 25 deletions(-) create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java b/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java new file mode 100644 index 00000000..7f2d8b41 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java @@ -0,0 +1,49 @@ +package com.ycwl.basic.integration.kafka.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 人脸识别异步处理线程池配置 + */ +@Slf4j +@Configuration +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") +public class FaceRecognitionThreadPoolConfig { + + /** + * 创建人脸识别专用线程池 + * - 核心线程数:32 + * - 最大线程数:128 + * - 队列容量:1000(避免无限制增长) + * - 拒绝策略:CallerRunsPolicy(调用者线程执行) + */ + @Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown") + public ThreadPoolExecutor faceRecognitionExecutor() { + ThreadPoolExecutor executor = new ThreadPoolExecutor( + 32, // 核心线程数 + 128, // 最大线程数 + 60L, // 空闲线程存活时间 + TimeUnit.SECONDS, // 时间单位 + new LinkedBlockingQueue<>(1000), // 任务队列 + r -> { + Thread thread = new Thread(r); + thread.setName("face-recognition-" + thread.getId()); + thread.setDaemon(false); + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行 + ); + + log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000", + executor.getCorePoolSize(), executor.getMaximumPoolSize()); + + return executor; + } +} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index 3a727adb..3d3b0a5a 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import java.time.ZoneId; import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; /** * 人脸处理Kafka消费服务 @@ -39,6 +40,7 @@ public class FaceProcessingKafkaService { private final TaskFaceService taskFaceService; private final ScenicService scenicService; private final DeviceRepository deviceRepository; + private final ThreadPoolExecutor faceRecognitionExecutor; /** * 消费外部系统发送的人脸处理消息 @@ -63,18 +65,12 @@ public class FaceProcessingKafkaService { // 先保存人脸样本数据 boolean saved = saveFaceSample(faceMessage, externalFaceId); - // 然后进行人脸识别处理 + // 然后异步进行人脸识别处理(使用专用线程池) if (saved) { - try { - boolean processed = processFaceRecognition(faceMessage); - if (processed) { - log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId); - } else { - log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); - } - } catch (Exception e) { - log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e); - } + faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage)); + log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}", + externalFaceId, faceRecognitionExecutor.getActiveCount(), + faceRecognitionExecutor.getQueue().size()); } else { log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); } @@ -124,14 +120,14 @@ public class FaceProcessingKafkaService { } /** - * 执行人脸识别处理逻辑 + * 异步执行人脸识别处理逻辑 * 对已保存的人脸样本进行识别处理 */ - private boolean processFaceRecognition(FaceProcessingMessage message) { + private void processFaceRecognitionAsync(FaceProcessingMessage message) { Long faceSampleId = message.getFaceSampleId(); Long scenicId = message.getScenicId(); String faceUrl = message.getFaceUrl(); - + // 直接使用faceSampleId作为唯一标识 String faceUniqueId = faceSampleId.toString(); @@ -140,21 +136,21 @@ public class FaceProcessingKafkaService { if (faceBodyAdapter == null) { log.error("人脸识别适配器不存在, scenicId: {}", scenicId); updateFaceSampleStatus(faceSampleId, -1); - return false; + return; } try { // 更新状态为处理中 updateFaceSampleStatus(faceSampleId, 1); - + // 确保人脸数据库存在 taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString()); - + // 添加人脸到识别服务(使用faceSampleId作为唯一标识) AddFaceResp addFaceResp = faceBodyAdapter.addFace( - scenicId.toString(), - faceSampleId.toString(), - faceUrl, + scenicId.toString(), + faceSampleId.toString(), + faceUrl, faceUniqueId // 即faceSampleId.toString() ); @@ -162,6 +158,7 @@ public class FaceProcessingKafkaService { // 更新人脸样本得分和状态 faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore()); updateFaceSampleStatus(faceSampleId, 2); + log.info("人脸识别处理成功, faceSampleId: {}", faceSampleId); // 查询设备配置,判断是否启用预订功能 Long deviceId = message.getDeviceId(); @@ -170,20 +167,17 @@ public class FaceProcessingKafkaService { Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { DynamicTaskGenerator.addTask(faceSampleId); } - return true; } else { log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); updateFaceSampleStatus(faceSampleId, -1); - return false; } } catch (Exception e) { - log.error("人脸识别处理失败, faceSampleId: {}, error: {}", + log.error("人脸识别处理失败, faceSampleId: {}, error: {}", faceSampleId, e.getMessage(), e); - + // 标记人脸样本为处理失败状态 updateFaceSampleStatus(faceSampleId, -1); - return false; } }