Merge branch 'render-worker-microservice'

# Conflicts:
#	src/main/java/com/ycwl/basic/integration/scenic/service/ScenicIntegrationService.java
#	src/main/java/com/ycwl/basic/service/task/impl/TaskTaskServiceImpl.java
This commit is contained in:
2025-09-09 11:00:10 +08:00
52 changed files with 2711 additions and 630 deletions

View File

@@ -2,7 +2,8 @@ package com.ycwl.basic.service.task.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.crypto.digest.MD5;
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.biz.OrderBiz;
import com.ycwl.basic.biz.TaskStatusBiz;
@@ -12,7 +13,6 @@ import com.ycwl.basic.constant.TaskConstant;
import com.ycwl.basic.mapper.FaceMapper;
import com.ycwl.basic.mapper.FaceSampleMapper;
import com.ycwl.basic.mapper.MemberMapper;
import com.ycwl.basic.mapper.RenderWorkerMapper;
import com.ycwl.basic.mapper.SourceMapper;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.mapper.TemplateMapper;
@@ -83,6 +83,8 @@ import java.util.stream.Collectors;
@Slf4j
@Service
public class TaskTaskServiceImpl implements TaskService {
private static final String WORKER_SELF_HOSTED_CACHE_KEY = "worker_self_hosted_scenic:%s";
private static final int CACHE_EXPIRE_MINUTES = 3;
@Autowired
private TaskMapper taskMapper;
@Autowired
@@ -121,13 +123,40 @@ public class TaskTaskServiceImpl implements TaskService {
private DeviceRepository deviceRepository;
@Autowired
private VideoReUploader videoReUploader;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey();
if (accessKey == null) {
return null;
}
return repository.getWorkerByAccessKey(accessKey);
RenderWorkerEntity worker = repository.getWorkerByAccessKey(accessKey);
if (worker == null) {
return null;
}
if (worker.getStatus() != 1) {
return null;
}
return worker;
}
private boolean isWorkerSelfHostedScenic(Long scenicId) {
String cacheKey = String.format(WORKER_SELF_HOSTED_CACHE_KEY, scenicId);
String cachedValue = redisTemplate.opsForValue().get(cacheKey);
if (cachedValue != null) {
return Boolean.parseBoolean(cachedValue);
}
// 缓存中没有,查询配置
ScenicConfigManager config = scenicRepository.getScenicConfigManager(scenicId);
boolean workerSelfHostedScenic = Boolean.TRUE.equals(config.getBoolean("worker_self_hosted"));
// 缓存结果,设置30分钟过期
redisTemplate.opsForValue().set(cacheKey, String.valueOf(workerSelfHostedScenic), CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES);
return workerSelfHostedScenic;
}
@Override
@@ -136,7 +165,6 @@ public class TaskTaskServiceImpl implements TaskService {
if (worker == null) {
return null;
}
worker.setOnline(1);
worker.setName(null);
worker.setStatus(null);
// get status
@@ -162,21 +190,25 @@ public class TaskTaskServiceImpl implements TaskService {
} else {
updTemplateList = templateRepository.getAllEnabledTemplateList();
}
RenderWorkerConfigManager configManager = repository.getWorkerConfigManager(worker.getId());
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
List<TaskRespVO> taskList;
if (worker.getScenicOnly() != null) {
taskList = taskMapper.selectNotRunningByScenicList(worker.getScenicOnly());
if (Strings.isNotBlank(configManager.getString("scenic_only"))) {
taskList = taskMapper.selectNotRunningByScenicList(configManager.getString("scenic_only"));
} else {
taskList = taskMapper.selectNotRunning();
var _taskList = taskMapper.selectNotRunning();
taskList = _taskList.stream().filter(task -> {
boolean workerSelfHostedScenic = isWorkerSelfHostedScenic(task.getScenicId());
return !workerSelfHostedScenic;
}).limit(1).toList();
}
resp.setTasks(taskList);
resp.setTemplates(updTemplateList);
taskList.forEach(task -> {
taskMapper.assignToWorker(task.getId(), worker.getId());
videoTaskRepository.clearTaskCache(task.getId());
repository.clearCache(worker.getId());
});
} finally {
lock.unlock();
@@ -551,10 +583,11 @@ public class TaskTaskServiceImpl implements TaskService {
if (worker == null) {
return null;
}
RenderWorkerConfigManager config = repository.getWorkerConfigManager(worker.getId());
IStorageAdapter adapter;
try {
adapter = StorageFactory.get(worker.getStoreType());
adapter.loadConfig(JacksonUtil.parseObject(worker.getStoreConfigJson(), Map.class));
adapter = StorageFactory.get(config.getString("store_type"));
adapter.loadConfig(config.getObject("store_config_json", Map.class));
} catch (Exception e) {
adapter = scenicService.getScenicStorageAdapter(task.getScenicId());
}