You've already forked FrameTour-BE
Compare commits
3 Commits
f33ce8e7a7
...
1727619b29
Author | SHA1 | Date | |
---|---|---|---|
1727619b29 | |||
3099e68a97 | |||
db86c82bc8 |
@@ -0,0 +1,49 @@
|
|||||||
|
package com.ycwl.basic.integration.kafka.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 人脸识别异步处理线程池配置
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Configuration
|
||||||
|
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||||
|
public class FaceRecognitionThreadPoolConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建人脸识别专用线程池
|
||||||
|
* - 核心线程数:32
|
||||||
|
* - 最大线程数:128
|
||||||
|
* - 队列容量:1000(避免无限制增长)
|
||||||
|
* - 拒绝策略:CallerRunsPolicy(调用者线程执行)
|
||||||
|
*/
|
||||||
|
@Bean(name = "faceRecognitionExecutor", destroyMethod = "shutdown")
|
||||||
|
public ThreadPoolExecutor faceRecognitionExecutor() {
|
||||||
|
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||||
|
32, // 核心线程数
|
||||||
|
128, // 最大线程数
|
||||||
|
60L, // 空闲线程存活时间
|
||||||
|
TimeUnit.SECONDS, // 时间单位
|
||||||
|
new LinkedBlockingQueue<>(1000), // 任务队列
|
||||||
|
r -> {
|
||||||
|
Thread thread = new Thread(r);
|
||||||
|
thread.setName("face-recognition-" + thread.getId());
|
||||||
|
thread.setDaemon(false);
|
||||||
|
return thread;
|
||||||
|
},
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy() // 超过容量时由调用者线程执行
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("人脸识别线程池初始化完成 - 核心线程数: {}, 最大线程数: {}, 队列容量: 1000",
|
||||||
|
executor.getCorePoolSize(), executor.getMaximumPoolSize());
|
||||||
|
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
@@ -22,6 +22,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 人脸处理Kafka消费服务
|
* 人脸处理Kafka消费服务
|
||||||
@@ -39,6 +40,7 @@ public class FaceProcessingKafkaService {
|
|||||||
private final TaskFaceService taskFaceService;
|
private final TaskFaceService taskFaceService;
|
||||||
private final ScenicService scenicService;
|
private final ScenicService scenicService;
|
||||||
private final DeviceRepository deviceRepository;
|
private final DeviceRepository deviceRepository;
|
||||||
|
private final ThreadPoolExecutor faceRecognitionExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消费外部系统发送的人脸处理消息
|
* 消费外部系统发送的人脸处理消息
|
||||||
@@ -48,7 +50,7 @@ public class FaceProcessingKafkaService {
|
|||||||
public void processFaceMessage(String message, Acknowledgment ack) {
|
public void processFaceMessage(String message, Acknowledgment ack) {
|
||||||
try {
|
try {
|
||||||
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
|
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
|
||||||
log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
|
log.debug("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
|
||||||
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
||||||
|
|
||||||
// 使用外部传入的faceSampleId
|
// 使用外部传入的faceSampleId
|
||||||
@@ -63,31 +65,23 @@ public class FaceProcessingKafkaService {
|
|||||||
// 先保存人脸样本数据
|
// 先保存人脸样本数据
|
||||||
boolean saved = saveFaceSample(faceMessage, externalFaceId);
|
boolean saved = saveFaceSample(faceMessage, externalFaceId);
|
||||||
|
|
||||||
// 然后进行人脸识别处理
|
// 然后异步进行人脸识别处理(使用专用线程池)
|
||||||
if (saved) {
|
if (saved) {
|
||||||
try {
|
faceRecognitionExecutor.execute(() -> processFaceRecognitionAsync(faceMessage));
|
||||||
boolean processed = processFaceRecognition(faceMessage);
|
log.debug("人脸识别任务已提交至线程池, faceSampleId: {}, 活跃线程: {}, 队列大小: {}",
|
||||||
if (processed) {
|
externalFaceId, faceRecognitionExecutor.getActiveCount(),
|
||||||
log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId);
|
faceRecognitionExecutor.getQueue().size());
|
||||||
} else {
|
|
||||||
log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 无论处理是否成功,都消费消息
|
// 无论处理是否成功,都消费消息
|
||||||
ack.acknowledge();
|
ack.acknowledge();
|
||||||
log.info("消息已消费, faceSampleId: {}", externalFaceId);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
||||||
// 即使发生异常也消费消息,避免消息堆积
|
// 即使发生异常也消费消息,避免消息堆积
|
||||||
ack.acknowledge();
|
ack.acknowledge();
|
||||||
log.info("异常情况下消息已消费,避免重复处理");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,12 +107,10 @@ public class FaceProcessingKafkaService {
|
|||||||
} else {
|
} else {
|
||||||
faceSample.setCreateAt(new Date());
|
faceSample.setCreateAt(new Date());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 保存到数据库
|
// 保存到数据库
|
||||||
faceSampleMapper.add(faceSample);
|
faceSampleMapper.add(faceSample);
|
||||||
log.info("人脸样本数据已保存, 使用外部faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}",
|
|
||||||
externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("保存人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}",
|
log.error("保存人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}",
|
||||||
@@ -128,14 +120,14 @@ public class FaceProcessingKafkaService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 执行人脸识别处理逻辑
|
* 异步执行人脸识别处理逻辑
|
||||||
* 对已保存的人脸样本进行识别处理
|
* 对已保存的人脸样本进行识别处理
|
||||||
*/
|
*/
|
||||||
private boolean processFaceRecognition(FaceProcessingMessage message) {
|
private void processFaceRecognitionAsync(FaceProcessingMessage message) {
|
||||||
Long faceSampleId = message.getFaceSampleId();
|
Long faceSampleId = message.getFaceSampleId();
|
||||||
Long scenicId = message.getScenicId();
|
Long scenicId = message.getScenicId();
|
||||||
String faceUrl = message.getFaceUrl();
|
String faceUrl = message.getFaceUrl();
|
||||||
|
|
||||||
// 直接使用faceSampleId作为唯一标识
|
// 直接使用faceSampleId作为唯一标识
|
||||||
String faceUniqueId = faceSampleId.toString();
|
String faceUniqueId = faceSampleId.toString();
|
||||||
|
|
||||||
@@ -144,21 +136,21 @@ public class FaceProcessingKafkaService {
|
|||||||
if (faceBodyAdapter == null) {
|
if (faceBodyAdapter == null) {
|
||||||
log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
|
log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
updateFaceSampleStatus(faceSampleId, -1);
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 更新状态为处理中
|
// 更新状态为处理中
|
||||||
updateFaceSampleStatus(faceSampleId, 1);
|
updateFaceSampleStatus(faceSampleId, 1);
|
||||||
|
|
||||||
// 确保人脸数据库存在
|
// 确保人脸数据库存在
|
||||||
taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
|
taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
|
||||||
|
|
||||||
// 添加人脸到识别服务(使用faceSampleId作为唯一标识)
|
// 添加人脸到识别服务(使用faceSampleId作为唯一标识)
|
||||||
AddFaceResp addFaceResp = faceBodyAdapter.addFace(
|
AddFaceResp addFaceResp = faceBodyAdapter.addFace(
|
||||||
scenicId.toString(),
|
scenicId.toString(),
|
||||||
faceSampleId.toString(),
|
faceSampleId.toString(),
|
||||||
faceUrl,
|
faceUrl,
|
||||||
faceUniqueId // 即faceSampleId.toString()
|
faceUniqueId // 即faceSampleId.toString()
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -166,31 +158,26 @@ public class FaceProcessingKafkaService {
|
|||||||
// 更新人脸样本得分和状态
|
// 更新人脸样本得分和状态
|
||||||
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
|
faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
|
||||||
updateFaceSampleStatus(faceSampleId, 2);
|
updateFaceSampleStatus(faceSampleId, 2);
|
||||||
log.info("人脸识别处理完成, faceSampleId: {}, score: {}",
|
log.info("人脸识别处理成功, faceSampleId: {}", faceSampleId);
|
||||||
faceSampleId, addFaceResp.getScore());
|
|
||||||
|
|
||||||
// 查询设备配置,判断是否启用预订功能
|
// 查询设备配置,判断是否启用预订功能
|
||||||
Long deviceId = message.getDeviceId();
|
Long deviceId = message.getDeviceId();
|
||||||
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
||||||
if (deviceConfig != null &&
|
if (deviceConfig != null &&
|
||||||
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
|
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
|
||||||
DynamicTaskGenerator.addTask(faceSampleId);
|
DynamicTaskGenerator.addTask(faceSampleId);
|
||||||
log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId);
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
|
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
updateFaceSampleStatus(faceSampleId, -1);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("人脸识别处理失败, faceSampleId: {}, error: {}",
|
log.error("人脸识别处理失败, faceSampleId: {}, error: {}",
|
||||||
faceSampleId, e.getMessage(), e);
|
faceSampleId, e.getMessage(), e);
|
||||||
|
|
||||||
// 标记人脸样本为处理失败状态
|
// 标记人脸样本为处理失败状态
|
||||||
updateFaceSampleStatus(faceSampleId, -1);
|
updateFaceSampleStatus(faceSampleId, -1);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -200,7 +187,6 @@ public class FaceProcessingKafkaService {
|
|||||||
private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
|
private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
|
||||||
try {
|
try {
|
||||||
faceSampleMapper.updateStatus(faceSampleId, status);
|
faceSampleMapper.updateStatus(faceSampleId, status);
|
||||||
log.info("人脸样本状态已更新, faceSampleId: {}, status: {} (0:初始,1:处理中,2:成功,-1:失败)", faceSampleId, status);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
|
log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
|
||||||
}
|
}
|
||||||
|
@@ -42,7 +42,7 @@ public class ZTSourceConsumerService {
|
|||||||
sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class);
|
sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class);
|
||||||
|
|
||||||
// 输出业务相关的日志信息
|
// 输出业务相关的日志信息
|
||||||
log.info("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}",
|
log.debug("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}",
|
||||||
sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId());
|
sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId());
|
||||||
|
|
||||||
// 处理消息
|
// 处理消息
|
||||||
|
@@ -457,11 +457,7 @@ public class TaskTaskServiceImpl implements TaskService {
|
|||||||
taskStatusBiz.setFaceCutStatus(faceId, 2);
|
taskStatusBiz.setFaceCutStatus(faceId, 2);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!sourceList.isEmpty()) {
|
VideoPieceGetter.addTask(task);
|
||||||
task.callback.onInvoke();
|
|
||||||
} else {
|
|
||||||
VideoPieceGetter.addTask(task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@@ -173,10 +173,18 @@ public class VideoPieceGetter {
|
|||||||
templatePlaceholder.forEach(deviceId -> {
|
templatePlaceholder.forEach(deviceId -> {
|
||||||
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
|
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
|
||||||
});
|
});
|
||||||
|
log.info("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}",
|
||||||
|
templatePlaceholder.size(),
|
||||||
|
currentUnFinPlaceholder.size(),
|
||||||
|
currentUnFinPlaceholder.entrySet().stream()
|
||||||
|
.map(e -> e.getKey() + "=" + e.getValue().get())
|
||||||
|
.collect(Collectors.joining(", ")));
|
||||||
} else {
|
} else {
|
||||||
collection.keySet().forEach(deviceId -> {
|
collection.keySet().forEach(deviceId -> {
|
||||||
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
|
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
|
||||||
});
|
});
|
||||||
|
log.info("[Placeholder初始化] 无templateId,初始化完成:设备数={}",
|
||||||
|
currentUnFinPlaceholder.size());
|
||||||
}
|
}
|
||||||
collection.values().forEach(faceSampleList -> {
|
collection.values().forEach(faceSampleList -> {
|
||||||
executor.execute(() -> {
|
executor.execute(() -> {
|
||||||
@@ -199,9 +207,15 @@ public class VideoPieceGetter {
|
|||||||
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
|
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
|
||||||
// 让主设备的计数器 -1
|
// 让主设备的计数器 -1
|
||||||
AtomicInteger pairCount = currentUnFinPlaceholder.get(pairDeviceId.toString());
|
AtomicInteger pairCount = currentUnFinPlaceholder.get(pairDeviceId.toString());
|
||||||
if (pairCount != null && pairCount.decrementAndGet() <= 0) {
|
if (pairCount != null) {
|
||||||
currentUnFinPlaceholder.remove(pairDeviceId.toString());
|
int remaining = pairCount.decrementAndGet();
|
||||||
log.info("设备 {} 的placeholder已满足", pairDeviceId);
|
log.info("[计数器更新] 关联设备 {} 计数器递减,剩余={}, currentUnFinPlaceholder总数={}",
|
||||||
|
pairDeviceId, remaining, currentUnFinPlaceholder.size());
|
||||||
|
if (remaining <= 0) {
|
||||||
|
currentUnFinPlaceholder.remove(pairDeviceId.toString());
|
||||||
|
log.info("[Placeholder完成] 设备 {} 的placeholder已满足并移除,剩余设备数={}",
|
||||||
|
pairDeviceId, currentUnFinPlaceholder.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -210,9 +224,15 @@ public class VideoPieceGetter {
|
|||||||
// 处理当前设备
|
// 处理当前设备
|
||||||
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
|
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
|
||||||
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
|
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
|
||||||
if (count != null && count.decrementAndGet() <= 0) {
|
if (count != null) {
|
||||||
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
|
int remaining = count.decrementAndGet();
|
||||||
log.info("设备 {} 的placeholder已满足", faceSample.getDeviceId());
|
log.info("[计数器更新] 设备 {} 计数器递减,剩余={}, currentUnFinPlaceholder总数={}",
|
||||||
|
faceSample.getDeviceId(), remaining, currentUnFinPlaceholder.size());
|
||||||
|
if (remaining <= 0) {
|
||||||
|
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
|
||||||
|
log.info("[Placeholder完成] 设备 {} 的placeholder已满足并移除,剩余设备数={}",
|
||||||
|
faceSample.getDeviceId(), currentUnFinPlaceholder.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果有templateId,检查是否所有placeholder都已满足
|
// 如果有templateId,检查是否所有placeholder都已满足
|
||||||
@@ -221,14 +241,17 @@ public class VideoPieceGetter {
|
|||||||
int remainingCount = currentUnFinPlaceholder.values().stream()
|
int remainingCount = currentUnFinPlaceholder.values().stream()
|
||||||
.mapToInt(AtomicInteger::get)
|
.mapToInt(AtomicInteger::get)
|
||||||
.sum();
|
.sum();
|
||||||
log.info("当前进度:已完成 {}/{},剩余 {} 个placeholder未满足",
|
log.info("[进度检查] 当前进度:已完成 {}/{},剩余 {} 个placeholder未满足,剩余设备数={}",
|
||||||
totalPlaceholderCount - remainingCount, totalPlaceholderCount, remainingCount);
|
totalPlaceholderCount - remainingCount, totalPlaceholderCount, remainingCount,
|
||||||
|
currentUnFinPlaceholder.size());
|
||||||
|
|
||||||
if (currentUnFinPlaceholder.isEmpty()) {
|
if (currentUnFinPlaceholder.isEmpty()) {
|
||||||
if (!invoke.get()) {
|
if (!invoke.get()) {
|
||||||
invoke.set(true);
|
invoke.set(true);
|
||||||
log.info("所有placeholder已满足,提前调用callback");
|
log.info("[Callback调用] 所有placeholder已满足,currentUnFinPlaceholder为空,提前调用callback");
|
||||||
task.getCallback().onInvoke();
|
task.getCallback().onInvoke();
|
||||||
|
} else {
|
||||||
|
log.warn("[Callback跳过] 所有placeholder已满足,但callback已被调用过");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -252,7 +275,11 @@ public class VideoPieceGetter {
|
|||||||
if (null != task.getCallback()) {
|
if (null != task.getCallback()) {
|
||||||
if (!invoke.get()) {
|
if (!invoke.get()) {
|
||||||
invoke.set(true);
|
invoke.set(true);
|
||||||
|
log.info("[Callback调用] 兜底调用callback,currentUnFinPlaceholder剩余设备数={}",
|
||||||
|
currentUnFinPlaceholder.size());
|
||||||
task.getCallback().onInvoke();
|
task.getCallback().onInvoke();
|
||||||
|
} else {
|
||||||
|
log.info("[Callback跳过] 兜底检查,callback已被调用过");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (task.getFaceId() != null) {
|
if (task.getFaceId() != null) {
|
||||||
|
Reference in New Issue
Block a user