feat(task): 添加版本校验和任务重分配功能

- 实现版本号比较方法,支持版本号大小判断
- 添加客户端版本校验逻辑,防止低版本上报覆盖高版本缓存
- 增加任务重分配功能,在更新旧任务时解除任务分配
- 修复worker状态处理中的版本冲突问题
This commit is contained in:
2026-01-05 11:54:07 +08:00
parent 43775f550b
commit 8d3dae32f3

View File

@@ -146,6 +146,34 @@ public class TaskTaskServiceImpl implements TaskService {
return worker; return worker;
} }
/**
* 比较两个版本号
* @param v1 版本号1
* @param v2 版本号2
* @return 负数表示 v1 < v2,0 表示相等,正数表示 v1 > v2
*/
private int compareVersion(String v1, String v2) {
String[] parts1 = v1.split("\\.");
String[] parts2 = v2.split("\\.");
int maxLen = Math.max(parts1.length, parts2.length);
for (int i = 0; i < maxLen; i++) {
int num1 = i < parts1.length ? parseVersionPart(parts1[i]) : 0;
int num2 = i < parts2.length ? parseVersionPart(parts2[i]) : 0;
if (num1 != num2) {
return num1 - num2;
}
}
return 0;
}
private int parseVersionPart(String part) {
try {
return Integer.parseInt(part.replaceAll("[^0-9]", ""));
} catch (NumberFormatException e) {
return 0;
}
}
private boolean isWorkerSelfHostedScenic(Long scenicId) { private boolean isWorkerSelfHostedScenic(Long scenicId) {
String cacheKey = String.format(WORKER_SELF_HOSTED_CACHE_KEY, scenicId); String cacheKey = String.format(WORKER_SELF_HOSTED_CACHE_KEY, scenicId);
String cachedValue = redisTemplate.opsForValue().get(cacheKey); String cachedValue = redisTemplate.opsForValue().get(cacheKey);
@@ -174,6 +202,13 @@ public class TaskTaskServiceImpl implements TaskService {
worker.setStatus(null); worker.setStatus(null);
// get status // get status
ClientStatusReqVo clientStatus = req.getClientStatus(); ClientStatusReqVo clientStatus = req.getClientStatus();
// 版本校验:上报版本低于缓存版本时认为 worker 异常
ClientStatusReqVo cachedStatus = repository.getWorkerHostStatus(worker.getId());
if (cachedStatus != null && clientStatus != null
&& cachedStatus.getVersion() != null && clientStatus.getVersion() != null
&& compareVersion(clientStatus.getVersion(), cachedStatus.getVersion()) < 0) {
return null;
}
repository.setWorkerHostStatus(worker.getId(), clientStatus); repository.setWorkerHostStatus(worker.getId(), clientStatus);
TaskSyncRespVo resp = new TaskSyncRespVo(); TaskSyncRespVo resp = new TaskSyncRespVo();
// Template // Template
@@ -428,11 +463,11 @@ public class TaskTaskServiceImpl implements TaskService {
taskEntity.setTemplateId(templateId); taskEntity.setTemplateId(templateId);
taskEntity.setAutomatic(automatic ? 1 : 0); taskEntity.setAutomatic(automatic ? 1 : 0);
} }
taskEntity.setWorkerId(null);
taskEntity.setStatus(0); taskEntity.setStatus(0);
taskEntity.setTaskParams(JacksonUtil.toJSONString(sourcesMap)); taskEntity.setTaskParams(JacksonUtil.toJSONString(sourcesMap));
if (isReuseOldTask) { if (isReuseOldTask) {
taskMapper.update(taskEntity); taskMapper.update(taskEntity);
taskMapper.deassign(taskEntity.getId());
log.info("更新旧任务! taskId:{}", taskEntity.getId()); log.info("更新旧任务! taskId:{}", taskEntity.getId());
} else { } else {
taskMapper.add(taskEntity); taskMapper.add(taskEntity);