You've already forked FrameTour-BE
Merge branch 'refs/heads/facebody_async'
Some checks are pending
ZhenTu-BE/pipeline/head Build queued...
Some checks are pending
ZhenTu-BE/pipeline/head Build queued...
This commit is contained in:
@@ -0,0 +1,50 @@
|
|||||||
|
package com.ycwl.basic.controller.monitor;
|
||||||
|
|
||||||
|
import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager;
|
||||||
|
import com.ycwl.basic.utils.ApiResponse;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 人脸识别监控接口
|
||||||
|
* 提供调度器状态查询功能
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/monitor/face-recognition")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class FaceRecognitionMonitorController {
|
||||||
|
|
||||||
|
private final AccountFaceSchedulerManager schedulerManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有账号的调度器统计信息
|
||||||
|
* <p>
|
||||||
|
* 示例返回:
|
||||||
|
* {
|
||||||
|
* "LTAI5xxx": {
|
||||||
|
* "accountKey": "LTAI5xxx",
|
||||||
|
* "cloudType": "ALI",
|
||||||
|
* "activeThreads": 3,
|
||||||
|
* "executorQueueSize": 12,
|
||||||
|
* "schedulerQueueSize": 45
|
||||||
|
* },
|
||||||
|
* "245xxx": {
|
||||||
|
* "accountKey": "245xxx",
|
||||||
|
* "cloudType": "BAIDU",
|
||||||
|
* "activeThreads": 8,
|
||||||
|
* "executorQueueSize": 5,
|
||||||
|
* "schedulerQueueSize": 20
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* @return 所有账号的调度器状态
|
||||||
|
*/
|
||||||
|
@GetMapping("/schedulers")
|
||||||
|
public ApiResponse<Map<String, AccountFaceSchedulerManager.AccountSchedulerStats>> getAllSchedulerStats() {
|
||||||
|
return ApiResponse.success(schedulerManager.getAllStats());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -45,6 +45,7 @@ public class AliFaceBodyAdapter implements IFaceBodyAdapter {
|
|||||||
private static final Map<String, IRateLimiter> deleteDbLimiters = new ConcurrentHashMap<>();
|
private static final Map<String, IRateLimiter> deleteDbLimiters = new ConcurrentHashMap<>();
|
||||||
private static final Map<String, IRateLimiter> deleteEntityLimiters = new ConcurrentHashMap<>();
|
private static final Map<String, IRateLimiter> deleteEntityLimiters = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Getter // 添加getter,支持获取accessKeyId
|
||||||
private AliFaceBodyConfig config;
|
private AliFaceBodyConfig config;
|
||||||
|
|
||||||
public boolean setConfig(AliFaceBodyConfig config) {
|
public boolean setConfig(AliFaceBodyConfig config) {
|
||||||
@@ -184,10 +185,8 @@ public class AliFaceBodyAdapter implements IFaceBodyAdapter {
|
|||||||
addFaceRequest.setImageUrl(faceUrl);
|
addFaceRequest.setImageUrl(faceUrl);
|
||||||
addFaceRequest.setExtraData(extData);
|
addFaceRequest.setExtraData(extData);
|
||||||
AddFaceResp respVo = new AddFaceResp();
|
AddFaceResp respVo = new AddFaceResp();
|
||||||
try {
|
// QPS控制已由外层调度器管理,这里不再需要限流
|
||||||
addFaceLimiter.acquire();
|
// 移除阻塞等待: addFaceLimiter.acquire()
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
AddFaceResponse acsResponse = client.getAcsResponse(addFaceRequest);
|
AddFaceResponse acsResponse = client.getAcsResponse(addFaceRequest);
|
||||||
respVo.setScore(acsResponse.getData().getQualitieScore());
|
respVo.setScore(acsResponse.getData().getQualitieScore());
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import com.ycwl.basic.facebody.entity.SearchFaceResp;
|
|||||||
import com.ycwl.basic.facebody.entity.SearchFaceResultItem;
|
import com.ycwl.basic.facebody.entity.SearchFaceResultItem;
|
||||||
import com.ycwl.basic.utils.ratelimiter.FixedRateLimiter;
|
import com.ycwl.basic.utils.ratelimiter.FixedRateLimiter;
|
||||||
import com.ycwl.basic.utils.ratelimiter.IRateLimiter;
|
import com.ycwl.basic.utils.ratelimiter.IRateLimiter;
|
||||||
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.json.JSONArray;
|
import org.json.JSONArray;
|
||||||
import org.json.JSONObject;
|
import org.json.JSONObject;
|
||||||
@@ -40,6 +41,8 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter {
|
|||||||
private static final Map<String, IRateLimiter> deleteDbLimiters = new ConcurrentHashMap<>();
|
private static final Map<String, IRateLimiter> deleteDbLimiters = new ConcurrentHashMap<>();
|
||||||
private static final Map<String, IRateLimiter> deleteEntityLimiters = new ConcurrentHashMap<>();
|
private static final Map<String, IRateLimiter> deleteEntityLimiters = new ConcurrentHashMap<>();
|
||||||
private static final Map<String, IRateLimiter> deleteFaceLimiters = new ConcurrentHashMap<>();
|
private static final Map<String, IRateLimiter> deleteFaceLimiters = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Getter // 添加getter,支持获取appId和addQps
|
||||||
private BceFaceBodyConfig config;
|
private BceFaceBodyConfig config;
|
||||||
|
|
||||||
public boolean setConfig(BceFaceBodyConfig config) {
|
public boolean setConfig(BceFaceBodyConfig config) {
|
||||||
@@ -149,10 +152,8 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter {
|
|||||||
options.put("user_info", extData);
|
options.put("user_info", extData);
|
||||||
// options.put("quality_control", "LOW");
|
// options.put("quality_control", "LOW");
|
||||||
options.put("action_type", "REPLACE");
|
options.put("action_type", "REPLACE");
|
||||||
try {
|
// QPS控制已由外层调度器管理,这里不再需要限流
|
||||||
addEntityLimiter.acquire();
|
// 移除阻塞等待: addEntityLimiter.acquire()
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
JSONObject response = client.addUser(faceUrl, "URL", dbName, entityId, options);
|
JSONObject response = client.addUser(faceUrl, "URL", dbName, entityId, options);
|
||||||
int errorCode = response.getInt("error_code");
|
int errorCode = response.getInt("error_code");
|
||||||
if (errorCode == 0) {
|
if (errorCode == 0) {
|
||||||
@@ -164,10 +165,7 @@ public class BceFaceBodyAdapter implements IFaceBodyAdapter {
|
|||||||
log.warn("无法正常访问URL图片,错误码: 222204,尝试下载图片转base64后重试,URL: {}", faceUrl);
|
log.warn("无法正常访问URL图片,错误码: 222204,尝试下载图片转base64后重试,URL: {}", faceUrl);
|
||||||
String base64Image = downloadImageAsBase64(faceUrl);
|
String base64Image = downloadImageAsBase64(faceUrl);
|
||||||
if (base64Image != null) {
|
if (base64Image != null) {
|
||||||
try {
|
// 重试时也不需要限流,由外层调度器控制
|
||||||
addEntityLimiter.acquire();
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
JSONObject retryResponse = client.addUser(base64Image, "BASE64", dbName, entityId, options);
|
JSONObject retryResponse = client.addUser(base64Image, "BASE64", dbName, entityId, options);
|
||||||
if (retryResponse.getInt("error_code") == 0) {
|
if (retryResponse.getInt("error_code") == 0) {
|
||||||
log.info("使用base64重试添加人脸成功,entityId: {}", entityId);
|
log.info("使用base64重试添加人脸成功,entityId: {}", entityId);
|
||||||
|
|||||||
@@ -1,49 +0,0 @@
|
|||||||
package com.ycwl.basic.integration.kafka.config;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 人脸识别异步处理线程池配置
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Configuration
|
|
||||||
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
|
||||||
public class FaceRecognitionThreadPoolConfig {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建人脸识别专用线程池
|
|
||||||
* - 核心线程数:128
|
|
||||||
* - 最大线程数:256
|
|
||||||
* - 队列容量:1000(避免无限制增长)
|
|
||||||
* - 拒绝策略:CallerRunsPolicy(调用者线程执行)
|
|
||||||
*/
|
|
||||||
@Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown")
|
|
||||||
public ThreadPoolExecutor faceRecognitionExecutor() {
|
|
||||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
|
||||||
128, // 核心线程数
|
|
||||||
256, // 最大线程数
|
|
||||||
10L, // 空闲线程存活时间
|
|
||||||
TimeUnit.SECONDS, // 时间单位
|
|
||||||
new LinkedBlockingQueue<>(1024), // 任务队列
|
|
||||||
r -> {
|
|
||||||
Thread thread = new Thread(r);
|
|
||||||
thread.setName("face-recognition-" + thread.getId());
|
|
||||||
thread.setDaemon(false);
|
|
||||||
return thread;
|
|
||||||
},
|
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行
|
|
||||||
);
|
|
||||||
|
|
||||||
log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000",
|
|
||||||
executor.getCorePoolSize(), executor.getMaximumPoolSize());
|
|
||||||
|
|
||||||
return executor;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,180 @@
|
|||||||
|
package com.ycwl.basic.integration.kafka.scheduler;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 账号级别的人脸识别调度器管理
|
||||||
|
* 每个账号(accessKeyId/appId)拥有独立的:
|
||||||
|
* 1. 线程池 - 资源隔离
|
||||||
|
* 2. QPS调度器 - 精确控制每个账号的QPS
|
||||||
|
* 3. 任务队列 - 独立排队
|
||||||
|
* <p>
|
||||||
|
* 核心优势:
|
||||||
|
* - 多个阿里云账号互不影响,充分利用多账号QPS优势
|
||||||
|
* - 百度云和阿里云任务完全隔离
|
||||||
|
* - 每个账号严格按自己的QPS限制调度
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class AccountFaceSchedulerManager {
|
||||||
|
|
||||||
|
// 账号 -> 调度器上下文的映射
|
||||||
|
private final ConcurrentHashMap<String, AccountSchedulerContext> schedulers = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取或创建账号的调度器上下文
|
||||||
|
*
|
||||||
|
* @param accountKey 账号唯一标识 (accessKeyId 或 appId)
|
||||||
|
* @param cloudType 云类型 ("ALI" 或 "BAIDU")
|
||||||
|
* @param qps 该账号的QPS限制
|
||||||
|
* @return 调度器上下文
|
||||||
|
*/
|
||||||
|
public AccountSchedulerContext getOrCreateScheduler(
|
||||||
|
String accountKey,
|
||||||
|
String cloudType,
|
||||||
|
float qps
|
||||||
|
) {
|
||||||
|
return schedulers.computeIfAbsent(accountKey, key -> {
|
||||||
|
log.info("创建账号调度器: accountKey={}, cloudType={}, qps={}",
|
||||||
|
accountKey, cloudType, qps);
|
||||||
|
return createSchedulerContext(accountKey, cloudType, qps);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建调度器上下文
|
||||||
|
*/
|
||||||
|
private AccountSchedulerContext createSchedulerContext(
|
||||||
|
String accountKey,
|
||||||
|
String cloudType,
|
||||||
|
float qps
|
||||||
|
) {
|
||||||
|
// 根据云类型和QPS计算线程池参数
|
||||||
|
ThreadPoolConfig poolConfig = calculateThreadPoolConfig(cloudType, qps);
|
||||||
|
|
||||||
|
// 创建独立线程池
|
||||||
|
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||||
|
poolConfig.coreSize,
|
||||||
|
poolConfig.maxSize,
|
||||||
|
60L, TimeUnit.SECONDS,
|
||||||
|
new LinkedBlockingQueue<>(poolConfig.queueCapacity),
|
||||||
|
new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat(cloudType.toLowerCase() + "-" + accountKey.substring(0, Math.min(8, accountKey.length())) + "-%d")
|
||||||
|
.build(),
|
||||||
|
new ThreadPoolExecutor.AbortPolicy() // 快速失败,避免阻塞
|
||||||
|
);
|
||||||
|
|
||||||
|
// 创建QPS调度器
|
||||||
|
QpsScheduler scheduler = new QpsScheduler(
|
||||||
|
Math.round(qps), // 每秒调度的任务数
|
||||||
|
poolConfig.maxConcurrent, // 最大并发数
|
||||||
|
executor
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("账号调度器创建成功: accountKey={}, threadPool=[core={}, max={}, queue={}], qps={}, maxConcurrent={}",
|
||||||
|
accountKey,
|
||||||
|
poolConfig.coreSize,
|
||||||
|
poolConfig.maxSize,
|
||||||
|
poolConfig.queueCapacity,
|
||||||
|
Math.round(qps),
|
||||||
|
poolConfig.maxConcurrent);
|
||||||
|
|
||||||
|
return new AccountSchedulerContext(accountKey, cloudType, executor, scheduler);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据云类型和QPS计算线程池参数
|
||||||
|
*/
|
||||||
|
private ThreadPoolConfig calculateThreadPoolConfig(String cloudType, float qps) {
|
||||||
|
// 假设每个任务平均执行时间 500ms
|
||||||
|
int avgExecutionTimeMs = 500;
|
||||||
|
|
||||||
|
// 所需线程数 = QPS × 平均执行时间(秒)
|
||||||
|
int requiredThreads = Math.max(1, (int) Math.ceil(qps * avgExecutionTimeMs / 1000.0));
|
||||||
|
|
||||||
|
// 核心线程数 = 所需线程数 × 2 (留有余量)
|
||||||
|
int coreSize = requiredThreads * 2;
|
||||||
|
|
||||||
|
// 最大线程数 = 核心线程数 × 2
|
||||||
|
int maxSize = coreSize * 2;
|
||||||
|
|
||||||
|
// 队列容量 = QPS × 60 (可容纳1分钟的任务)
|
||||||
|
int queueCapacity = Math.max(100, (int) (qps * 60));
|
||||||
|
|
||||||
|
// 最大并发数 = 所需线程数 × 1.5 (防止瞬时抖动)
|
||||||
|
int maxConcurrent = Math.max(2, (int) (requiredThreads * 1.5));
|
||||||
|
|
||||||
|
log.debug("计算线程池参数 - cloudType={}, qps={}, requiredThreads={}, coreSize={}, maxSize={}, queue={}, maxConcurrent={}",
|
||||||
|
cloudType, qps, requiredThreads, coreSize, maxSize, queueCapacity, maxConcurrent);
|
||||||
|
|
||||||
|
return new ThreadPoolConfig(coreSize, maxSize, queueCapacity, maxConcurrent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有调度器的监控信息
|
||||||
|
*/
|
||||||
|
public Map<String, AccountSchedulerStats> getAllStats() {
|
||||||
|
Map<String, AccountSchedulerStats> stats = new HashMap<>();
|
||||||
|
schedulers.forEach((key, ctx) -> {
|
||||||
|
stats.put(key, new AccountSchedulerStats(
|
||||||
|
ctx.getAccountKey(),
|
||||||
|
ctx.getCloudType(),
|
||||||
|
ctx.getExecutor().getActiveCount(),
|
||||||
|
ctx.getExecutor().getQueue().size(),
|
||||||
|
ctx.getScheduler().getQueueSize()
|
||||||
|
));
|
||||||
|
});
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭所有调度器 (应用关闭时调用)
|
||||||
|
*/
|
||||||
|
public void shutdownAll() {
|
||||||
|
log.info("关闭所有账号调度器, total={}", schedulers.size());
|
||||||
|
schedulers.forEach((key, ctx) -> {
|
||||||
|
try {
|
||||||
|
ctx.getScheduler().shutdown();
|
||||||
|
ctx.getExecutor().shutdown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("关闭调度器失败, accountKey={}", key, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线程池配置
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
static class ThreadPoolConfig {
|
||||||
|
int coreSize;
|
||||||
|
int maxSize;
|
||||||
|
int queueCapacity;
|
||||||
|
int maxConcurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 账号调度器统计信息
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
public static class AccountSchedulerStats {
|
||||||
|
String accountKey;
|
||||||
|
String cloudType;
|
||||||
|
int activeThreads;
|
||||||
|
int executorQueueSize;
|
||||||
|
int schedulerQueueSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package com.ycwl.basic.integration.kafka.scheduler;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 账号调度器上下文
|
||||||
|
* 封装每个账号的线程池和QPS调度器
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class AccountSchedulerContext {
|
||||||
|
/**
|
||||||
|
* 账号唯一标识 (accessKeyId 或 appId)
|
||||||
|
*/
|
||||||
|
private String accountKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 云类型 ("ALI" 或 "BAIDU")
|
||||||
|
*/
|
||||||
|
private String cloudType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 该账号专属的线程池
|
||||||
|
*/
|
||||||
|
private ThreadPoolExecutor executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 该账号专属的QPS调度器
|
||||||
|
*/
|
||||||
|
private QpsScheduler scheduler;
|
||||||
|
}
|
||||||
@@ -0,0 +1,114 @@
|
|||||||
|
package com.ycwl.basic.integration.kafka.scheduler;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* QPS 调度器
|
||||||
|
* 定期从队列取任务,严格控制 QPS
|
||||||
|
* 每秒调度固定数量的任务,确保不超过云端 API 的 QPS 限制
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class QpsScheduler {
|
||||||
|
private final BlockingQueue<Runnable> taskQueue;
|
||||||
|
private final ThreadPoolExecutor workerPool;
|
||||||
|
private final ScheduledExecutorService scheduler;
|
||||||
|
private final int qps;
|
||||||
|
private final Semaphore concurrentLimiter; // 并发数限制,防止瞬时抖动
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建 QPS 调度器
|
||||||
|
*
|
||||||
|
* @param qps 每秒允许的最大请求数
|
||||||
|
* @param maxConcurrent 最大并发数
|
||||||
|
* @param workerPool 工作线程池
|
||||||
|
*/
|
||||||
|
public QpsScheduler(int qps, int maxConcurrent, ThreadPoolExecutor workerPool) {
|
||||||
|
this.qps = qps;
|
||||||
|
this.taskQueue = new LinkedBlockingQueue<>();
|
||||||
|
this.workerPool = workerPool;
|
||||||
|
this.scheduler = new ScheduledThreadPoolExecutor(1, r -> {
|
||||||
|
Thread thread = new Thread(r);
|
||||||
|
thread.setName("qps-scheduler-" + workerPool.getThreadFactory().newThread(() -> {}).getName());
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
});
|
||||||
|
this.concurrentLimiter = new Semaphore(maxConcurrent);
|
||||||
|
|
||||||
|
// 每秒调度一次,取 qps 个任务
|
||||||
|
scheduler.scheduleAtFixedRate(this::dispatch, 0, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
log.info("QPS调度器已启动: qps={}, maxConcurrent={}", qps, maxConcurrent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 调度任务
|
||||||
|
* 每秒执行一次,从队列中取出 qps 个任务提交到工作线程池
|
||||||
|
*/
|
||||||
|
private void dispatch() {
|
||||||
|
int dispatched = 0;
|
||||||
|
for (int i = 0; i < qps; i++) {
|
||||||
|
Runnable task = taskQueue.poll();
|
||||||
|
if (task == null) {
|
||||||
|
break; // 队列为空,结束本次调度
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查并发数限制
|
||||||
|
if (concurrentLimiter.tryAcquire()) {
|
||||||
|
try {
|
||||||
|
workerPool.execute(() -> {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("任务执行失败", e);
|
||||||
|
} finally {
|
||||||
|
concurrentLimiter.release();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
dispatched++;
|
||||||
|
} catch (RejectedExecutionException e) {
|
||||||
|
// 线程池拒绝,释放并发许可,任务丢弃
|
||||||
|
concurrentLimiter.release();
|
||||||
|
log.warn("任务被线程池拒绝", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 并发数已满,任务放回队列,等待下次调度
|
||||||
|
taskQueue.offer(task);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dispatched > 0 || taskQueue.size() > 0) {
|
||||||
|
log.debug("QPS调度完成: dispatched={}, remainQueue={}, availableConcurrent={}",
|
||||||
|
dispatched, taskQueue.size(), concurrentLimiter.availablePermits());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 提交任务到调度队列
|
||||||
|
*
|
||||||
|
* @param task 待执行的任务
|
||||||
|
* @return 是否成功提交
|
||||||
|
*/
|
||||||
|
public boolean submit(Runnable task) {
|
||||||
|
return taskQueue.offer(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取队列中等待调度的任务数量
|
||||||
|
*
|
||||||
|
* @return 队列大小
|
||||||
|
*/
|
||||||
|
public int getQueueSize() {
|
||||||
|
return taskQueue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭调度器
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
scheduler.shutdown();
|
||||||
|
log.info("QPS调度器已关闭, qps={}", qps);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,17 +1,21 @@
|
|||||||
package com.ycwl.basic.integration.kafka.service;
|
package com.ycwl.basic.integration.kafka.service;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.ycwl.basic.facebody.adapter.AliFaceBodyAdapter;
|
||||||
|
import com.ycwl.basic.facebody.adapter.BceFaceBodyAdapter;
|
||||||
import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
|
import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
|
||||||
import com.ycwl.basic.facebody.entity.AddFaceResp;
|
import com.ycwl.basic.facebody.entity.AddFaceResp;
|
||||||
|
import com.ycwl.basic.facebody.entity.AliFaceBodyConfig;
|
||||||
|
import com.ycwl.basic.facebody.entity.BceFaceBodyConfig;
|
||||||
|
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
|
||||||
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
|
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
|
||||||
|
import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager;
|
||||||
|
import com.ycwl.basic.integration.kafka.scheduler.AccountSchedulerContext;
|
||||||
import com.ycwl.basic.mapper.FaceSampleMapper;
|
import com.ycwl.basic.mapper.FaceSampleMapper;
|
||||||
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
|
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
|
||||||
import com.ycwl.basic.repository.DeviceRepository;
|
import com.ycwl.basic.repository.DeviceRepository;
|
||||||
import com.ycwl.basic.service.pc.ScenicService;
|
import com.ycwl.basic.service.pc.ScenicService;
|
||||||
import com.ycwl.basic.service.task.TaskFaceService;
|
import com.ycwl.basic.service.task.TaskFaceService;
|
||||||
import com.ycwl.basic.task.DynamicTaskGenerator;
|
import com.ycwl.basic.task.DynamicTaskGenerator;
|
||||||
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
|
|
||||||
// 不再需要SnowFlakeUtil,使用外部传入的ID
|
|
||||||
import com.ycwl.basic.utils.JacksonUtil;
|
import com.ycwl.basic.utils.JacksonUtil;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -20,13 +24,16 @@ import org.springframework.kafka.annotation.KafkaListener;
|
|||||||
import org.springframework.kafka.support.Acknowledgment;
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 人脸处理Kafka消费服务
|
* 人脸处理Kafka消费服务
|
||||||
* 消费外部系统发送到zt-face topic的消息
|
* 消费外部系统发送到zt-face topic的消息
|
||||||
|
* <p>
|
||||||
|
* 核心改进:
|
||||||
|
* 1. 按账号(accessKeyId/appId)隔离线程池和QPS调度器
|
||||||
|
* 2. 确保数据库优先写入,状态流转清晰
|
||||||
|
* 3. 严格QPS控制,线程资源高效利用
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@@ -40,51 +47,144 @@ public class FaceProcessingKafkaService {
|
|||||||
private final TaskFaceService taskFaceService;
|
private final TaskFaceService taskFaceService;
|
||||||
private final ScenicService scenicService;
|
private final ScenicService scenicService;
|
||||||
private final DeviceRepository deviceRepository;
|
private final DeviceRepository deviceRepository;
|
||||||
private final ThreadPoolExecutor faceRecognitionExecutor;
|
private final AccountFaceSchedulerManager schedulerManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消费外部系统发送的人脸处理消息
|
* 消费外部系统发送的人脸处理消息
|
||||||
* 先保存人脸样本数据,再进行异步人脸识别处理
|
* 核心流程:
|
||||||
|
* 1. 同步写入数据库 (最高优先级)
|
||||||
|
* 2. 获取账号调度器上下文
|
||||||
|
* 3. 提交到账号专属调度器队列
|
||||||
*/
|
*/
|
||||||
@KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
|
@KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
|
||||||
public void processFaceMessage(String message, Acknowledgment ack) {
|
public void processFaceMessage(String message, Acknowledgment ack) {
|
||||||
|
Long faceSampleId = null;
|
||||||
try {
|
try {
|
||||||
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
|
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
|
||||||
log.debug("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
|
faceSampleId = faceMessage.getFaceSampleId();
|
||||||
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
|
||||||
|
|
||||||
// 使用外部传入的faceSampleId
|
log.debug("接收人脸消息: scenicId={}, deviceId={}, faceSampleId={}",
|
||||||
Long externalFaceId = faceMessage.getFaceSampleId();
|
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceSampleId);
|
||||||
if (externalFaceId == null) {
|
|
||||||
log.error("外部消息中未包含faceSampleId");
|
// ========== 第一步: 同步写入数据库 (最高优先级) ==========
|
||||||
// 即使消息格式错误,也消费消息避免重复处理
|
boolean saved = saveFaceSample(faceMessage, faceSampleId);
|
||||||
|
if (!saved) {
|
||||||
|
log.error("❌ 数据库写入失败, 不提交识别任务, faceSampleId={}", faceSampleId);
|
||||||
|
// 数据库写入失败,消费消息避免重复
|
||||||
ack.acknowledge();
|
ack.acknowledge();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 先保存人脸样本数据
|
|
||||||
boolean saved = saveFaceSample(faceMessage, externalFaceId);
|
|
||||||
|
|
||||||
// 然后异步进行人脸识别处理(使用专用线程池)
|
log.debug("✅ 数据库写入成功, faceSampleId={}, status=0", faceSampleId);
|
||||||
if (saved) {
|
|
||||||
faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage));
|
// ========== 第二步: 获取账号调度器上下文 ==========
|
||||||
log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}",
|
AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceMessage.getScenicId());
|
||||||
externalFaceId, faceRecognitionExecutor.getActiveCount(),
|
if (schedulerCtx == null) {
|
||||||
faceRecognitionExecutor.getQueue().size());
|
log.error("❌ 无法获取调度器上下文, faceSampleId={}", faceSampleId);
|
||||||
} else {
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
ack.acknowledge();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 无论处理是否成功,都消费消息
|
// ========== 第三步: 提交到账号专属调度器 ==========
|
||||||
|
boolean submitted = schedulerCtx.getScheduler().submit(() -> {
|
||||||
|
processFaceRecognitionAsync(faceMessage);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (submitted) {
|
||||||
|
log.debug("✅ 任务已提交到调度器, account={}, cloudType={}, faceSampleId={}, schedulerQueue={}",
|
||||||
|
schedulerCtx.getAccountKey(),
|
||||||
|
schedulerCtx.getCloudType(),
|
||||||
|
faceSampleId,
|
||||||
|
schedulerCtx.getScheduler().getQueueSize());
|
||||||
|
} else {
|
||||||
|
log.error("❌ 调度器队列已满, account={}, faceSampleId={}",
|
||||||
|
schedulerCtx.getAccountKey(), faceSampleId);
|
||||||
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 无论成功失败,都消费消息
|
||||||
ack.acknowledge();
|
ack.acknowledge();
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
log.error("❌ 处理人脸消息异常, faceSampleId={}", faceSampleId, e);
|
||||||
// 即使发生异常也消费消息,避免消息堆积
|
if (faceSampleId != null) {
|
||||||
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
|
}
|
||||||
ack.acknowledge();
|
ack.acknowledge();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据景区获取对应的账号调度器上下文
|
||||||
|
* 关键: 按 accessKeyId/appId 隔离,而非按云类型
|
||||||
|
*/
|
||||||
|
private AccountSchedulerContext getSchedulerContextForScenic(Long scenicId) {
|
||||||
|
try {
|
||||||
|
// 获取景区的 adapter
|
||||||
|
IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId);
|
||||||
|
if (adapter == null) {
|
||||||
|
log.error("景区 adapter 不存在, scenicId={}", scenicId);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 提取账号信息和QPS配置
|
||||||
|
if (adapter instanceof AliFaceBodyAdapter aliAdapter) {
|
||||||
|
AliFaceBodyConfig config = aliAdapter.getConfig();
|
||||||
|
|
||||||
|
if (config == null || config.getAccessKeyId() == null) {
|
||||||
|
log.error("阿里云配置为空, scenicId={}", scenicId);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用 accessKeyId 作为唯一标识
|
||||||
|
String accountKey = config.getAccessKeyId();
|
||||||
|
float qps = 2.0f; // 阿里云固定 2 QPS (AddFace操作)
|
||||||
|
|
||||||
|
return schedulerManager.getOrCreateScheduler(accountKey, "ALI", qps);
|
||||||
|
|
||||||
|
} else if (adapter instanceof BceFaceBodyAdapter baiduAdapter) {
|
||||||
|
BceFaceBodyConfig config = baiduAdapter.getConfig();
|
||||||
|
|
||||||
|
if (config == null || config.getAppId() == null) {
|
||||||
|
log.error("百度云配置为空, scenicId={}", scenicId);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用 appId 作为唯一标识
|
||||||
|
String accountKey = config.getAppId();
|
||||||
|
float qps = config.getAddQps(); // 百度云可配置 QPS
|
||||||
|
|
||||||
|
return schedulerManager.getOrCreateScheduler(accountKey, "BAIDU", qps);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.error("未知的 adapter 类型: {}", adapter.getClass().getName());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("获取调度器上下文失败, scenicId={}", scenicId, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 安全地更新人脸样本状态
|
||||||
|
* 捕获所有异常,确保状态更新失败不影响主流程
|
||||||
|
*/
|
||||||
|
private void updateFaceSampleStatusSafely(Long faceSampleId, Integer status) {
|
||||||
|
if (faceSampleId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
faceSampleMapper.updateStatus(faceSampleId, status);
|
||||||
|
log.debug("状态更新成功: faceSampleId={}, status={}", faceSampleId, status);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("⚠️ 状态更新失败(非致命): faceSampleId={}, status={}", faceSampleId, status, e);
|
||||||
|
// 不抛出异常,避免影响消息消费
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 保存人脸样本数据到数据库
|
* 保存人脸样本数据到数据库
|
||||||
* @param faceMessage 人脸处理消息
|
* @param faceMessage 人脸处理消息
|
||||||
@@ -126,69 +226,59 @@ public class FaceProcessingKafkaService {
|
|||||||
private void processFaceRecognitionAsync(FaceProcessingMessage message) {
|
private void processFaceRecognitionAsync(FaceProcessingMessage message) {
|
||||||
Long faceSampleId = message.getFaceSampleId();
|
Long faceSampleId = message.getFaceSampleId();
|
||||||
Long scenicId = message.getScenicId();
|
Long scenicId = message.getScenicId();
|
||||||
String faceUrl = message.getFaceUrl();
|
|
||||||
|
|
||||||
// 直接使用faceSampleId作为唯一标识
|
|
||||||
String faceUniqueId = faceSampleId.toString();
|
|
||||||
|
|
||||||
// 获取人脸识别适配器
|
|
||||||
IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId);
|
|
||||||
if (faceBodyAdapter == null) {
|
|
||||||
log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
|
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 更新状态为处理中
|
// ========== 第一步: 更新状态为"处理中" ==========
|
||||||
updateFaceSampleStatus(faceSampleId, 1);
|
updateFaceSampleStatusSafely(faceSampleId, 1);
|
||||||
|
log.debug("开始人脸识别, faceSampleId={}, status=1", faceSampleId);
|
||||||
|
|
||||||
// 确保人脸数据库存在
|
// ========== 第二步: 获取 adapter ==========
|
||||||
taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
|
IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId);
|
||||||
|
if (adapter == null) {
|
||||||
|
log.error("adapter 不存在, scenicId={}, faceSampleId={}", scenicId, faceSampleId);
|
||||||
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 添加人脸到识别服务(使用faceSampleId作为唯一标识)
|
// ========== 第三步: 确保人脸数据库存在 ==========
|
||||||
AddFaceResp addFaceResp = faceBodyAdapter.addFace(
|
taskFaceService.assureFaceDb(adapter, scenicId.toString());
|
||||||
scenicId.toString(),
|
|
||||||
faceSampleId.toString(),
|
// ========== 第四步: 调用 addFace (QPS已由调度器控制) ==========
|
||||||
faceUrl,
|
String faceUniqueId = faceSampleId.toString();
|
||||||
faceUniqueId // 即faceSampleId.toString()
|
AddFaceResp addFaceResp = adapter.addFace(
|
||||||
|
scenicId.toString(),
|
||||||
|
faceSampleId.toString(),
|
||||||
|
message.getFaceUrl(),
|
||||||
|
faceUniqueId
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// ========== 第五步: 更新识别结果 ==========
|
||||||
if (addFaceResp != null) {
|
if (addFaceResp != null) {
|
||||||
// 更新人脸样本得分和状态
|
// 成功: 更新 score 和状态
|
||||||
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
|
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
|
||||||
updateFaceSampleStatus(faceSampleId, 2);
|
updateFaceSampleStatusSafely(faceSampleId, 2);
|
||||||
log.debug("人脸识别处理成功, faceSampleId: {}", faceSampleId);
|
|
||||||
|
|
||||||
// 查询设备配置,判断是否启用预订功能
|
log.info("✅ 人脸识别成功, faceSampleId={}, score={}, status=2",
|
||||||
|
faceSampleId, addFaceResp.getScore());
|
||||||
|
|
||||||
|
// 可选: 触发预订任务
|
||||||
Long deviceId = message.getDeviceId();
|
Long deviceId = message.getDeviceId();
|
||||||
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
||||||
if (deviceConfig != null &&
|
if (deviceConfig != null &&
|
||||||
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
|
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
|
||||||
DynamicTaskGenerator.addTask(faceSampleId);
|
DynamicTaskGenerator.addTask(faceSampleId);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
|
// addFace 返回 null,识别失败
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
log.warn("⚠️ addFace 返回 null, faceSampleId={}", faceSampleId);
|
||||||
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("人脸识别处理失败, faceSampleId: {}, error: {}",
|
// ========== 异常处理: 更新状态为失败 ==========
|
||||||
faceSampleId, e.getMessage(), e);
|
log.error("❌ 人脸识别异常, faceSampleId={}", faceSampleId, e);
|
||||||
|
updateFaceSampleStatusSafely(faceSampleId, -1);
|
||||||
// 标记人脸样本为处理失败状态
|
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 更新人脸样本状态
|
|
||||||
*/
|
|
||||||
private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
|
|
||||||
try {
|
|
||||||
faceSampleMapper.updateStatus(faceSampleId, status);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -226,14 +316,27 @@ public class FaceProcessingKafkaService {
|
|||||||
.source("retry-manual")
|
.source("retry-manual")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// 提交到线程池进行异步处理
|
// 获取账号调度器上下文
|
||||||
faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(message));
|
AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceSample.getScenicId());
|
||||||
|
if (schedulerCtx == null) {
|
||||||
|
log.error("无法获取调度器上下文, faceSampleId={}", faceSampleId);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
log.info("人脸识别重试任务已提交, faceSampleId: {}, 活跃线程: {}, 队列大小: {}",
|
// 提交到调度器进行异步处理
|
||||||
faceSampleId, faceRecognitionExecutor.getActiveCount(),
|
boolean submitted = schedulerCtx.getScheduler().submit(() -> processFaceRecognitionAsync(message));
|
||||||
faceRecognitionExecutor.getQueue().size());
|
|
||||||
|
|
||||||
return true;
|
if (submitted) {
|
||||||
|
log.info("人脸识别重试任务已提交, faceSampleId={}, account={}, cloudType={}, schedulerQueue={}",
|
||||||
|
faceSampleId,
|
||||||
|
schedulerCtx.getAccountKey(),
|
||||||
|
schedulerCtx.getCloudType(),
|
||||||
|
schedulerCtx.getScheduler().getQueueSize());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
log.error("调度器队列已满,重试任务被拒绝, faceSampleId={}", faceSampleId);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("提交人脸识别重试任务失败, faceSampleId: {}", faceSampleId, e);
|
log.error("提交人脸识别重试任务失败, faceSampleId: {}", faceSampleId, e);
|
||||||
|
|||||||
Reference in New Issue
Block a user