refactor(render): 移除作业服务的降级功能并删除任务监控组件

- 移除了 RenderJobIntegrationService 中的 fallbackService 降级处理逻辑
- 直接调用 renderJobV2Client 客户端获取作业状态、播放列表信息、作业详情和作业片段
- 删除了 TaskWatchDog 组件及其相关的任务状态扫描和异常通知功能
- 移除了任务积压、失败任务和长时间运行任务的监控逻辑
- 清理了相关的通知计数器和异常恢复机制代码
This commit is contained in:
2026-01-25 00:29:06 +08:00
parent 7b4a2f3fe8
commit 1dc0754b7f
2 changed files with 9 additions and 204 deletions

View File

@@ -41,15 +41,8 @@ public class RenderJobIntegrationService {
*/
public JobStatusResponse getJobStatus(Long jobId) {
log.debug("获取作业状态, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:status:" + jobId,
() -> {
CommonResponse<JobStatusResponse> response = renderJobV2Client.getJobStatus(jobId);
return handleResponse(response, "获取作业状态失败");
},
JobStatusResponse.class
);
CommonResponse<JobStatusResponse> response = renderJobV2Client.getJobStatus(jobId);
return handleResponse(response, "获取作业状态失败");
}
/**
@@ -71,15 +64,8 @@ public class RenderJobIntegrationService {
*/
public PlaylistInfoDTO getPlaylistInfo(Long jobId) {
log.debug("获取播放列表信息, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:playlist-info:" + jobId,
() -> {
CommonResponse<PlaylistInfoDTO> response = renderJobV2Client.getPlaylistInfo(jobId);
return handleResponse(response, "获取播放列表信息失败");
},
PlaylistInfoDTO.class
);
CommonResponse<PlaylistInfoDTO> response = renderJobV2Client.getPlaylistInfo(jobId);
return handleResponse(response, "获取播放列表信息失败");
}
/**
@@ -124,15 +110,8 @@ public class RenderJobIntegrationService {
*/
public RenderJobV2DTO getJobDetail(Long jobId) {
log.debug("获取作业详情, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:detail:" + jobId,
() -> {
CommonResponse<RenderJobV2DTO> response = renderJobV2Client.getJobDetail(jobId);
return handleResponse(response, "获取作业详情失败");
},
RenderJobV2DTO.class
);
CommonResponse<RenderJobV2DTO> response = renderJobV2Client.getJobDetail(jobId);
return handleResponse(response, "获取作业详情失败");
}
/**
@@ -141,16 +120,9 @@ public class RenderJobIntegrationService {
@SuppressWarnings("unchecked")
public List<RenderJobSegmentV2DTO> getJobSegments(Long jobId) {
log.debug("获取作业片段列表, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:segments:" + jobId,
() -> {
CommonResponse<List<RenderJobSegmentV2DTO>> response =
renderJobV2Client.getJobSegments(jobId);
return handleResponse(response, "获取作业片段列表失败");
},
(Class<List<RenderJobSegmentV2DTO>>) (Class<?>) List.class
);
CommonResponse<List<RenderJobSegmentV2DTO>> response =
renderJobV2Client.getJobSegments(jobId);
return handleResponse(response, "获取作业片段列表失败");
}
// ==================== Helper Methods ====================

View File

@@ -1,167 +0,0 @@
package com.ycwl.basic.watchdog;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
@Profile("prod")
public class TaskWatchDog {
@Autowired
private TaskMapper taskMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
// 异常通知计数器
private final Map<String, Integer> notificationCounters = new HashMap<>();
// 配置参数
private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次
// 异常类型标识
private static final String TASK_BACKLOG = "task_backlog";
private static final String FAILED_TASKS = "failed_tasks";
private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀
@Scheduled(fixedDelay = 1000 * 60L)
public void scanTaskStatus() {
List<TaskEntity> allNotRunningTaskList = taskMapper.selectAllNotRunning();
List<TaskEntity> allFailedTaskList = taskMapper.selectAllFailed();
List<TaskEntity> allRunningTaskList = taskMapper.selectAllRunning();
// 检查任务积压
checkTaskBacklog(allNotRunningTaskList);
// 检查失败任务
checkFailedTasks(allFailedTaskList);
// 检查长时间运行任务
checkLongRunningTasks(allRunningTaskList);
}
/**
* 检查任务积压
*/
private void checkTaskBacklog(List<TaskEntity> notRunningTasks) {
if (notRunningTasks.size() > 10) {
if (shouldSendNotification(TASK_BACKLOG)) {
String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size());
sendNotification("任务堆积警告", content, TASK_BACKLOG);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(TASK_BACKLOG);
}
}
/**
* 检查失败任务
*/
private void checkFailedTasks(List<TaskEntity> failedTasks) {
if (failedTasks.size() > 5) {
if (shouldSendNotification(FAILED_TASKS)) {
String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size());
sendNotification("任务失败警告", content, FAILED_TASKS);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(FAILED_TASKS);
}
}
/**
* 检查长时间运行任务
*/
private void checkLongRunningTasks(List<TaskEntity> runningTasks) {
Set<String> currentLongRunningTasks = new HashSet<>();
for (TaskEntity taskEntity : runningTasks) {
if (taskEntity.getStartTime() == null) {
continue;
}
// startTime已经过去3分钟了
if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) {
String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId();
currentLongRunningTasks.add(taskKey);
if (shouldSendNotification(taskKey)) {
String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!",
taskEntity.getWorkerId(), taskEntity.getId());
sendNotification("长时间运行任务警告", content, taskKey);
}
}
}
// 清理已恢复正常的长时运行任务的计数器
cleanupLongRunningTaskCounters(currentLongRunningTasks);
}
/**
* 清理已恢复正常的长时运行任务的计数器
*/
private void cleanupLongRunningTaskCounters(Set<String> currentLongRunningTasks) {
Set<String> keysToRemove = new HashSet<>();
for (String key : notificationCounters.keySet()) {
if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) {
if (!currentLongRunningTasks.contains(key)) {
keysToRemove.add(key);
}
}
}
// 移除已恢复任务的计数器
for (String key : keysToRemove) {
notificationCounters.remove(key);
}
}
/**
* 判断是否应该发送通知
*/
private boolean shouldSendNotification(String abnormalType) {
int count = notificationCounters.getOrDefault(abnormalType, 0);
return count < MAX_NOTIFICATION_COUNT;
}
/**
* 发送通知并更新计数器
*/
private void sendNotification(String title, String content, String abnormalType) {
ZtMessage ztMessage = ZtMessage.of(
"serverchan",
title,
content,
"system"
);
ztMessage.setSendReason("任务监控");
ztMessage.setSendBiz("系统监控");
ztMessageProducerService.send(ztMessage);
// 更新通知计数器
int currentCount = notificationCounters.getOrDefault(abnormalType, 0);
notificationCounters.put(abnormalType, currentCount + 1);
}
/**
* 重置通知计数器(异常恢复时调用)
*/
private void resetNotificationCounter(String abnormalType) {
notificationCounters.remove(abnormalType);
}
}