diff --git a/src/main/java/com/ycwl/basic/controller/monitor/FaceRecognitionMonitorController.java b/src/main/java/com/ycwl/basic/controller/monitor/FaceRecognitionMonitorController.java new file mode 100644 index 00000000..d5b80603 --- /dev/null +++ b/src/main/java/com/ycwl/basic/controller/monitor/FaceRecognitionMonitorController.java @@ -0,0 +1,50 @@ +package com.ycwl.basic.controller.monitor; + +import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager; +import com.ycwl.basic.utils.ApiResponse; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +/** + * 人脸识别监控接口 + * 提供调度器状态查询功能 + */ +@RestController +@RequestMapping("/api/monitor/face-recognition") +@RequiredArgsConstructor +public class FaceRecognitionMonitorController { + + private final AccountFaceSchedulerManager schedulerManager; + + /** + * 获取所有账号的调度器统计信息 + *

+ * 示例返回: + * { + * "LTAI5xxx": { + * "accountKey": "LTAI5xxx", + * "cloudType": "ALI", + * "activeThreads": 3, + * "executorQueueSize": 12, + * "schedulerQueueSize": 45 + * }, + * "245xxx": { + * "accountKey": "245xxx", + * "cloudType": "BAIDU", + * "activeThreads": 8, + * "executorQueueSize": 5, + * "schedulerQueueSize": 20 + * } + * } + * + * @return 所有账号的调度器状态 + */ + @GetMapping("/schedulers") + public ApiResponse> getAllSchedulerStats() { + return ApiResponse.success(schedulerManager.getAllStats()); + } +} diff --git a/src/main/java/com/ycwl/basic/facebody/adapter/AliFaceBodyAdapter.java b/src/main/java/com/ycwl/basic/facebody/adapter/AliFaceBodyAdapter.java index 6a3403a6..bc908744 100644 --- a/src/main/java/com/ycwl/basic/facebody/adapter/AliFaceBodyAdapter.java +++ b/src/main/java/com/ycwl/basic/facebody/adapter/AliFaceBodyAdapter.java @@ -45,6 +45,7 @@ public class AliFaceBodyAdapter implements IFaceBodyAdapter { private static final Map deleteDbLimiters = new ConcurrentHashMap<>(); private static final Map deleteEntityLimiters = new ConcurrentHashMap<>(); + @Getter // 添加getter,支持获取accessKeyId private AliFaceBodyConfig config; public boolean setConfig(AliFaceBodyConfig config) { @@ -184,10 +185,8 @@ public class AliFaceBodyAdapter implements IFaceBodyAdapter { addFaceRequest.setImageUrl(faceUrl); addFaceRequest.setExtraData(extData); AddFaceResp respVo = new AddFaceResp(); - try { - addFaceLimiter.acquire(); - } catch (InterruptedException ignored) { - } + // QPS控制已由外层调度器管理,这里不再需要限流 + // 移除阻塞等待: addFaceLimiter.acquire() try { AddFaceResponse acsResponse = client.getAcsResponse(addFaceRequest); respVo.setScore(acsResponse.getData().getQualitieScore()); diff --git a/src/main/java/com/ycwl/basic/facebody/adapter/BceFaceBodyAdapter.java b/src/main/java/com/ycwl/basic/facebody/adapter/BceFaceBodyAdapter.java index 52231da3..9527b7b2 100644 --- a/src/main/java/com/ycwl/basic/facebody/adapter/BceFaceBodyAdapter.java +++ b/src/main/java/com/ycwl/basic/facebody/adapter/BceFaceBodyAdapter.java @@ -8,6 +8,7 @@ import com.ycwl.basic.facebody.entity.SearchFaceResp; import com.ycwl.basic.facebody.entity.SearchFaceResultItem; import com.ycwl.basic.utils.ratelimiter.FixedRateLimiter; import com.ycwl.basic.utils.ratelimiter.IRateLimiter; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.json.JSONArray; import org.json.JSONObject; @@ -40,6 +41,8 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter { private static final Map deleteDbLimiters = new ConcurrentHashMap<>(); private static final Map deleteEntityLimiters = new ConcurrentHashMap<>(); private static final Map deleteFaceLimiters = new ConcurrentHashMap<>(); + + @Getter // 添加getter,支持获取appId和addQps private BceFaceBodyConfig config; public boolean setConfig(BceFaceBodyConfig config) { @@ -149,10 +152,8 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter { options.put("user_info", extData); // options.put("quality_control", "LOW"); options.put("action_type", "REPLACE"); - try { - addEntityLimiter.acquire(); - } catch (InterruptedException ignored) { - } + // QPS控制已由外层调度器管理,这里不再需要限流 + // 移除阻塞等待: addEntityLimiter.acquire() JSONObject response = client.addUser(faceUrl, "URL", dbName, entityId, options); int errorCode = response.getInt("error_code"); if (errorCode == 0) { @@ -164,10 +165,7 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter { log.warn("无法正常访问URL图片,错误码: 222204,尝试下载图片转base64后重试,URL: {}", faceUrl); String base64Image = downloadImageAsBase64(faceUrl); if (base64Image != null) { - try { - addEntityLimiter.acquire(); - } catch (InterruptedException ignored) { - } + // 重试时也不需要限流,由外层调度器控制 JSONObject retryResponse = client.addUser(base64Image, "BASE64", dbName, entityId, options); if (retryResponse.getInt("error_code") == 0) { log.info("使用base64重试添加人脸成功,entityId: {}", entityId); diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java b/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java deleted file mode 100644 index 06d20672..00000000 --- a/src/main/java/com/ycwl/basic/integration/kafka/config/FaceRecognitionThreadPoolConfig.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.ycwl.basic.integration.kafka.config; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 人脸识别异步处理线程池配置 - */ -@Slf4j -@Configuration -@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") -public class FaceRecognitionThreadPoolConfig { - - /** - * 创建人脸识别专用线程池 - * - 核心线程数:128 - * - 最大线程数:256 - * - 队列容量:1000(避免无限制增长) - * - 拒绝策略:CallerRunsPolicy(调用者线程执行) - */ - @Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown") - public ThreadPoolExecutor faceRecognitionExecutor() { - ThreadPoolExecutor executor = new ThreadPoolExecutor( - 128, // 核心线程数 - 256, // 最大线程数 - 10L, // 空闲线程存活时间 - TimeUnit.SECONDS, // 时间单位 - new LinkedBlockingQueue<>(1024), // 任务队列 - r -> { - Thread thread = new Thread(r); - thread.setName("face-recognition-" + thread.getId()); - thread.setDaemon(false); - return thread; - }, - new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行 - ); - - log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000", - executor.getCorePoolSize(), executor.getMaximumPoolSize()); - - return executor; - } -} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountFaceSchedulerManager.java b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountFaceSchedulerManager.java new file mode 100644 index 00000000..b838fc6f --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountFaceSchedulerManager.java @@ -0,0 +1,180 @@ +package com.ycwl.basic.integration.kafka.scheduler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 账号级别的人脸识别调度器管理 + * 每个账号(accessKeyId/appId)拥有独立的: + * 1. 线程池 - 资源隔离 + * 2. QPS调度器 - 精确控制每个账号的QPS + * 3. 任务队列 - 独立排队 + *

+ * 核心优势: + * - 多个阿里云账号互不影响,充分利用多账号QPS优势 + * - 百度云和阿里云任务完全隔离 + * - 每个账号严格按自己的QPS限制调度 + */ +@Slf4j +@Component +public class AccountFaceSchedulerManager { + + // 账号 -> 调度器上下文的映射 + private final ConcurrentHashMap schedulers = new ConcurrentHashMap<>(); + + /** + * 获取或创建账号的调度器上下文 + * + * @param accountKey 账号唯一标识 (accessKeyId 或 appId) + * @param cloudType 云类型 ("ALI" 或 "BAIDU") + * @param qps 该账号的QPS限制 + * @return 调度器上下文 + */ + public AccountSchedulerContext getOrCreateScheduler( + String accountKey, + String cloudType, + float qps + ) { + return schedulers.computeIfAbsent(accountKey, key -> { + log.info("创建账号调度器: accountKey={}, cloudType={}, qps={}", + accountKey, cloudType, qps); + return createSchedulerContext(accountKey, cloudType, qps); + }); + } + + /** + * 创建调度器上下文 + */ + private AccountSchedulerContext createSchedulerContext( + String accountKey, + String cloudType, + float qps + ) { + // 根据云类型和QPS计算线程池参数 + ThreadPoolConfig poolConfig = calculateThreadPoolConfig(cloudType, qps); + + // 创建独立线程池 + ThreadPoolExecutor executor = new ThreadPoolExecutor( + poolConfig.coreSize, + poolConfig.maxSize, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(poolConfig.queueCapacity), + new ThreadFactoryBuilder() + .setNameFormat(cloudType.toLowerCase() + "-" + accountKey.substring(0, Math.min(8, accountKey.length())) + "-%d") + .build(), + new ThreadPoolExecutor.AbortPolicy() // 快速失败,避免阻塞 + ); + + // 创建QPS调度器 + QpsScheduler scheduler = new QpsScheduler( + Math.round(qps), // 每秒调度的任务数 + poolConfig.maxConcurrent, // 最大并发数 + executor + ); + + log.info("账号调度器创建成功: accountKey={}, threadPool=[core={}, max={}, queue={}], qps={}, maxConcurrent={}", + accountKey, + poolConfig.coreSize, + poolConfig.maxSize, + poolConfig.queueCapacity, + Math.round(qps), + poolConfig.maxConcurrent); + + return new AccountSchedulerContext(accountKey, cloudType, executor, scheduler); + } + + /** + * 根据云类型和QPS计算线程池参数 + */ + private ThreadPoolConfig calculateThreadPoolConfig(String cloudType, float qps) { + // 假设每个任务平均执行时间 500ms + int avgExecutionTimeMs = 500; + + // 所需线程数 = QPS × 平均执行时间(秒) + int requiredThreads = Math.max(1, (int) Math.ceil(qps * avgExecutionTimeMs / 1000.0)); + + // 核心线程数 = 所需线程数 × 2 (留有余量) + int coreSize = requiredThreads * 2; + + // 最大线程数 = 核心线程数 × 2 + int maxSize = coreSize * 2; + + // 队列容量 = QPS × 60 (可容纳1分钟的任务) + int queueCapacity = Math.max(100, (int) (qps * 60)); + + // 最大并发数 = 所需线程数 × 1.5 (防止瞬时抖动) + int maxConcurrent = Math.max(2, (int) (requiredThreads * 1.5)); + + log.debug("计算线程池参数 - cloudType={}, qps={}, requiredThreads={}, coreSize={}, maxSize={}, queue={}, maxConcurrent={}", + cloudType, qps, requiredThreads, coreSize, maxSize, queueCapacity, maxConcurrent); + + return new ThreadPoolConfig(coreSize, maxSize, queueCapacity, maxConcurrent); + } + + /** + * 获取所有调度器的监控信息 + */ + public Map getAllStats() { + Map stats = new HashMap<>(); + schedulers.forEach((key, ctx) -> { + stats.put(key, new AccountSchedulerStats( + ctx.getAccountKey(), + ctx.getCloudType(), + ctx.getExecutor().getActiveCount(), + ctx.getExecutor().getQueue().size(), + ctx.getScheduler().getQueueSize() + )); + }); + return stats; + } + + /** + * 关闭所有调度器 (应用关闭时调用) + */ + public void shutdownAll() { + log.info("关闭所有账号调度器, total={}", schedulers.size()); + schedulers.forEach((key, ctx) -> { + try { + ctx.getScheduler().shutdown(); + ctx.getExecutor().shutdown(); + } catch (Exception e) { + log.error("关闭调度器失败, accountKey={}", key, e); + } + }); + } + + /** + * 线程池配置 + */ + @Data + @AllArgsConstructor + static class ThreadPoolConfig { + int coreSize; + int maxSize; + int queueCapacity; + int maxConcurrent; + } + + /** + * 账号调度器统计信息 + */ + @Data + @AllArgsConstructor + public static class AccountSchedulerStats { + String accountKey; + String cloudType; + int activeThreads; + int executorQueueSize; + int schedulerQueueSize; + } +} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountSchedulerContext.java b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountSchedulerContext.java new file mode 100644 index 00000000..79c87691 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/AccountSchedulerContext.java @@ -0,0 +1,34 @@ +package com.ycwl.basic.integration.kafka.scheduler; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 账号调度器上下文 + * 封装每个账号的线程池和QPS调度器 + */ +@Data +@AllArgsConstructor +public class AccountSchedulerContext { + /** + * 账号唯一标识 (accessKeyId 或 appId) + */ + private String accountKey; + + /** + * 云类型 ("ALI" 或 "BAIDU") + */ + private String cloudType; + + /** + * 该账号专属的线程池 + */ + private ThreadPoolExecutor executor; + + /** + * 该账号专属的QPS调度器 + */ + private QpsScheduler scheduler; +} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/scheduler/QpsScheduler.java b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/QpsScheduler.java new file mode 100644 index 00000000..a42a92b5 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/kafka/scheduler/QpsScheduler.java @@ -0,0 +1,114 @@ +package com.ycwl.basic.integration.kafka.scheduler; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; + +/** + * QPS 调度器 + * 定期从队列取任务,严格控制 QPS + * 每秒调度固定数量的任务,确保不超过云端 API 的 QPS 限制 + */ +@Slf4j +public class QpsScheduler { + private final BlockingQueue taskQueue; + private final ThreadPoolExecutor workerPool; + private final ScheduledExecutorService scheduler; + private final int qps; + private final Semaphore concurrentLimiter; // 并发数限制,防止瞬时抖动 + + /** + * 创建 QPS 调度器 + * + * @param qps 每秒允许的最大请求数 + * @param maxConcurrent 最大并发数 + * @param workerPool 工作线程池 + */ + public QpsScheduler(int qps, int maxConcurrent, ThreadPoolExecutor workerPool) { + this.qps = qps; + this.taskQueue = new LinkedBlockingQueue<>(); + this.workerPool = workerPool; + this.scheduler = new ScheduledThreadPoolExecutor(1, r -> { + Thread thread = new Thread(r); + thread.setName("qps-scheduler-" + workerPool.getThreadFactory().newThread(() -> {}).getName()); + thread.setDaemon(true); + return thread; + }); + this.concurrentLimiter = new Semaphore(maxConcurrent); + + // 每秒调度一次,取 qps 个任务 + scheduler.scheduleAtFixedRate(this::dispatch, 0, 1, TimeUnit.SECONDS); + + log.info("QPS调度器已启动: qps={}, maxConcurrent={}", qps, maxConcurrent); + } + + /** + * 调度任务 + * 每秒执行一次,从队列中取出 qps 个任务提交到工作线程池 + */ + private void dispatch() { + int dispatched = 0; + for (int i = 0; i < qps; i++) { + Runnable task = taskQueue.poll(); + if (task == null) { + break; // 队列为空,结束本次调度 + } + + // 检查并发数限制 + if (concurrentLimiter.tryAcquire()) { + try { + workerPool.execute(() -> { + try { + task.run(); + } catch (Exception e) { + log.error("任务执行失败", e); + } finally { + concurrentLimiter.release(); + } + }); + dispatched++; + } catch (RejectedExecutionException e) { + // 线程池拒绝,释放并发许可,任务丢弃 + concurrentLimiter.release(); + log.warn("任务被线程池拒绝", e); + } + } else { + // 并发数已满,任务放回队列,等待下次调度 + taskQueue.offer(task); + break; + } + } + + if (dispatched > 0 || taskQueue.size() > 0) { + log.debug("QPS调度完成: dispatched={}, remainQueue={}, availableConcurrent={}", + dispatched, taskQueue.size(), concurrentLimiter.availablePermits()); + } + } + + /** + * 提交任务到调度队列 + * + * @param task 待执行的任务 + * @return 是否成功提交 + */ + public boolean submit(Runnable task) { + return taskQueue.offer(task); + } + + /** + * 获取队列中等待调度的任务数量 + * + * @return 队列大小 + */ + public int getQueueSize() { + return taskQueue.size(); + } + + /** + * 关闭调度器 + */ + public void shutdown() { + scheduler.shutdown(); + log.info("QPS调度器已关闭, qps={}", qps); + } +} diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index 9bf01a39..c5e27cdf 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -1,17 +1,21 @@ package com.ycwl.basic.integration.kafka.service; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycwl.basic.facebody.adapter.AliFaceBodyAdapter; +import com.ycwl.basic.facebody.adapter.BceFaceBodyAdapter; import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter; import com.ycwl.basic.facebody.entity.AddFaceResp; +import com.ycwl.basic.facebody.entity.AliFaceBodyConfig; +import com.ycwl.basic.facebody.entity.BceFaceBodyConfig; +import com.ycwl.basic.integration.common.manager.DeviceConfigManager; import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage; +import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager; +import com.ycwl.basic.integration.kafka.scheduler.AccountSchedulerContext; import com.ycwl.basic.mapper.FaceSampleMapper; import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity; import com.ycwl.basic.repository.DeviceRepository; import com.ycwl.basic.service.pc.ScenicService; import com.ycwl.basic.service.task.TaskFaceService; import com.ycwl.basic.task.DynamicTaskGenerator; -import com.ycwl.basic.integration.common.manager.DeviceConfigManager; -// 不再需要SnowFlakeUtil,使用外部传入的ID import com.ycwl.basic.utils.JacksonUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,13 +24,16 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; -import java.time.ZoneId; import java.util.Date; -import java.util.concurrent.ThreadPoolExecutor; /** * 人脸处理Kafka消费服务 * 消费外部系统发送到zt-face topic的消息 + *

+ * 核心改进: + * 1. 按账号(accessKeyId/appId)隔离线程池和QPS调度器 + * 2. 确保数据库优先写入,状态流转清晰 + * 3. 严格QPS控制,线程资源高效利用 */ @Slf4j @Service @@ -40,51 +47,144 @@ public class FaceProcessingKafkaService { private final TaskFaceService taskFaceService; private final ScenicService scenicService; private final DeviceRepository deviceRepository; - private final ThreadPoolExecutor faceRecognitionExecutor; + private final AccountFaceSchedulerManager schedulerManager; /** * 消费外部系统发送的人脸处理消息 - * 先保存人脸样本数据,再进行异步人脸识别处理 + * 核心流程: + * 1. 同步写入数据库 (最高优先级) + * 2. 获取账号调度器上下文 + * 3. 提交到账号专属调度器队列 */ @KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory") public void processFaceMessage(String message, Acknowledgment ack) { + Long faceSampleId = null; try { FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class); - log.debug("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}", - faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl()); + faceSampleId = faceMessage.getFaceSampleId(); - // 使用外部传入的faceSampleId - Long externalFaceId = faceMessage.getFaceSampleId(); - if (externalFaceId == null) { - log.error("外部消息中未包含faceSampleId"); - // 即使消息格式错误,也消费消息避免重复处理 + log.debug("接收人脸消息: scenicId={}, deviceId={}, faceSampleId={}", + faceMessage.getScenicId(), faceMessage.getDeviceId(), faceSampleId); + + // ========== 第一步: 同步写入数据库 (最高优先级) ========== + boolean saved = saveFaceSample(faceMessage, faceSampleId); + if (!saved) { + log.error("❌ 数据库写入失败, 不提交识别任务, faceSampleId={}", faceSampleId); + // 数据库写入失败,消费消息避免重复 ack.acknowledge(); return; } - - // 先保存人脸样本数据 - boolean saved = saveFaceSample(faceMessage, externalFaceId); - // 然后异步进行人脸识别处理(使用专用线程池) - if (saved) { - faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage)); - log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}", - externalFaceId, faceRecognitionExecutor.getActiveCount(), - faceRecognitionExecutor.getQueue().size()); - } else { - log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); + log.debug("✅ 数据库写入成功, faceSampleId={}, status=0", faceSampleId); + + // ========== 第二步: 获取账号调度器上下文 ========== + AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceMessage.getScenicId()); + if (schedulerCtx == null) { + log.error("❌ 无法获取调度器上下文, faceSampleId={}", faceSampleId); + updateFaceSampleStatusSafely(faceSampleId, -1); + ack.acknowledge(); + return; } - // 无论处理是否成功,都消费消息 + // ========== 第三步: 提交到账号专属调度器 ========== + boolean submitted = schedulerCtx.getScheduler().submit(() -> { + processFaceRecognitionAsync(faceMessage); + }); + + if (submitted) { + log.debug("✅ 任务已提交到调度器, account={}, cloudType={}, faceSampleId={}, schedulerQueue={}", + schedulerCtx.getAccountKey(), + schedulerCtx.getCloudType(), + faceSampleId, + schedulerCtx.getScheduler().getQueueSize()); + } else { + log.error("❌ 调度器队列已满, account={}, faceSampleId={}", + schedulerCtx.getAccountKey(), faceSampleId); + updateFaceSampleStatusSafely(faceSampleId, -1); + } + + // 无论成功失败,都消费消息 ack.acknowledge(); } catch (Exception e) { - log.error("处理外部人脸消息失败: {}", e.getMessage(), e); - // 即使发生异常也消费消息,避免消息堆积 + log.error("❌ 处理人脸消息异常, faceSampleId={}", faceSampleId, e); + if (faceSampleId != null) { + updateFaceSampleStatusSafely(faceSampleId, -1); + } ack.acknowledge(); } } + /** + * 根据景区获取对应的账号调度器上下文 + * 关键: 按 accessKeyId/appId 隔离,而非按云类型 + */ + private AccountSchedulerContext getSchedulerContextForScenic(Long scenicId) { + try { + // 获取景区的 adapter + IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId); + if (adapter == null) { + log.error("景区 adapter 不存在, scenicId={}", scenicId); + return null; + } + + // 提取账号信息和QPS配置 + if (adapter instanceof AliFaceBodyAdapter aliAdapter) { + AliFaceBodyConfig config = aliAdapter.getConfig(); + + if (config == null || config.getAccessKeyId() == null) { + log.error("阿里云配置为空, scenicId={}", scenicId); + return null; + } + + // 使用 accessKeyId 作为唯一标识 + String accountKey = config.getAccessKeyId(); + float qps = 2.0f; // 阿里云固定 2 QPS (AddFace操作) + + return schedulerManager.getOrCreateScheduler(accountKey, "ALI", qps); + + } else if (adapter instanceof BceFaceBodyAdapter baiduAdapter) { + BceFaceBodyConfig config = baiduAdapter.getConfig(); + + if (config == null || config.getAppId() == null) { + log.error("百度云配置为空, scenicId={}", scenicId); + return null; + } + + // 使用 appId 作为唯一标识 + String accountKey = config.getAppId(); + float qps = config.getAddQps(); // 百度云可配置 QPS + + return schedulerManager.getOrCreateScheduler(accountKey, "BAIDU", qps); + + } else { + log.error("未知的 adapter 类型: {}", adapter.getClass().getName()); + return null; + } + + } catch (Exception e) { + log.error("获取调度器上下文失败, scenicId={}", scenicId, e); + return null; + } + } + + /** + * 安全地更新人脸样本状态 + * 捕获所有异常,确保状态更新失败不影响主流程 + */ + private void updateFaceSampleStatusSafely(Long faceSampleId, Integer status) { + if (faceSampleId == null) { + return; + } + try { + faceSampleMapper.updateStatus(faceSampleId, status); + log.debug("状态更新成功: faceSampleId={}, status={}", faceSampleId, status); + } catch (Exception e) { + log.error("⚠️ 状态更新失败(非致命): faceSampleId={}, status={}", faceSampleId, status, e); + // 不抛出异常,避免影响消息消费 + } + } + /** * 保存人脸样本数据到数据库 * @param faceMessage 人脸处理消息 @@ -126,69 +226,59 @@ public class FaceProcessingKafkaService { private void processFaceRecognitionAsync(FaceProcessingMessage message) { Long faceSampleId = message.getFaceSampleId(); Long scenicId = message.getScenicId(); - String faceUrl = message.getFaceUrl(); - - // 直接使用faceSampleId作为唯一标识 - String faceUniqueId = faceSampleId.toString(); - - // 获取人脸识别适配器 - IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId); - if (faceBodyAdapter == null) { - log.error("人脸识别适配器不存在, scenicId: {}", scenicId); - updateFaceSampleStatus(faceSampleId, -1); - return; - } try { - // 更新状态为处理中 - updateFaceSampleStatus(faceSampleId, 1); + // ========== 第一步: 更新状态为"处理中" ========== + updateFaceSampleStatusSafely(faceSampleId, 1); + log.debug("开始人脸识别, faceSampleId={}, status=1", faceSampleId); - // 确保人脸数据库存在 - taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString()); + // ========== 第二步: 获取 adapter ========== + IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId); + if (adapter == null) { + log.error("adapter 不存在, scenicId={}, faceSampleId={}", scenicId, faceSampleId); + updateFaceSampleStatusSafely(faceSampleId, -1); + return; + } - // 添加人脸到识别服务(使用faceSampleId作为唯一标识) - AddFaceResp addFaceResp = faceBodyAdapter.addFace( - scenicId.toString(), - faceSampleId.toString(), - faceUrl, - faceUniqueId // 即faceSampleId.toString() + // ========== 第三步: 确保人脸数据库存在 ========== + taskFaceService.assureFaceDb(adapter, scenicId.toString()); + + // ========== 第四步: 调用 addFace (QPS已由调度器控制) ========== + String faceUniqueId = faceSampleId.toString(); + AddFaceResp addFaceResp = adapter.addFace( + scenicId.toString(), + faceSampleId.toString(), + message.getFaceUrl(), + faceUniqueId ); + // ========== 第五步: 更新识别结果 ========== if (addFaceResp != null) { - // 更新人脸样本得分和状态 + // 成功: 更新 score 和状态 faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore()); - updateFaceSampleStatus(faceSampleId, 2); - log.debug("人脸识别处理成功, faceSampleId: {}", faceSampleId); + updateFaceSampleStatusSafely(faceSampleId, 2); - // 查询设备配置,判断是否启用预订功能 + log.info("✅ 人脸识别成功, faceSampleId={}, score={}, status=2", + faceSampleId, addFaceResp.getScore()); + + // 可选: 触发预订任务 Long deviceId = message.getDeviceId(); DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); if (deviceConfig != null && - Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { + Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { DynamicTaskGenerator.addTask(faceSampleId); } + } else { - log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); - updateFaceSampleStatus(faceSampleId, -1); + // addFace 返回 null,识别失败 + log.warn("⚠️ addFace 返回 null, faceSampleId={}", faceSampleId); + updateFaceSampleStatusSafely(faceSampleId, -1); } } catch (Exception e) { - log.error("人脸识别处理失败, faceSampleId: {}, error: {}", - faceSampleId, e.getMessage(), e); - - // 标记人脸样本为处理失败状态 - updateFaceSampleStatus(faceSampleId, -1); - } - } - - /** - * 更新人脸样本状态 - */ - private void updateFaceSampleStatus(Long faceSampleId, Integer status) { - try { - faceSampleMapper.updateStatus(faceSampleId, status); - } catch (Exception e) { - log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e); + // ========== 异常处理: 更新状态为失败 ========== + log.error("❌ 人脸识别异常, faceSampleId={}", faceSampleId, e); + updateFaceSampleStatusSafely(faceSampleId, -1); } } @@ -226,14 +316,27 @@ public class FaceProcessingKafkaService { .source("retry-manual") .build(); - // 提交到线程池进行异步处理 - faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(message)); + // 获取账号调度器上下文 + AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceSample.getScenicId()); + if (schedulerCtx == null) { + log.error("无法获取调度器上下文, faceSampleId={}", faceSampleId); + return false; + } - log.info("人脸识别重试任务已提交, faceSampleId: {}, 活跃线程: {}, 队列大小: {}", - faceSampleId, faceRecognitionExecutor.getActiveCount(), - faceRecognitionExecutor.getQueue().size()); + // 提交到调度器进行异步处理 + boolean submitted = schedulerCtx.getScheduler().submit(() -> processFaceRecognitionAsync(message)); - return true; + if (submitted) { + log.info("人脸识别重试任务已提交, faceSampleId={}, account={}, cloudType={}, schedulerQueue={}", + faceSampleId, + schedulerCtx.getAccountKey(), + schedulerCtx.getCloudType(), + schedulerCtx.getScheduler().getQueueSize()); + return true; + } else { + log.error("调度器队列已满,重试任务被拒绝, faceSampleId={}", faceSampleId); + return false; + } } catch (Exception e) { log.error("提交人脸识别重试任务失败, faceSampleId: {}", faceSampleId, e);