diff --git a/src/main/java/com/ycwl/basic/task/VideoPieceGetter.java b/src/main/java/com/ycwl/basic/task/VideoPieceGetter.java index 5f9eadd7..ff3c876a 100644 --- a/src/main/java/com/ycwl/basic/task/VideoPieceGetter.java +++ b/src/main/java/com/ycwl/basic/task/VideoPieceGetter.java @@ -36,6 +36,9 @@ import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -43,7 +46,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; @@ -84,6 +89,16 @@ public class VideoPieceGetter { @Autowired private FaceStatusManager faceStatusManager; + /** + * 景区设备配对关系缓存 + * key: scenicId + * value: Map,空Map表示该景区无配对关系 + */ + private final Cache> pairDeviceCache = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.MINUTES) + .maximumSize(500) + .build(); + @Data public static class Task { public List faceSampleIds = new ArrayList<>(); @@ -153,17 +168,8 @@ public class VideoPieceGetter { task.callback.onInvoke(); return; } - Map pairDeviceMap = new ConcurrentHashMap<>(); Long scenicId = list.getFirst().getScenicId(); - List allDeviceByScenicId = deviceRepository.getAllDeviceByScenicId(scenicId); - allDeviceByScenicId.forEach(device -> { - Long deviceId = device.getId(); - DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); - Long pairDevice = deviceConfig.getLong("pair_device"); - if (pairDevice != null) { - pairDeviceMap.putIfAbsent(deviceId, pairDevice); - } - }); + Map pairDeviceMap = pairDeviceCache.get(scenicId, this::loadPairDeviceMap); Map> collection = list.stream() .filter(faceSample -> { if (templatePlaceholder != null) { @@ -176,12 +182,14 @@ public class VideoPieceGetter { templatePlaceholder.forEach(deviceId -> { currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet(); }); - log.debug("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}", - templatePlaceholder.size(), - currentUnFinPlaceholder.size(), - currentUnFinPlaceholder.entrySet().stream() - .map(e -> e.getKey() + "=" + e.getValue().get()) - .collect(Collectors.joining(", "))); + if (log.isDebugEnabled()) { + log.debug("[Placeholder初始化] 有templateId,初始化完成:placeholder总数={}, 不同设备数={}, 详细计数={}", + templatePlaceholder.size(), + currentUnFinPlaceholder.size(), + currentUnFinPlaceholder.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().get()) + .collect(Collectors.joining(", "))); + } } else { collection.keySet().forEach(deviceId -> { currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1)); @@ -190,16 +198,8 @@ public class VideoPieceGetter { currentUnFinPlaceholder.size()); } collection.values().forEach(faceSampleList -> { - executor.execute(() -> { - AtomicBoolean isFirst = new AtomicBoolean(true); - faceSampleList.forEach(faceSample -> { - if (!isFirst.get()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - isFirst.set(false); + faceSampleList.forEach(faceSample -> { + executor.execute(() -> { // 处理关联设备:如果当前设备是某个主设备的配对设备,也处理主设备 if (pairDeviceMap.containsValue(faceSample.getDeviceId())) { pairDeviceMap.entrySet().stream() @@ -249,35 +249,33 @@ public class VideoPieceGetter { currentUnFinPlaceholder.size()); if (currentUnFinPlaceholder.isEmpty()) { - if (!invoke.get()) { - invoke.set(true); + // 使用 compareAndSet 保证原子性,避免多线程重复调用 callback + if (invoke.compareAndSet(false, true)) { log.info("[Callback调用] 所有placeholder已满足,currentUnFinPlaceholder为空,提前调用callback"); task.getCallback().onInvoke(); } } } + if (task.faceId != null) { + // 经过切片后,可能有新的人脸切片生成,需要更新人脸状态 + templateRepository.getTemplateListByScenicId(scenicId).forEach(template -> { + faceStatusManager.markHasNewPieces(task.faceId, template.getId()); + }); + } }); - if (task.faceId != null) { - // 经过切片后,可能有新的人脸切片生成,需要更新人脸状态 - templateRepository.getTemplateListByScenicId(scenicId).forEach(template -> { - faceStatusManager.markHasNewPieces(task.faceId, template.getId()); - }); - } }); }); try { - Thread.sleep(1000L); - log.info("executor等待被结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); executor.shutdown(); executor.awaitTermination(3, TimeUnit.MINUTES); log.info("executor已结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); - executor.close(); } catch (InterruptedException e) { log.info("executor已中断![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); } finally { + executor.close(); if (null != task.getCallback()) { - if (!invoke.get()) { - invoke.set(true); + // 使用 compareAndSet 保证原子性,避免多线程重复调用 callback + if (invoke.compareAndSet(false, true)) { log.info("[Callback调用] 兜底调用callback,currentUnFinPlaceholder剩余设备数={}", currentUnFinPlaceholder.size()); task.getCallback().onInvoke(); @@ -735,4 +733,27 @@ public class VideoPieceGetter { } } + /** + * 加载景区的设备配对关系 + * @param scenicId 景区ID + * @return 设备配对关系Map,无配对关系时返回空Map(而非null,避免重复查询) + */ + private Map loadPairDeviceMap(Long scenicId) { + List allDeviceByScenicId = deviceRepository.getAllDeviceByScenicId(scenicId); + if (allDeviceByScenicId == null || allDeviceByScenicId.isEmpty()) { + return Collections.emptyMap(); + } + Map pairDeviceMap = new HashMap<>(); + allDeviceByScenicId.forEach(device -> { + Long deviceId = device.getId(); + DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); + Long pairDevice = deviceConfig.getLong("pair_device"); + if (pairDevice != null) { + pairDeviceMap.putIfAbsent(deviceId, pairDevice); + } + }); + log.debug("加载景区 {} 设备配对关系,共 {} 对", scenicId, pairDeviceMap.size()); + return pairDeviceMap.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(pairDeviceMap); + } + }