refactor(task):优化视频片段获取任务的设备计数逻辑

- 将 currentUnFinPlaceholder从 List 类型改为 Map<String, AtomicInteger>- 使用 AtomicInteger 跟踪每个设备的未完成任务数量
- 在设备任务完成时正确减少计数并清理已完成的设备
- 更新进度日志以反映去重后的设备总数
This commit is contained in:
2025-09-27 01:07:52 +08:00
parent 2836326518
commit da89067c48

View File

@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -145,7 +146,7 @@ public class VideoPieceGetter {
new ArrayBlockingQueue<>(128),
threadFactory
);
List<String> currentUnFinPlaceholder = new ArrayList<>();
Map<String, AtomicInteger> currentUnFinPlaceholder = new ConcurrentHashMap<>();
List<FaceSampleEntity> list = faceSampleMapper.listByIds(task.getFaceSampleIds());
Map<Long, Long> pairDeviceMap = new ConcurrentHashMap<>();
if (!list.isEmpty()) {
@@ -169,12 +170,12 @@ public class VideoPieceGetter {
})
.collect(Collectors.groupingBy(FaceSampleEntity::getDeviceId));
if (templatePlaceholder != null) {
IntStream.range(0, templatePlaceholder.size()).forEach(i -> {
currentUnFinPlaceholder.add(templatePlaceholder.get(i));
templatePlaceholder.forEach(deviceId -> {
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
});
} else {
collection.keySet().forEach(i -> {
currentUnFinPlaceholder.add(i.toString());
collection.keySet().forEach(deviceId -> {
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
});
}
collection.values().forEach(faceSampleList -> {
@@ -197,14 +198,21 @@ public class VideoPieceGetter {
log.info("找到同景区关联设备:{} -> {}", pairDeviceId, faceSample.getDeviceId());
if (pairDeviceId != null) {
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
if (count != null && count.decrementAndGet() <= 0) {
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
}
}
});
}
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
if (count != null && count.decrementAndGet() <= 0) {
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
}
if (templatePlaceholder != null) {
log.info("当前进度:!{}/{}", currentUnFinPlaceholder.size(), templatePlaceholder.size());
long distinctDeviceCount = templatePlaceholder.stream().distinct().count();
log.info("当前进度:!{}/{}", currentUnFinPlaceholder.size(), distinctDeviceCount);
if (currentUnFinPlaceholder.isEmpty()) {
if (!invoke.get()) {
invoke.set(true);