You've already forked FrameTour-BE
feat(watchdog): 增强任务监控告警机制
- 引入ZtMessageProducerService实现消息通知 - 添加任务积压、失败任务和长时间运行任务的分类监控 - 实现异常通知计数器,限制重复告警次数 -优化告警逻辑,支持异常恢复后计数器重置 - 移除旧的通知工厂依赖,统一使用消息队列发送 - 增加长时任务监控的清理机制,避免无效计数累积
This commit is contained in:
@@ -1,16 +1,20 @@
|
|||||||
package com.ycwl.basic.watchdog;
|
package com.ycwl.basic.watchdog;
|
||||||
|
|
||||||
|
import com.ycwl.basic.integration.message.dto.ZtMessage;
|
||||||
|
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
|
||||||
import com.ycwl.basic.mapper.TaskMapper;
|
import com.ycwl.basic.mapper.TaskMapper;
|
||||||
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
|
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
|
||||||
import com.ycwl.basic.notify.NotifyFactory;
|
|
||||||
import com.ycwl.basic.notify.entity.NotifyContent;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Profile;
|
import org.springframework.context.annotation.Profile;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@Profile("prod")
|
@Profile("prod")
|
||||||
@@ -19,41 +23,145 @@ public class TaskWatchDog {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TaskMapper taskMapper;
|
private TaskMapper taskMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ZtMessageProducerService ztMessageProducerService;
|
||||||
|
|
||||||
|
// 异常通知计数器
|
||||||
|
private final Map<String, Integer> notificationCounters = new HashMap<>();
|
||||||
|
|
||||||
|
// 配置参数
|
||||||
|
private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次
|
||||||
|
|
||||||
|
// 异常类型标识
|
||||||
|
private static final String TASK_BACKLOG = "task_backlog";
|
||||||
|
private static final String FAILED_TASKS = "failed_tasks";
|
||||||
|
private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀
|
||||||
|
|
||||||
@Scheduled(fixedDelay = 1000 * 60L)
|
@Scheduled(fixedDelay = 1000 * 60L)
|
||||||
public void scanTaskStatus() {
|
public void scanTaskStatus() {
|
||||||
List<TaskEntity> allNotRunningTaskList = taskMapper.selectAllNotRunning();
|
List<TaskEntity> allNotRunningTaskList = taskMapper.selectAllNotRunning();
|
||||||
String title = "任务堆积警告!";
|
|
||||||
StringBuilder content = new StringBuilder();
|
|
||||||
if (allNotRunningTaskList.size() > 10) {
|
|
||||||
content.append("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:").append(allNotRunningTaskList.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
List<TaskEntity> allFailedTaskList = taskMapper.selectAllFailed();
|
List<TaskEntity> allFailedTaskList = taskMapper.selectAllFailed();
|
||||||
if (allFailedTaskList.size() > 5) {
|
List<TaskEntity> allRunningTaskList = taskMapper.selectAllRunning();
|
||||||
if (content.length() > 0) {
|
|
||||||
content.append("\n");
|
// 检查任务积压
|
||||||
}
|
checkTaskBacklog(allNotRunningTaskList);
|
||||||
content.append("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:").append(allFailedTaskList.size());
|
|
||||||
|
// 检查失败任务
|
||||||
|
checkFailedTasks(allFailedTaskList);
|
||||||
|
|
||||||
|
// 检查长时间运行任务
|
||||||
|
checkLongRunningTasks(allRunningTaskList);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<TaskEntity> allRunningTaskList = taskMapper.selectAllRunning();
|
/**
|
||||||
for (TaskEntity taskEntity : allRunningTaskList) {
|
* 检查任务积压
|
||||||
|
*/
|
||||||
|
private void checkTaskBacklog(List<TaskEntity> notRunningTasks) {
|
||||||
|
if (notRunningTasks.size() > 10) {
|
||||||
|
if (shouldSendNotification(TASK_BACKLOG)) {
|
||||||
|
String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size());
|
||||||
|
sendNotification("任务堆积警告", content, TASK_BACKLOG);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 异常已恢复,重置计数器
|
||||||
|
resetNotificationCounter(TASK_BACKLOG);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查失败任务
|
||||||
|
*/
|
||||||
|
private void checkFailedTasks(List<TaskEntity> failedTasks) {
|
||||||
|
if (failedTasks.size() > 5) {
|
||||||
|
if (shouldSendNotification(FAILED_TASKS)) {
|
||||||
|
String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size());
|
||||||
|
sendNotification("任务失败警告", content, FAILED_TASKS);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 异常已恢复,重置计数器
|
||||||
|
resetNotificationCounter(FAILED_TASKS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查长时间运行任务
|
||||||
|
*/
|
||||||
|
private void checkLongRunningTasks(List<TaskEntity> runningTasks) {
|
||||||
|
Set<String> currentLongRunningTasks = new HashSet<>();
|
||||||
|
|
||||||
|
for (TaskEntity taskEntity : runningTasks) {
|
||||||
if (taskEntity.getStartTime() == null) {
|
if (taskEntity.getStartTime() == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// startTime已经过去3分钟了
|
// startTime已经过去3分钟了
|
||||||
if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) {
|
if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) {
|
||||||
if (content.length() > 0) {
|
String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId();
|
||||||
content.append("\n");
|
currentLongRunningTasks.add(taskKey);
|
||||||
}
|
|
||||||
content.append("当前【").append(taskEntity.getWorkerId()).append("】渲染机的【").append(taskEntity.getId()).append("】任务已超过3分钟未完成!");
|
if (shouldSendNotification(taskKey)) {
|
||||||
|
String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!",
|
||||||
|
taskEntity.getWorkerId(), taskEntity.getId());
|
||||||
|
sendNotification("长时间运行任务警告", content, taskKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (StringUtils.isNotBlank(content)) {
|
}
|
||||||
NotifyFactory.via().sendTo(
|
|
||||||
new NotifyContent(title, content.toString()),
|
// 清理已恢复正常的长时运行任务的计数器
|
||||||
"default_user"
|
cleanupLongRunningTaskCounters(currentLongRunningTasks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清理已恢复正常的长时运行任务的计数器
|
||||||
|
*/
|
||||||
|
private void cleanupLongRunningTaskCounters(Set<String> currentLongRunningTasks) {
|
||||||
|
Set<String> keysToRemove = new HashSet<>();
|
||||||
|
|
||||||
|
for (String key : notificationCounters.keySet()) {
|
||||||
|
if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) {
|
||||||
|
if (!currentLongRunningTasks.contains(key)) {
|
||||||
|
keysToRemove.add(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 移除已恢复任务的计数器
|
||||||
|
for (String key : keysToRemove) {
|
||||||
|
notificationCounters.remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断是否应该发送通知
|
||||||
|
*/
|
||||||
|
private boolean shouldSendNotification(String abnormalType) {
|
||||||
|
int count = notificationCounters.getOrDefault(abnormalType, 0);
|
||||||
|
return count < MAX_NOTIFICATION_COUNT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送通知并更新计数器
|
||||||
|
*/
|
||||||
|
private void sendNotification(String title, String content, String abnormalType) {
|
||||||
|
ZtMessage ztMessage = ZtMessage.of(
|
||||||
|
"serverchan",
|
||||||
|
title,
|
||||||
|
content,
|
||||||
|
"system"
|
||||||
);
|
);
|
||||||
}
|
ztMessage.setSendReason("任务监控");
|
||||||
|
ztMessage.setSendBiz("系统监控");
|
||||||
|
|
||||||
|
ztMessageProducerService.send(ztMessage);
|
||||||
|
|
||||||
|
// 更新通知计数器
|
||||||
|
int currentCount = notificationCounters.getOrDefault(abnormalType, 0);
|
||||||
|
notificationCounters.put(abnormalType, currentCount + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重置通知计数器(异常恢复时调用)
|
||||||
|
*/
|
||||||
|
private void resetNotificationCounter(String abnormalType) {
|
||||||
|
notificationCounters.remove(abnormalType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user