Compare commits

...

3 Commits

Author SHA1 Message Date
1727619b29 refactor(kafka): 将人脸识别处理改为异步执行- 引入CompletableFuture实现异步处理
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 修改processFaceRecognition方法为异步版本
- 移除原同步处理中的try-catch块
- 更新方法返回类型从boolean改为void-保留处理成功和失败的状态更新逻辑- 添加异步处理成功后的日志记录
2025-10-04 10:12:37 +08:00
3099e68a97 refactor(logging): 调整人脸处理服务中的日志级别
- 将接收到人脸消息的日志级别从 info 调整为 debug
- 移除了部分冗余的 info 级别日志输出
- 统一异常处理中的日志记录方式
-优化日志内容,减少不必要的信息输出
- 确保关键操作仍然保留适当日志记录- 提升系统在高并发下的日志可读性与性能
2025-10-03 13:46:22 +08:00
db86c82bc8 refactor(task):优化视频片段获取逻辑并增强日志记录
- 移除任务执行前的空列表检查,统一通过VideoPieceGetter.addTask处理
- 增强Placeholder初始化阶段的日志输出,区分有无templateId情况- 细化计数器递减过程中的日志信息,记录设备关联及剩余数量
- 完善进度检查时的日志内容,增加已完成与未完成的统计显示- 补充Callback调用条件判断,避免重复触发并记录调用状态
- 添加兜底逻辑中对Callback是否已触发的判断和相应日志提示
2025-10-01 22:01:34 +08:00
5 changed files with 112 additions and 54 deletions

View File

@@ -0,0 +1,49 @@
package com.ycwl.basic.integration.kafka.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 人脸识别异步处理线程池配置
*/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class FaceRecognitionThreadPoolConfig {
/**
* 创建人脸识别专用线程池
* - 核心线程数:32
* - 最大线程数:128
* - 队列容量:1000(避免无限制增长)
* - 拒绝策略:CallerRunsPolicy(调用者线程执行)
*/
@Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor faceRecognitionExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
32, // 核心线程数
128, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(1000), // 任务队列
r -> {
Thread thread = new Thread(r);
thread.setName("face-recognition-" + thread.getId());
thread.setDaemon(false);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行
);
log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000",
executor.getCorePoolSize(), executor.getMaximumPoolSize());
return executor;
}
}

View File

@@ -22,6 +22,7 @@ import org.springframework.stereotype.Service;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* 人脸处理Kafka消费服务 * 人脸处理Kafka消费服务
@@ -39,6 +40,7 @@ public class FaceProcessingKafkaService {
private final TaskFaceService taskFaceService; private final TaskFaceService taskFaceService;
private final ScenicService scenicService; private final ScenicService scenicService;
private final DeviceRepository deviceRepository; private final DeviceRepository deviceRepository;
private final ThreadPoolExecutor faceRecognitionExecutor;
/** /**
* 消费外部系统发送的人脸处理消息 * 消费外部系统发送的人脸处理消息
@@ -48,7 +50,7 @@ public class FaceProcessingKafkaService {
public void processFaceMessage(String message, Acknowledgment ack) { public void processFaceMessage(String message, Acknowledgment ack) {
try { try {
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class); FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}", log.debug("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl()); faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
// 使用外部传入的faceSampleId // 使用外部传入的faceSampleId
@@ -63,31 +65,23 @@ public class FaceProcessingKafkaService {
// 先保存人脸样本数据 // 先保存人脸样本数据
boolean saved = saveFaceSample(faceMessage, externalFaceId); boolean saved = saveFaceSample(faceMessage, externalFaceId);
// 然后进行人脸识别处理 // 然后异步进行人脸识别处理(使用专用线程池)
if (saved) { if (saved) {
try { faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage));
boolean processed = processFaceRecognition(faceMessage); log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}",
if (processed) { externalFaceId, faceRecognitionExecutor.getActiveCount(),
log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId); faceRecognitionExecutor.getQueue().size());
} else {
log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
}
} catch (Exception e) {
log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e);
}
} else { } else {
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId); log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
} }
// 无论处理是否成功,都消费消息 // 无论处理是否成功,都消费消息
ack.acknowledge(); ack.acknowledge();
log.info("消息已消费, faceSampleId: {}", externalFaceId);
} catch (Exception e) { } catch (Exception e) {
log.error("处理外部人脸消息失败: {}", e.getMessage(), e); log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
// 即使发生异常也消费消息,避免消息堆积 // 即使发生异常也消费消息,避免消息堆积
ack.acknowledge(); ack.acknowledge();
log.info("异常情况下消息已消费,避免重复处理");
} }
} }
@@ -116,8 +110,6 @@ public class FaceProcessingKafkaService {
// 保存到数据库 // 保存到数据库
faceSampleMapper.add(faceSample); faceSampleMapper.add(faceSample);
log.info("人脸样本数据已保存, 使用外部faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}",
externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@@ -128,10 +120,10 @@ public class FaceProcessingKafkaService {
} }
/** /**
* 执行人脸识别处理逻辑 * 异步执行人脸识别处理逻辑
* 对已保存的人脸样本进行识别处理 * 对已保存的人脸样本进行识别处理
*/ */
private boolean processFaceRecognition(FaceProcessingMessage message) { private void processFaceRecognitionAsync(FaceProcessingMessage message) {
Long faceSampleId = message.getFaceSampleId(); Long faceSampleId = message.getFaceSampleId();
Long scenicId = message.getScenicId(); Long scenicId = message.getScenicId();
String faceUrl = message.getFaceUrl(); String faceUrl = message.getFaceUrl();
@@ -144,7 +136,7 @@ public class FaceProcessingKafkaService {
if (faceBodyAdapter == null) { if (faceBodyAdapter == null) {
log.error("人脸识别适配器不存在, scenicId: {}", scenicId); log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return false; return;
} }
try { try {
@@ -166,8 +158,7 @@ public class FaceProcessingKafkaService {
// 更新人脸样本得分和状态 // 更新人脸样本得分和状态
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore()); faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
updateFaceSampleStatus(faceSampleId, 2); updateFaceSampleStatus(faceSampleId, 2);
log.info("人脸识别处理成, faceSampleId: {}, score: {}", log.info("人脸识别处理成, faceSampleId: {}", faceSampleId);
faceSampleId, addFaceResp.getScore());
// 查询设备配置,判断是否启用预订功能 // 查询设备配置,判断是否启用预订功能
Long deviceId = message.getDeviceId(); Long deviceId = message.getDeviceId();
@@ -175,13 +166,10 @@ public class FaceProcessingKafkaService {
if (deviceConfig != null && if (deviceConfig != null &&
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
DynamicTaskGenerator.addTask(faceSampleId); DynamicTaskGenerator.addTask(faceSampleId);
log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId);
} }
return true;
} else { } else {
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return false;
} }
} catch (Exception e) { } catch (Exception e) {
@@ -190,7 +178,6 @@ public class FaceProcessingKafkaService {
// 标记人脸样本为处理失败状态 // 标记人脸样本为处理失败状态
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return false;
} }
} }
@@ -200,7 +187,6 @@ public class FaceProcessingKafkaService {
private void updateFaceSampleStatus(Long faceSampleId, Integer status) { private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
try { try {
faceSampleMapper.updateStatus(faceSampleId, status); faceSampleMapper.updateStatus(faceSampleId, status);
log.info("人脸样本状态已更新, faceSampleId: {}, status: {} (0:初始,1:处理中,2:成功,-1:失败)", faceSampleId, status);
} catch (Exception e) { } catch (Exception e) {
log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e); log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
} }

View File

@@ -42,7 +42,7 @@ public class ZTSourceConsumerService {
sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class); sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class);
// 输出业务相关的日志信息 // 输出业务相关的日志信息
log.info("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}", log.debug("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}",
sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId()); sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId());
// 处理消息 // 处理消息

View File

@@ -457,11 +457,7 @@ public class TaskTaskServiceImpl implements TaskService {
taskStatusBiz.setFaceCutStatus(faceId, 2); taskStatusBiz.setFaceCutStatus(faceId, 2);
} }
}; };
if (!sourceList.isEmpty()) { VideoPieceGetter.addTask(task);
task.callback.onInvoke();
} else {
VideoPieceGetter.addTask(task);
}
} }
@Override @Override

View File

@@ -173,10 +173,18 @@ public class VideoPieceGetter {
templatePlaceholder.forEach(deviceId -> { templatePlaceholder.forEach(deviceId -> {
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet(); currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
}); });
log.info("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}",
templatePlaceholder.size(),
currentUnFinPlaceholder.size(),
currentUnFinPlaceholder.entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue().get())
.collect(Collectors.joining(", ")));
} else { } else {
collection.keySet().forEach(deviceId -> { collection.keySet().forEach(deviceId -> {
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1)); currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
}); });
log.info("[Placeholder初始化] 无templateId,初始化完成:设备数={}",
currentUnFinPlaceholder.size());
} }
collection.values().forEach(faceSampleList -> { collection.values().forEach(faceSampleList -> {
executor.execute(() -> { executor.execute(() -> {
@@ -199,9 +207,15 @@ public class VideoPieceGetter {
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task); doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
// 让主设备的计数器 -1 // 让主设备的计数器 -1
AtomicInteger pairCount = currentUnFinPlaceholder.get(pairDeviceId.toString()); AtomicInteger pairCount = currentUnFinPlaceholder.get(pairDeviceId.toString());
if (pairCount != null && pairCount.decrementAndGet() <= 0) { if (pairCount != null) {
currentUnFinPlaceholder.remove(pairDeviceId.toString()); int remaining = pairCount.decrementAndGet();
log.info("设备 {} 的placeholder已满足", pairDeviceId); log.info("[计数器更新] 关联设备 {} 计数器递减,剩余={}, currentUnFinPlaceholder总数={}",
pairDeviceId, remaining, currentUnFinPlaceholder.size());
if (remaining <= 0) {
currentUnFinPlaceholder.remove(pairDeviceId.toString());
log.info("[Placeholder完成] 设备 {} 的placeholder已满足并移除,剩余设备数={}",
pairDeviceId, currentUnFinPlaceholder.size());
}
} }
} }
}); });
@@ -210,9 +224,15 @@ public class VideoPieceGetter {
// 处理当前设备 // 处理当前设备
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task); doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString()); AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
if (count != null && count.decrementAndGet() <= 0) { if (count != null) {
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString()); int remaining = count.decrementAndGet();
log.info("设备 {} 的placeholder已满足", faceSample.getDeviceId()); log.info("[计数器更新] 设备 {} 计数器递减,剩余={}, currentUnFinPlaceholder总数={}",
faceSample.getDeviceId(), remaining, currentUnFinPlaceholder.size());
if (remaining <= 0) {
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
log.info("[Placeholder完成] 设备 {} 的placeholder已满足并移除,剩余设备数={}",
faceSample.getDeviceId(), currentUnFinPlaceholder.size());
}
} }
// 如果有templateId,检查是否所有placeholder都已满足 // 如果有templateId,检查是否所有placeholder都已满足
@@ -221,14 +241,17 @@ public class VideoPieceGetter {
int remainingCount = currentUnFinPlaceholder.values().stream() int remainingCount = currentUnFinPlaceholder.values().stream()
.mapToInt(AtomicInteger::get) .mapToInt(AtomicInteger::get)
.sum(); .sum();
log.info("当前进度:已完成 {}/{},剩余 {} 个placeholder未满足", log.info("[进度检查] 当前进度:已完成 {}/{},剩余 {} 个placeholder未满足,剩余设备数={}",
totalPlaceholderCount - remainingCount, totalPlaceholderCount, remainingCount); totalPlaceholderCount - remainingCount, totalPlaceholderCount, remainingCount,
currentUnFinPlaceholder.size());
if (currentUnFinPlaceholder.isEmpty()) { if (currentUnFinPlaceholder.isEmpty()) {
if (!invoke.get()) { if (!invoke.get()) {
invoke.set(true); invoke.set(true);
log.info("所有placeholder已满足,提前调用callback"); log.info("[Callback调用] 所有placeholder已满足,currentUnFinPlaceholder为空,提前调用callback");
task.getCallback().onInvoke(); task.getCallback().onInvoke();
} else {
log.warn("[Callback跳过] 所有placeholder已满足,但callback已被调用过");
} }
} }
} }
@@ -252,7 +275,11 @@ public class VideoPieceGetter {
if (null != task.getCallback()) { if (null != task.getCallback()) {
if (!invoke.get()) { if (!invoke.get()) {
invoke.set(true); invoke.set(true);
log.info("[Callback调用] 兜底调用callback,currentUnFinPlaceholder剩余设备数={}",
currentUnFinPlaceholder.size());
task.getCallback().onInvoke(); task.getCallback().onInvoke();
} else {
log.info("[Callback跳过] 兜底检查,callback已被调用过");
} }
} }
if (task.getFaceId() != null) { if (task.getFaceId() != null) {