diff --git a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java index 3218a036..562b8e9d 100644 --- a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java +++ b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java @@ -41,15 +41,8 @@ public class RenderJobIntegrationService { */ public JobStatusResponse getJobStatus(Long jobId) { log.debug("获取作业状态, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:status:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobStatus(jobId); - return handleResponse(response, "获取作业状态失败"); - }, - JobStatusResponse.class - ); + CommonResponse response = renderJobV2Client.getJobStatus(jobId); + return handleResponse(response, "获取作业状态失败"); } /** @@ -71,15 +64,8 @@ public class RenderJobIntegrationService { */ public PlaylistInfoDTO getPlaylistInfo(Long jobId) { log.debug("获取播放列表信息, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:playlist-info:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); - return handleResponse(response, "获取播放列表信息失败"); - }, - PlaylistInfoDTO.class - ); + CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); + return handleResponse(response, "获取播放列表信息失败"); } /** @@ -124,15 +110,8 @@ public class RenderJobIntegrationService { */ public RenderJobV2DTO getJobDetail(Long jobId) { log.debug("获取作业详情, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:detail:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobDetail(jobId); - return handleResponse(response, "获取作业详情失败"); - }, - RenderJobV2DTO.class - ); + CommonResponse response = renderJobV2Client.getJobDetail(jobId); + return handleResponse(response, "获取作业详情失败"); } /** @@ -141,16 +120,9 @@ public class RenderJobIntegrationService { @SuppressWarnings("unchecked") public List getJobSegments(Long jobId) { log.debug("获取作业片段列表, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:segments:" + jobId, - () -> { - CommonResponse> response = - renderJobV2Client.getJobSegments(jobId); - return handleResponse(response, "获取作业片段列表失败"); - }, - (Class>) (Class) List.class - ); + CommonResponse> response = + renderJobV2Client.getJobSegments(jobId); + return handleResponse(response, "获取作业片段列表失败"); } // ==================== Helper Methods ==================== diff --git a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java b/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java deleted file mode 100644 index 098cfd13..00000000 --- a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.ycwl.basic.watchdog; - -import com.ycwl.basic.integration.message.dto.ZtMessage; -import com.ycwl.basic.integration.message.service.ZtMessageProducerService; -import com.ycwl.basic.mapper.TaskMapper; -import com.ycwl.basic.model.pc.task.entity.TaskEntity; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@Component -@Profile("prod") -public class TaskWatchDog { - - @Autowired - private TaskMapper taskMapper; - - @Autowired - private ZtMessageProducerService ztMessageProducerService; - - // 异常通知计数器 - private final Map notificationCounters = new HashMap<>(); - - // 配置参数 - private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次 - - // 异常类型标识 - private static final String TASK_BACKLOG = "task_backlog"; - private static final String FAILED_TASKS = "failed_tasks"; - private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀 - - @Scheduled(fixedDelay = 1000 * 60L) - public void scanTaskStatus() { - List allNotRunningTaskList = taskMapper.selectAllNotRunning(); - List allFailedTaskList = taskMapper.selectAllFailed(); - List allRunningTaskList = taskMapper.selectAllRunning(); - - // 检查任务积压 - checkTaskBacklog(allNotRunningTaskList); - - // 检查失败任务 - checkFailedTasks(allFailedTaskList); - - // 检查长时间运行任务 - checkLongRunningTasks(allRunningTaskList); - } - - /** - * 检查任务积压 - */ - private void checkTaskBacklog(List notRunningTasks) { - if (notRunningTasks.size() > 10) { - if (shouldSendNotification(TASK_BACKLOG)) { - String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size()); - sendNotification("任务堆积警告", content, TASK_BACKLOG); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(TASK_BACKLOG); - } - } - - /** - * 检查失败任务 - */ - private void checkFailedTasks(List failedTasks) { - if (failedTasks.size() > 5) { - if (shouldSendNotification(FAILED_TASKS)) { - String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size()); - sendNotification("任务失败警告", content, FAILED_TASKS); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(FAILED_TASKS); - } - } - - /** - * 检查长时间运行任务 - */ - private void checkLongRunningTasks(List runningTasks) { - Set currentLongRunningTasks = new HashSet<>(); - - for (TaskEntity taskEntity : runningTasks) { - if (taskEntity.getStartTime() == null) { - continue; - } - // startTime已经过去3分钟了 - if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) { - String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId(); - currentLongRunningTasks.add(taskKey); - - if (shouldSendNotification(taskKey)) { - String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!", - taskEntity.getWorkerId(), taskEntity.getId()); - sendNotification("长时间运行任务警告", content, taskKey); - } - } - } - - // 清理已恢复正常的长时运行任务的计数器 - cleanupLongRunningTaskCounters(currentLongRunningTasks); - } - - /** - * 清理已恢复正常的长时运行任务的计数器 - */ - private void cleanupLongRunningTaskCounters(Set currentLongRunningTasks) { - Set keysToRemove = new HashSet<>(); - - for (String key : notificationCounters.keySet()) { - if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) { - if (!currentLongRunningTasks.contains(key)) { - keysToRemove.add(key); - } - } - } - - // 移除已恢复任务的计数器 - for (String key : keysToRemove) { - notificationCounters.remove(key); - } - } - - /** - * 判断是否应该发送通知 - */ - private boolean shouldSendNotification(String abnormalType) { - int count = notificationCounters.getOrDefault(abnormalType, 0); - return count < MAX_NOTIFICATION_COUNT; - } - - /** - * 发送通知并更新计数器 - */ - private void sendNotification(String title, String content, String abnormalType) { - ZtMessage ztMessage = ZtMessage.of( - "serverchan", - title, - content, - "system" - ); - ztMessage.setSendReason("任务监控"); - ztMessage.setSendBiz("系统监控"); - - ztMessageProducerService.send(ztMessage); - - // 更新通知计数器 - int currentCount = notificationCounters.getOrDefault(abnormalType, 0); - notificationCounters.put(abnormalType, currentCount + 1); - } - - /** - * 重置通知计数器(异常恢复时调用) - */ - private void resetNotificationCounter(String abnormalType) { - notificationCounters.remove(abnormalType); - } -}