You've already forked FrameTour-BE
refactor(video): 优化视频片段获取任务的设备配对处理
- 添加 Caffeine 缓存优化景区设备配对关系查询性能 - 实现设备配对关系缓存机制,避免重复数据库查询 - 重构线程安全的回调调用逻辑,使用 compareAndSet 保证原子性 - 添加调试日志的条件判断,减少不必要的日志输出 - 优化任务执行流程,调整线程池关闭和资源清理逻辑 - 实现设备配对关系加载方法,返回不可变Map提高安全性
This commit is contained in:
@@ -36,6 +36,9 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@@ -43,7 +46,9 @@ import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
@@ -84,6 +89,16 @@ public class VideoPieceGetter {
|
||||
@Autowired
|
||||
private FaceStatusManager faceStatusManager;
|
||||
|
||||
/**
|
||||
* 景区设备配对关系缓存
|
||||
* key: scenicId
|
||||
* value: Map<deviceId, pairDeviceId>,空Map表示该景区无配对关系
|
||||
*/
|
||||
private final Cache<Long, Map<Long, Long>> pairDeviceCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(5, TimeUnit.MINUTES)
|
||||
.maximumSize(500)
|
||||
.build();
|
||||
|
||||
@Data
|
||||
public static class Task {
|
||||
public List<Long> faceSampleIds = new ArrayList<>();
|
||||
@@ -153,17 +168,8 @@ public class VideoPieceGetter {
|
||||
task.callback.onInvoke();
|
||||
return;
|
||||
}
|
||||
Map<Long, Long> pairDeviceMap = new ConcurrentHashMap<>();
|
||||
Long scenicId = list.getFirst().getScenicId();
|
||||
List<DeviceV2DTO> allDeviceByScenicId = deviceRepository.getAllDeviceByScenicId(scenicId);
|
||||
allDeviceByScenicId.forEach(device -> {
|
||||
Long deviceId = device.getId();
|
||||
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
||||
Long pairDevice = deviceConfig.getLong("pair_device");
|
||||
if (pairDevice != null) {
|
||||
pairDeviceMap.putIfAbsent(deviceId, pairDevice);
|
||||
}
|
||||
});
|
||||
Map<Long, Long> pairDeviceMap = pairDeviceCache.get(scenicId, this::loadPairDeviceMap);
|
||||
Map<Long, List<FaceSampleEntity>> collection = list.stream()
|
||||
.filter(faceSample -> {
|
||||
if (templatePlaceholder != null) {
|
||||
@@ -176,12 +182,14 @@ public class VideoPieceGetter {
|
||||
templatePlaceholder.forEach(deviceId -> {
|
||||
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
|
||||
});
|
||||
log.debug("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}",
|
||||
templatePlaceholder.size(),
|
||||
currentUnFinPlaceholder.size(),
|
||||
currentUnFinPlaceholder.entrySet().stream()
|
||||
.map(e -> e.getKey() + "=" + e.getValue().get())
|
||||
.collect(Collectors.joining(", ")));
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}",
|
||||
templatePlaceholder.size(),
|
||||
currentUnFinPlaceholder.size(),
|
||||
currentUnFinPlaceholder.entrySet().stream()
|
||||
.map(e -> e.getKey() + "=" + e.getValue().get())
|
||||
.collect(Collectors.joining(", ")));
|
||||
}
|
||||
} else {
|
||||
collection.keySet().forEach(deviceId -> {
|
||||
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
|
||||
@@ -190,16 +198,8 @@ public class VideoPieceGetter {
|
||||
currentUnFinPlaceholder.size());
|
||||
}
|
||||
collection.values().forEach(faceSampleList -> {
|
||||
executor.execute(() -> {
|
||||
AtomicBoolean isFirst = new AtomicBoolean(true);
|
||||
faceSampleList.forEach(faceSample -> {
|
||||
if (!isFirst.get()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ignore) {
|
||||
}
|
||||
}
|
||||
isFirst.set(false);
|
||||
faceSampleList.forEach(faceSample -> {
|
||||
executor.execute(() -> {
|
||||
// 处理关联设备:如果当前设备是某个主设备的配对设备,也处理主设备
|
||||
if (pairDeviceMap.containsValue(faceSample.getDeviceId())) {
|
||||
pairDeviceMap.entrySet().stream()
|
||||
@@ -249,35 +249,33 @@ public class VideoPieceGetter {
|
||||
currentUnFinPlaceholder.size());
|
||||
|
||||
if (currentUnFinPlaceholder.isEmpty()) {
|
||||
if (!invoke.get()) {
|
||||
invoke.set(true);
|
||||
// 使用 compareAndSet 保证原子性,避免多线程重复调用 callback
|
||||
if (invoke.compareAndSet(false, true)) {
|
||||
log.info("[Callback调用] 所有placeholder已满足,currentUnFinPlaceholder为空,提前调用callback");
|
||||
task.getCallback().onInvoke();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (task.faceId != null) {
|
||||
// 经过切片后,可能有新的人脸切片生成,需要更新人脸状态
|
||||
templateRepository.getTemplateListByScenicId(scenicId).forEach(template -> {
|
||||
faceStatusManager.markHasNewPieces(task.faceId, template.getId());
|
||||
});
|
||||
}
|
||||
});
|
||||
if (task.faceId != null) {
|
||||
// 经过切片后,可能有新的人脸切片生成,需要更新人脸状态
|
||||
templateRepository.getTemplateListByScenicId(scenicId).forEach(template -> {
|
||||
faceStatusManager.markHasNewPieces(task.faceId, template.getId());
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
log.info("executor等待被结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount());
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(3, TimeUnit.MINUTES);
|
||||
log.info("executor已结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount());
|
||||
executor.close();
|
||||
} catch (InterruptedException e) {
|
||||
log.info("executor已中断![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount());
|
||||
} finally {
|
||||
executor.close();
|
||||
if (null != task.getCallback()) {
|
||||
if (!invoke.get()) {
|
||||
invoke.set(true);
|
||||
// 使用 compareAndSet 保证原子性,避免多线程重复调用 callback
|
||||
if (invoke.compareAndSet(false, true)) {
|
||||
log.info("[Callback调用] 兜底调用callback,currentUnFinPlaceholder剩余设备数={}",
|
||||
currentUnFinPlaceholder.size());
|
||||
task.getCallback().onInvoke();
|
||||
@@ -735,4 +733,27 @@ public class VideoPieceGetter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 加载景区的设备配对关系
|
||||
* @param scenicId 景区ID
|
||||
* @return 设备配对关系Map,无配对关系时返回空Map(而非null,避免重复查询)
|
||||
*/
|
||||
private Map<Long, Long> loadPairDeviceMap(Long scenicId) {
|
||||
List<DeviceV2DTO> allDeviceByScenicId = deviceRepository.getAllDeviceByScenicId(scenicId);
|
||||
if (allDeviceByScenicId == null || allDeviceByScenicId.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
Map<Long, Long> pairDeviceMap = new HashMap<>();
|
||||
allDeviceByScenicId.forEach(device -> {
|
||||
Long deviceId = device.getId();
|
||||
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
|
||||
Long pairDevice = deviceConfig.getLong("pair_device");
|
||||
if (pairDevice != null) {
|
||||
pairDeviceMap.putIfAbsent(deviceId, pairDevice);
|
||||
}
|
||||
});
|
||||
log.debug("加载景区 {} 设备配对关系,共 {} 对", scenicId, pairDeviceMap.size());
|
||||
return pairDeviceMap.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(pairDeviceMap);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user