feat(face): 实现账号级人脸识别调度器

- 新增账号级别调度器管理器,支持多账号QPS隔离控制
- 为阿里云和百度云适配器添加配置getter方法
- 移除原有阻塞式限流逻辑,交由外层调度器统一管控
- 创建QPS调度器实现精确的任务频率控制
- 新增监控接口用于查询各账号调度器运行状态
- 重构人脸识别Kafka消费服务,集成账号调度机制
- 优化线程池资源配置,提升多账号并发处理效率
- 增强错误处理与状态更新的安全性
- 删除旧版全局线程池配置类
- 完善任务提交与状态流转的日志记录
This commit is contained in:
2025-11-29 23:50:24 +08:00
parent 1de760fc87
commit b92568b842
8 changed files with 569 additions and 140 deletions

View File

@@ -0,0 +1,180 @@
package com.ycwl.basic.integration.kafka.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 账号级别的人脸识别调度器管理
* 每个账号(accessKeyId/appId)拥有独立的:
* 1. 线程池 - 资源隔离
* 2. QPS调度器 - 精确控制每个账号的QPS
* 3. 任务队列 - 独立排队
* <p>
* 核心优势:
* - 多个阿里云账号互不影响,充分利用多账号QPS优势
* - 百度云和阿里云任务完全隔离
* - 每个账号严格按自己的QPS限制调度
*/
@Slf4j
@Component
public class AccountFaceSchedulerManager {
// 账号 -> 调度器上下文的映射
private final ConcurrentHashMap<String, AccountSchedulerContext> schedulers = new ConcurrentHashMap<>();
/**
* 获取或创建账号的调度器上下文
*
* @param accountKey 账号唯一标识 (accessKeyId 或 appId)
* @param cloudType 云类型 ("ALI" 或 "BAIDU")
* @param qps 该账号的QPS限制
* @return 调度器上下文
*/
public AccountSchedulerContext getOrCreateScheduler(
String accountKey,
String cloudType,
float qps
) {
return schedulers.computeIfAbsent(accountKey, key -> {
log.info("创建账号调度器: accountKey={}, cloudType={}, qps={}",
accountKey, cloudType, qps);
return createSchedulerContext(accountKey, cloudType, qps);
});
}
/**
* 创建调度器上下文
*/
private AccountSchedulerContext createSchedulerContext(
String accountKey,
String cloudType,
float qps
) {
// 根据云类型和QPS计算线程池参数
ThreadPoolConfig poolConfig = calculateThreadPoolConfig(cloudType, qps);
// 创建独立线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
poolConfig.coreSize,
poolConfig.maxSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(poolConfig.queueCapacity),
new ThreadFactoryBuilder()
.setNameFormat(cloudType.toLowerCase() + "-" + accountKey.substring(0, Math.min(8, accountKey.length())) + "-%d")
.build(),
new ThreadPoolExecutor.AbortPolicy() // 快速失败,避免阻塞
);
// 创建QPS调度器
QpsScheduler scheduler = new QpsScheduler(
Math.round(qps), // 每秒调度的任务数
poolConfig.maxConcurrent, // 最大并发数
executor
);
log.info("账号调度器创建成功: accountKey={}, threadPool=[core={}, max={}, queue={}], qps={}, maxConcurrent={}",
accountKey,
poolConfig.coreSize,
poolConfig.maxSize,
poolConfig.queueCapacity,
Math.round(qps),
poolConfig.maxConcurrent);
return new AccountSchedulerContext(accountKey, cloudType, executor, scheduler);
}
/**
* 根据云类型和QPS计算线程池参数
*/
private ThreadPoolConfig calculateThreadPoolConfig(String cloudType, float qps) {
// 假设每个任务平均执行时间 500ms
int avgExecutionTimeMs = 500;
// 所需线程数 = QPS × 平均执行时间(秒)
int requiredThreads = Math.max(1, (int) Math.ceil(qps * avgExecutionTimeMs / 1000.0));
// 核心线程数 = 所需线程数 × 2 (留有余量)
int coreSize = requiredThreads * 2;
// 最大线程数 = 核心线程数 × 2
int maxSize = coreSize * 2;
// 队列容量 = QPS × 60 (可容纳1分钟的任务)
int queueCapacity = Math.max(100, (int) (qps * 60));
// 最大并发数 = 所需线程数 × 1.5 (防止瞬时抖动)
int maxConcurrent = Math.max(2, (int) (requiredThreads * 1.5));
log.debug("计算线程池参数 - cloudType={}, qps={}, requiredThreads={}, coreSize={}, maxSize={}, queue={}, maxConcurrent={}",
cloudType, qps, requiredThreads, coreSize, maxSize, queueCapacity, maxConcurrent);
return new ThreadPoolConfig(coreSize, maxSize, queueCapacity, maxConcurrent);
}
/**
* 获取所有调度器的监控信息
*/
public Map<String, AccountSchedulerStats> getAllStats() {
Map<String, AccountSchedulerStats> stats = new HashMap<>();
schedulers.forEach((key, ctx) -> {
stats.put(key, new AccountSchedulerStats(
ctx.getAccountKey(),
ctx.getCloudType(),
ctx.getExecutor().getActiveCount(),
ctx.getExecutor().getQueue().size(),
ctx.getScheduler().getQueueSize()
));
});
return stats;
}
/**
* 关闭所有调度器 (应用关闭时调用)
*/
public void shutdownAll() {
log.info("关闭所有账号调度器, total={}", schedulers.size());
schedulers.forEach((key, ctx) -> {
try {
ctx.getScheduler().shutdown();
ctx.getExecutor().shutdown();
} catch (Exception e) {
log.error("关闭调度器失败, accountKey={}", key, e);
}
});
}
/**
* 线程池配置
*/
@Data
@AllArgsConstructor
static class ThreadPoolConfig {
int coreSize;
int maxSize;
int queueCapacity;
int maxConcurrent;
}
/**
* 账号调度器统计信息
*/
@Data
@AllArgsConstructor
public static class AccountSchedulerStats {
String accountKey;
String cloudType;
int activeThreads;
int executorQueueSize;
int schedulerQueueSize;
}
}

View File

@@ -0,0 +1,34 @@
package com.ycwl.basic.integration.kafka.scheduler;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 账号调度器上下文
* 封装每个账号的线程池和QPS调度器
*/
@Data
@AllArgsConstructor
public class AccountSchedulerContext {
/**
* 账号唯一标识 (accessKeyId 或 appId)
*/
private String accountKey;
/**
* 云类型 ("ALI" 或 "BAIDU")
*/
private String cloudType;
/**
* 该账号专属的线程池
*/
private ThreadPoolExecutor executor;
/**
* 该账号专属的QPS调度器
*/
private QpsScheduler scheduler;
}

View File

@@ -0,0 +1,114 @@
package com.ycwl.basic.integration.kafka.scheduler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* QPS 调度器
* 定期从队列取任务,严格控制 QPS
* 每秒调度固定数量的任务,确保不超过云端 API 的 QPS 限制
*/
@Slf4j
public class QpsScheduler {
private final BlockingQueue<Runnable> taskQueue;
private final ThreadPoolExecutor workerPool;
private final ScheduledExecutorService scheduler;
private final int qps;
private final Semaphore concurrentLimiter; // 并发数限制,防止瞬时抖动
/**
* 创建 QPS 调度器
*
* @param qps 每秒允许的最大请求数
* @param maxConcurrent 最大并发数
* @param workerPool 工作线程池
*/
public QpsScheduler(int qps, int maxConcurrent, ThreadPoolExecutor workerPool) {
this.qps = qps;
this.taskQueue = new LinkedBlockingQueue<>();
this.workerPool = workerPool;
this.scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = new Thread(r);
thread.setName("qps-scheduler-" + workerPool.getThreadFactory().newThread(() -> {}).getName());
thread.setDaemon(true);
return thread;
});
this.concurrentLimiter = new Semaphore(maxConcurrent);
// 每秒调度一次,取 qps 个任务
scheduler.scheduleAtFixedRate(this::dispatch, 0, 1, TimeUnit.SECONDS);
log.info("QPS调度器已启动: qps={}, maxConcurrent={}", qps, maxConcurrent);
}
/**
* 调度任务
* 每秒执行一次,从队列中取出 qps 个任务提交到工作线程池
*/
private void dispatch() {
int dispatched = 0;
for (int i = 0; i < qps; i++) {
Runnable task = taskQueue.poll();
if (task == null) {
break; // 队列为空,结束本次调度
}
// 检查并发数限制
if (concurrentLimiter.tryAcquire()) {
try {
workerPool.execute(() -> {
try {
task.run();
} catch (Exception e) {
log.error("任务执行失败", e);
} finally {
concurrentLimiter.release();
}
});
dispatched++;
} catch (RejectedExecutionException e) {
// 线程池拒绝,释放并发许可,任务丢弃
concurrentLimiter.release();
log.warn("任务被线程池拒绝", e);
}
} else {
// 并发数已满,任务放回队列,等待下次调度
taskQueue.offer(task);
break;
}
}
if (dispatched > 0 || taskQueue.size() > 0) {
log.debug("QPS调度完成: dispatched={}, remainQueue={}, availableConcurrent={}",
dispatched, taskQueue.size(), concurrentLimiter.availablePermits());
}
}
/**
* 提交任务到调度队列
*
* @param task 待执行的任务
* @return 是否成功提交
*/
public boolean submit(Runnable task) {
return taskQueue.offer(task);
}
/**
* 获取队列中等待调度的任务数量
*
* @return 队列大小
*/
public int getQueueSize() {
return taskQueue.size();
}
/**
* 关闭调度器
*/
public void shutdown() {
scheduler.shutdown();
log.info("QPS调度器已关闭, qps={}", qps);
}
}