diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java b/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java new file mode 100644 index 00000000..7f2d8b41 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java @@ -0,0 +1,49 @@ +package com.ycwl.basic.integration.kafka.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 人脸识别异步处理线程池配置 + */ +@Slf4j +@Configuration +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") +public class FaceRecognitionThreadPoolConfig { + + /** + * 创建人脸识别专用线程池 + * - 核心线程数:32 + * - 最大线程数:128 + * - 队列容量:1000(避免无限制增长) + * - 拒绝策略:CallerRunsPolicy(调用者线程执行) + */ + @Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown") + public ThreadPoolExecutor faceRecognitionExecutor() { + ThreadPoolExecutor executor = new ThreadPoolExecutor( + 32, // 核心线程数 + 128, // 最大线程数 + 60L, // 空闲线程存活时间 + TimeUnit.SECONDS, // 时间单位 + new LinkedBlockingQueue<>(1000), // 任务队列 + r -> { + Thread thread = new Thread(r); + thread.setName("face-recognition-" + thread.getId()); + thread.setDaemon(false); + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行 + ); + + log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000", + executor.getCorePoolSize(), executor.getMaximumPoolSize()); + + return executor; + } +} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index 3a727adb..3d3b0a5a 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import java.time.ZoneId; import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; /** * 人脸处理Kafka消费服务 @@ -39,6 +40,7 @@ public class FaceProcessingKafkaService { private final TaskFaceService taskFaceService; private final ScenicService scenicService; private final DeviceRepository deviceRepository; + private final ThreadPoolExecutor faceRecognitionExecutor; /** * 消费外部系统发送的人脸处理消息 @@ -63,18 +65,12 @@ public class FaceProcessingKafkaService { // 先保存人脸样本数据 boolean saved = saveFaceSample(faceMessage, externalFaceId); - // 然后进行人脸识别处理 + // 然后异步进行人脸识别处理(使用专用线程池) if (saved) { - try { - boolean processed = processFaceRecognition(faceMessage); - if (processed) { - log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId); - } else { - log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); - } - } catch (Exception e) { - log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e); - } + faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage)); + log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}", + externalFaceId, faceRecognitionExecutor.getActiveCount(), + faceRecognitionExecutor.getQueue().size()); } else { log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); } @@ -124,14 +120,14 @@ public class FaceProcessingKafkaService { } /** - * 执行人脸识别处理逻辑 + * 异步执行人脸识别处理逻辑 * 对已保存的人脸样本进行识别处理 */ - private boolean processFaceRecognition(FaceProcessingMessage message) { + private void processFaceRecognitionAsync(FaceProcessingMessage message) { Long faceSampleId = message.getFaceSampleId(); Long scenicId = message.getScenicId(); String faceUrl = message.getFaceUrl(); - + // 直接使用faceSampleId作为唯一标识 String faceUniqueId = faceSampleId.toString(); @@ -140,21 +136,21 @@ public class FaceProcessingKafkaService { if (faceBodyAdapter == null) { log.error("人脸识别适配器不存在, scenicId: {}", scenicId); updateFaceSampleStatus(faceSampleId, -1); - return false; + return; } try { // 更新状态为处理中 updateFaceSampleStatus(faceSampleId, 1); - + // 确保人脸数据库存在 taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString()); - + // 添加人脸到识别服务(使用faceSampleId作为唯一标识) AddFaceResp addFaceResp = faceBodyAdapter.addFace( - scenicId.toString(), - faceSampleId.toString(), - faceUrl, + scenicId.toString(), + faceSampleId.toString(), + faceUrl, faceUniqueId // 即faceSampleId.toString() ); @@ -162,6 +158,7 @@ public class FaceProcessingKafkaService { // 更新人脸样本得分和状态 faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore()); updateFaceSampleStatus(faceSampleId, 2); + log.info("人脸识别处理成功, faceSampleId: {}", faceSampleId); // 查询设备配置,判断是否启用预订功能 Long deviceId = message.getDeviceId(); @@ -170,20 +167,17 @@ public class FaceProcessingKafkaService { Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { DynamicTaskGenerator.addTask(faceSampleId); } - return true; } else { log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); updateFaceSampleStatus(faceSampleId, -1); - return false; } } catch (Exception e) { - log.error("人脸识别处理失败, faceSampleId: {}, error: {}", + log.error("人脸识别处理失败, faceSampleId: {}, error: {}", faceSampleId, e.getMessage(), e); - + // 标记人脸样本为处理失败状态 updateFaceSampleStatus(faceSampleId, -1); - return false; } }