From 1dc0754b7f4dc237e411dcd80a235af4f5d726b2 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sun, 25 Jan 2026 00:29:06 +0800 Subject: [PATCH] =?UTF-8?q?refactor(render):=20=E7=A7=BB=E9=99=A4=E4=BD=9C?= =?UTF-8?q?=E4=B8=9A=E6=9C=8D=E5=8A=A1=E7=9A=84=E9=99=8D=E7=BA=A7=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=B9=B6=E5=88=A0=E9=99=A4=E4=BB=BB=E5=8A=A1=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除了 RenderJobIntegrationService 中的 fallbackService 降级处理逻辑 - 直接调用 renderJobV2Client 客户端获取作业状态、播放列表信息、作业详情和作业片段 - 删除了 TaskWatchDog 组件及其相关的任务状态扫描和异常通知功能 - 移除了任务积压、失败任务和长时间运行任务的监控逻辑 - 清理了相关的通知计数器和异常恢复机制代码 --- .../service/RenderJobIntegrationService.java | 46 +---- .../com/ycwl/basic/watchdog/TaskWatchDog.java | 167 ------------------ 2 files changed, 9 insertions(+), 204 deletions(-) delete mode 100644 src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java diff --git a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java index 3218a036..562b8e9d 100644 --- a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java +++ b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java @@ -41,15 +41,8 @@ public class RenderJobIntegrationService { */ public JobStatusResponse getJobStatus(Long jobId) { log.debug("获取作业状态, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:status:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobStatus(jobId); - return handleResponse(response, "获取作业状态失败"); - }, - JobStatusResponse.class - ); + CommonResponse response = renderJobV2Client.getJobStatus(jobId); + return handleResponse(response, "获取作业状态失败"); } /** @@ -71,15 +64,8 @@ public class RenderJobIntegrationService { */ public PlaylistInfoDTO getPlaylistInfo(Long jobId) { log.debug("获取播放列表信息, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:playlist-info:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); - return handleResponse(response, "获取播放列表信息失败"); - }, - PlaylistInfoDTO.class - ); + CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); + return handleResponse(response, "获取播放列表信息失败"); } /** @@ -124,15 +110,8 @@ public class RenderJobIntegrationService { */ public RenderJobV2DTO getJobDetail(Long jobId) { log.debug("获取作业详情, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:detail:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobDetail(jobId); - return handleResponse(response, "获取作业详情失败"); - }, - RenderJobV2DTO.class - ); + CommonResponse response = renderJobV2Client.getJobDetail(jobId); + return handleResponse(response, "获取作业详情失败"); } /** @@ -141,16 +120,9 @@ public class RenderJobIntegrationService { @SuppressWarnings("unchecked") public List getJobSegments(Long jobId) { log.debug("获取作业片段列表, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:segments:" + jobId, - () -> { - CommonResponse> response = - renderJobV2Client.getJobSegments(jobId); - return handleResponse(response, "获取作业片段列表失败"); - }, - (Class>) (Class) List.class - ); + CommonResponse> response = + renderJobV2Client.getJobSegments(jobId); + return handleResponse(response, "获取作业片段列表失败"); } // ==================== Helper Methods ==================== diff --git a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java b/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java deleted file mode 100644 index 098cfd13..00000000 --- a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.ycwl.basic.watchdog; - -import com.ycwl.basic.integration.message.dto.ZtMessage; -import com.ycwl.basic.integration.message.service.ZtMessageProducerService; -import com.ycwl.basic.mapper.TaskMapper; -import com.ycwl.basic.model.pc.task.entity.TaskEntity; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@Component -@Profile("prod") -public class TaskWatchDog { - - @Autowired - private TaskMapper taskMapper; - - @Autowired - private ZtMessageProducerService ztMessageProducerService; - - // 异常通知计数器 - private final Map notificationCounters = new HashMap<>(); - - // 配置参数 - private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次 - - // 异常类型标识 - private static final String TASK_BACKLOG = "task_backlog"; - private static final String FAILED_TASKS = "failed_tasks"; - private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀 - - @Scheduled(fixedDelay = 1000 * 60L) - public void scanTaskStatus() { - List allNotRunningTaskList = taskMapper.selectAllNotRunning(); - List allFailedTaskList = taskMapper.selectAllFailed(); - List allRunningTaskList = taskMapper.selectAllRunning(); - - // 检查任务积压 - checkTaskBacklog(allNotRunningTaskList); - - // 检查失败任务 - checkFailedTasks(allFailedTaskList); - - // 检查长时间运行任务 - checkLongRunningTasks(allRunningTaskList); - } - - /** - * 检查任务积压 - */ - private void checkTaskBacklog(List notRunningTasks) { - if (notRunningTasks.size() > 10) { - if (shouldSendNotification(TASK_BACKLOG)) { - String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size()); - sendNotification("任务堆积警告", content, TASK_BACKLOG); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(TASK_BACKLOG); - } - } - - /** - * 检查失败任务 - */ - private void checkFailedTasks(List failedTasks) { - if (failedTasks.size() > 5) { - if (shouldSendNotification(FAILED_TASKS)) { - String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size()); - sendNotification("任务失败警告", content, FAILED_TASKS); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(FAILED_TASKS); - } - } - - /** - * 检查长时间运行任务 - */ - private void checkLongRunningTasks(List runningTasks) { - Set currentLongRunningTasks = new HashSet<>(); - - for (TaskEntity taskEntity : runningTasks) { - if (taskEntity.getStartTime() == null) { - continue; - } - // startTime已经过去3分钟了 - if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) { - String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId(); - currentLongRunningTasks.add(taskKey); - - if (shouldSendNotification(taskKey)) { - String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!", - taskEntity.getWorkerId(), taskEntity.getId()); - sendNotification("长时间运行任务警告", content, taskKey); - } - } - } - - // 清理已恢复正常的长时运行任务的计数器 - cleanupLongRunningTaskCounters(currentLongRunningTasks); - } - - /** - * 清理已恢复正常的长时运行任务的计数器 - */ - private void cleanupLongRunningTaskCounters(Set currentLongRunningTasks) { - Set keysToRemove = new HashSet<>(); - - for (String key : notificationCounters.keySet()) { - if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) { - if (!currentLongRunningTasks.contains(key)) { - keysToRemove.add(key); - } - } - } - - // 移除已恢复任务的计数器 - for (String key : keysToRemove) { - notificationCounters.remove(key); - } - } - - /** - * 判断是否应该发送通知 - */ - private boolean shouldSendNotification(String abnormalType) { - int count = notificationCounters.getOrDefault(abnormalType, 0); - return count < MAX_NOTIFICATION_COUNT; - } - - /** - * 发送通知并更新计数器 - */ - private void sendNotification(String title, String content, String abnormalType) { - ZtMessage ztMessage = ZtMessage.of( - "serverchan", - title, - content, - "system" - ); - ztMessage.setSendReason("任务监控"); - ztMessage.setSendBiz("系统监控"); - - ztMessageProducerService.send(ztMessage); - - // 更新通知计数器 - int currentCount = notificationCounters.getOrDefault(abnormalType, 0); - notificationCounters.put(abnormalType, currentCount + 1); - } - - /** - * 重置通知计数器(异常恢复时调用) - */ - private void resetNotificationCounter(String abnormalType) { - notificationCounters.remove(abnormalType); - } -}