feat(task): 优化任务分发逻辑

- 新增自托管景点缓存机制,减少重复查询
- 修改任务分配逻辑,排除自托管景点的任务
- 优化景点唯一性配置的读取方式
This commit is contained in:
2025-09-08 10:47:07 +08:00
parent 4ee79b5db8
commit c3101ceb6b

View File

@@ -2,6 +2,7 @@ package com.ycwl.basic.service.task.impl;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager; import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
import com.ycwl.basic.utils.JacksonUtil; import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.biz.OrderBiz; import com.ycwl.basic.biz.OrderBiz;
import com.ycwl.basic.biz.TaskStatusBiz; import com.ycwl.basic.biz.TaskStatusBiz;
@@ -81,6 +82,8 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Service @Service
public class TaskTaskServiceImpl implements TaskService { public class TaskTaskServiceImpl implements TaskService {
private static final String WORKER_SELF_HOSTED_CACHE_KEY = "worker_self_hosted_scenic:%s";
private static final int CACHE_EXPIRE_MINUTES = 3;
@Autowired @Autowired
private TaskMapper taskMapper; private TaskMapper taskMapper;
@Autowired @Autowired
@@ -119,6 +122,8 @@ public class TaskTaskServiceImpl implements TaskService {
private DeviceRepository deviceRepository; private DeviceRepository deviceRepository;
@Autowired @Autowired
private VideoReUploader videoReUploader; private VideoReUploader videoReUploader;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) { private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey(); String accessKey = req.getAccessKey();
@@ -128,6 +133,24 @@ public class TaskTaskServiceImpl implements TaskService {
return repository.getWorkerByAccessKey(accessKey); return repository.getWorkerByAccessKey(accessKey);
} }
private boolean isWorkerSelfHostedScenic(Long scenicId) {
String cacheKey = String.format(WORKER_SELF_HOSTED_CACHE_KEY, scenicId);
String cachedValue = redisTemplate.opsForValue().get(cacheKey);
if (cachedValue != null) {
return Boolean.parseBoolean(cachedValue);
}
// 缓存中没有,查询配置
ScenicConfigManager config = scenicRepository.getScenicConfigManager(scenicId);
boolean workerSelfHostedScenic = Boolean.TRUE.equals(config.getBoolean("worker_self_hosted"));
// 缓存结果,设置30分钟过期
redisTemplate.opsForValue().set(cacheKey, String.valueOf(workerSelfHostedScenic), CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES);
return workerSelfHostedScenic;
}
@Override @Override
public TaskSyncRespVo handleSyncTask(@NonNull TaskReqVo req) { public TaskSyncRespVo handleSyncTask(@NonNull TaskReqVo req) {
RenderWorkerEntity worker = getWorker(req); RenderWorkerEntity worker = getWorker(req);
@@ -159,14 +182,19 @@ public class TaskTaskServiceImpl implements TaskService {
} else { } else {
updTemplateList = templateRepository.getAllEnabledTemplateList(); updTemplateList = templateRepository.getAllEnabledTemplateList();
} }
RenderWorkerConfigManager configManager = repository.getWorkerConfigManager(worker.getId());
try { try {
if (lock.tryLock(2, TimeUnit.SECONDS)) { if (lock.tryLock(2, TimeUnit.SECONDS)) {
try { try {
List<TaskRespVO> taskList; List<TaskRespVO> taskList;
if (worker.getScenicOnly() != null) { if (Strings.isNotBlank(configManager.getString("scenic_only"))) {
taskList = taskMapper.selectNotRunningByScenicList(worker.getScenicOnly()); taskList = taskMapper.selectNotRunningByScenicList(configManager.getString("scenic_only"));
} else { } else {
taskList = taskMapper.selectNotRunning(); var _taskList = taskMapper.selectNotRunning();
taskList = _taskList.stream().filter(task -> {
boolean workerSelfHostedScenic = isWorkerSelfHostedScenic(task.getScenicId());
return !workerSelfHostedScenic;
}).toList();
} }
resp.setTasks(taskList); resp.setTasks(taskList);
resp.setTemplates(updTemplateList); resp.setTemplates(updTemplateList);