package com.ycwl.basic.watchdog; import com.ycwl.basic.integration.message.dto.ZtMessage; import com.ycwl.basic.integration.message.service.ZtMessageProducerService; import com.ycwl.basic.mapper.TaskMapper; import com.ycwl.basic.model.pc.task.entity.TaskEntity; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @Component @Profile("prod") public class TaskWatchDog { @Autowired private TaskMapper taskMapper; @Autowired private ZtMessageProducerService ztMessageProducerService; // 异常通知计数器 private final Map notificationCounters = new HashMap<>(); // 配置参数 private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次 // 异常类型标识 private static final String TASK_BACKLOG = "task_backlog"; private static final String FAILED_TASKS = "failed_tasks"; private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀 @Scheduled(fixedDelay = 1000 * 60L) public void scanTaskStatus() { List allNotRunningTaskList = taskMapper.selectAllNotRunning(); List allFailedTaskList = taskMapper.selectAllFailed(); List allRunningTaskList = taskMapper.selectAllRunning(); // 检查任务积压 checkTaskBacklog(allNotRunningTaskList); // 检查失败任务 checkFailedTasks(allFailedTaskList); // 检查长时间运行任务 checkLongRunningTasks(allRunningTaskList); } /** * 检查任务积压 */ private void checkTaskBacklog(List notRunningTasks) { if (notRunningTasks.size() > 10) { if (shouldSendNotification(TASK_BACKLOG)) { String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size()); sendNotification("任务堆积警告", content, TASK_BACKLOG); } } else { // 异常已恢复,重置计数器 resetNotificationCounter(TASK_BACKLOG); } } /** * 检查失败任务 */ private void checkFailedTasks(List failedTasks) { if (failedTasks.size() > 5) { if (shouldSendNotification(FAILED_TASKS)) { String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size()); sendNotification("任务失败警告", content, FAILED_TASKS); } } else { // 异常已恢复,重置计数器 resetNotificationCounter(FAILED_TASKS); } } /** * 检查长时间运行任务 */ private void checkLongRunningTasks(List runningTasks) { Set currentLongRunningTasks = new HashSet<>(); for (TaskEntity taskEntity : runningTasks) { if (taskEntity.getStartTime() == null) { continue; } // startTime已经过去3分钟了 if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) { String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId(); currentLongRunningTasks.add(taskKey); if (shouldSendNotification(taskKey)) { String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!", taskEntity.getWorkerId(), taskEntity.getId()); sendNotification("长时间运行任务警告", content, taskKey); } } } // 清理已恢复正常的长时运行任务的计数器 cleanupLongRunningTaskCounters(currentLongRunningTasks); } /** * 清理已恢复正常的长时运行任务的计数器 */ private void cleanupLongRunningTaskCounters(Set currentLongRunningTasks) { Set keysToRemove = new HashSet<>(); for (String key : notificationCounters.keySet()) { if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) { if (!currentLongRunningTasks.contains(key)) { keysToRemove.add(key); } } } // 移除已恢复任务的计数器 for (String key : keysToRemove) { notificationCounters.remove(key); } } /** * 判断是否应该发送通知 */ private boolean shouldSendNotification(String abnormalType) { int count = notificationCounters.getOrDefault(abnormalType, 0); return count < MAX_NOTIFICATION_COUNT; } /** * 发送通知并更新计数器 */ private void sendNotification(String title, String content, String abnormalType) { ZtMessage ztMessage = ZtMessage.of( "serverchan", title, content, "system" ); ztMessage.setSendReason("任务监控"); ztMessage.setSendBiz("系统监控"); ztMessageProducerService.send(ztMessage); // 更新通知计数器 int currentCount = notificationCounters.getOrDefault(abnormalType, 0); notificationCounters.put(abnormalType, currentCount + 1); } /** * 重置通知计数器(异常恢复时调用) */ private void resetNotificationCounter(String abnormalType) { notificationCounters.remove(abnormalType); } }