Merge branch 'render_next'

This commit is contained in:
2026-02-04 16:28:54 +08:00
9 changed files with 746 additions and 224 deletions

View File

@@ -47,6 +47,21 @@ public interface RenderJobV2Client {
@PostMapping("/jobs/{jobId}/cancel")
CommonResponse<Void> cancelJob(@PathVariable("jobId") Long jobId);
/**
* 创建FINALIZE_MP4任务
* 将所有已发布的TS片段合成为MP4文件
*
* 前置条件:
* 1. 作业存在且状态为RUNNING
* 2. 所有片段都已发布(PublishedCount == SegmentCount)
* 3. 不存在已有的FINALIZE_MP4任务
*
* @param jobId 作业ID
* @return 任务创建结果
*/
@PostMapping("/jobs/{jobId}/finalize-mp4")
CommonResponse<FinalizeMP4Response> createFinalizeMP4Task(@PathVariable("jobId") Long jobId);
// ==================== 管理端接口 ====================
/**

View File

@@ -0,0 +1,20 @@
package com.ycwl.basic.integration.render.dto.job;
import lombok.Data;
/**
* 创建FINALIZE_MP4任务响应
*/
@Data
public class FinalizeMP4Response {
/**
* 任务ID
*/
private Long taskId;
/**
* 任务状态
*/
private String status;
}

View File

@@ -41,15 +41,8 @@ public class RenderJobIntegrationService {
*/
public JobStatusResponse getJobStatus(Long jobId) {
log.debug("获取作业状态, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:status:" + jobId,
() -> {
CommonResponse<JobStatusResponse> response = renderJobV2Client.getJobStatus(jobId);
return handleResponse(response, "获取作业状态失败");
},
JobStatusResponse.class
);
}
/**
@@ -71,15 +64,8 @@ public class RenderJobIntegrationService {
*/
public PlaylistInfoDTO getPlaylistInfo(Long jobId) {
log.debug("获取播放列表信息, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:playlist-info:" + jobId,
() -> {
CommonResponse<PlaylistInfoDTO> response = renderJobV2Client.getPlaylistInfo(jobId);
return handleResponse(response, "获取播放列表信息失败");
},
PlaylistInfoDTO.class
);
}
/**
@@ -91,6 +77,20 @@ public class RenderJobIntegrationService {
handleVoidResponse(response, "取消作业失败");
}
/**
* 创建FINALIZE_MP4任务(直接调用,不降级)
* 将所有已发布的TS片段合成为MP4文件
*
* @param jobId 作业ID
* @return 任务创建结果
* @throws IntegrationException 当前置条件不满足时抛出异常
*/
public FinalizeMP4Response createFinalizeMP4Task(Long jobId) {
log.debug("创建FINALIZE_MP4任务, jobId: {}", jobId);
CommonResponse<FinalizeMP4Response> response = renderJobV2Client.createFinalizeMP4Task(jobId);
return handleResponse(response, "创建FINALIZE_MP4任务失败");
}
// ==================== 管理端接口 ====================
/**
@@ -110,15 +110,8 @@ public class RenderJobIntegrationService {
*/
public RenderJobV2DTO getJobDetail(Long jobId) {
log.debug("获取作业详情, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:detail:" + jobId,
() -> {
CommonResponse<RenderJobV2DTO> response = renderJobV2Client.getJobDetail(jobId);
return handleResponse(response, "获取作业详情失败");
},
RenderJobV2DTO.class
);
}
/**
@@ -127,16 +120,9 @@ public class RenderJobIntegrationService {
@SuppressWarnings("unchecked")
public List<RenderJobSegmentV2DTO> getJobSegments(Long jobId) {
log.debug("获取作业片段列表, jobId: {}", jobId);
return fallbackService.executeWithFallback(
SERVICE_NAME,
"job:segments:" + jobId,
() -> {
CommonResponse<List<RenderJobSegmentV2DTO>> response =
renderJobV2Client.getJobSegments(jobId);
return handleResponse(response, "获取作业片段列表失败");
},
(Class<List<RenderJobSegmentV2DTO>>) (Class<?>) List.class
);
}
// ==================== Helper Methods ====================

View File

@@ -0,0 +1,64 @@
package com.ycwl.basic.mapper.task;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
* Task与RenderJob关联Mapper
*/
@Mapper
public interface TaskRenderJobMappingMapper extends BaseMapper<TaskRenderJobMappingEntity> {
/**
* 根据taskId查询mapping
*/
TaskRenderJobMappingEntity selectByTaskId(@Param("taskId") Long taskId);
/**
* 根据renderJobId查询mapping
*/
TaskRenderJobMappingEntity selectByRenderJobId(@Param("renderJobId") Long renderJobId);
/**
* 查询需要轮询的记录
* 条件:状态为PENDING或PREVIEW_READY,且最后检查时间超过指定间隔
*/
List<TaskRenderJobMappingEntity> selectPendingForPolling(
@Param("statuses") List<String> statuses,
@Param("checkIntervalSeconds") int checkIntervalSeconds,
@Param("limit") int limit
);
/**
* 更新渲染状态和片段信息
*/
int updateRenderStatus(
@Param("id") Long id,
@Param("renderStatus") String renderStatus,
@Param("publishedCount") Integer publishedCount,
@Param("segmentCount") Integer segmentCount,
@Param("previewUrl") String previewUrl,
@Param("mp4Url") String mp4Url,
@Param("lastCheckTime") Date lastCheckTime
);
/**
* 更新为失败状态
*/
int updateToFailed(
@Param("id") Long id,
@Param("errorCode") String errorCode,
@Param("errorMessage") String errorMessage,
@Param("lastCheckTime") Date lastCheckTime
);
/**
* 增加重试次数
*/
int incrementRetryCount(@Param("id") Long id);
}

View File

@@ -0,0 +1,98 @@
package com.ycwl.basic.model.task.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* Task与RenderJob关联实体
* 用于跟踪task和zt-render-worker服务中渲染作业的关联
*/
@Data
@TableName("task_render_job_mapping")
public class TaskRenderJobMappingEntity {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 任务ID(task表的id)
*/
private Long taskId;
/**
* 渲染作业ID(zt-render-worker返回的jobId)
*/
private Long renderJobId;
/**
* 渲染状态
* PENDING-等待中, PREVIEW_READY-预览就绪, COMPLETED-已完成, FAILED-失败
*/
private String renderStatus;
/**
* 已发布片段数
*/
private Integer publishedCount;
/**
* 总片段数
*/
private Integer segmentCount;
/**
* 预览播放地址(HLS)
*/
private String previewUrl;
/**
* 最终MP4地址
*/
private String mp4Url;
/**
* 错误码
*/
private String errorCode;
/**
* 错误信息
*/
private String errorMessage;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 最后检查时间
*/
private Date lastCheckTime;
private Date createTime;
private Date updateTime;
/**
* 渲染状态常量
*/
public static final String STATUS_PENDING = "PENDING";
public static final String STATUS_PREVIEW_READY = "PREVIEW_READY";
public static final String STATUS_MP4_COMPOSING = "MP4_COMPOSING";
public static final String STATUS_COMPLETED = "COMPLETED";
public static final String STATUS_FAILED = "FAILED";
/**
* 预览就绪所需的最小已发布片段数
*/
public static final int MIN_PUBLISHED_FOR_PREVIEW = 2;
/**
* 最大重试次数
*/
public static final int MAX_RETRY_COUNT = 10;
}

View File

@@ -0,0 +1,416 @@
package com.ycwl.basic.service.task;
import com.ycwl.basic.integration.render.dto.job.FinalizeMP4Response;
import com.ycwl.basic.integration.render.dto.job.JobStatusResponse;
import com.ycwl.basic.integration.render.service.RenderJobIntegrationService;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.mapper.VideoMapper;
import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper;
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
import com.ycwl.basic.model.pc.video.entity.VideoEntity;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.VideoRepository;
import com.ycwl.basic.repository.VideoTaskRepository;
import com.ycwl.basic.utils.SnowFlakeUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* 渲染作业轮询服务
* 定时查询zt-render-worker服务中的渲染作业状态,并更新本地task状态
*
* 状态流转:
* PENDING → PREVIEW_READY → MP4_COMPOSING → COMPLETED
* │ │ │ │
* └────────────┴──────────────┴──────────────┴──→ FAILED
*/
@Slf4j
@Service
@EnableScheduling
@RequiredArgsConstructor
@Profile({"dev", "prod"}) // 开发和生产环境启用
public class RenderJobPollingService {
private final TaskRenderJobMappingMapper mappingMapper;
private final RenderJobIntegrationService renderJobService;
private final TaskMapper taskMapper;
private final VideoMapper videoMapper;
private final VideoTaskRepository videoTaskRepository;
private final VideoRepository videoRepository;
private final MemberRelationRepository memberRelationRepository;
/**
* 定时轮询间隔:4秒
*/
private static final int POLL_INTERVAL_SECONDS = 4;
/**
* 每次查询的最大记录数
*/
private static final int BATCH_SIZE = 50;
/**
* 定时轮询渲染作业状态
* 每3秒执行一次
*/
@Scheduled(fixedDelay = 3000)
public void pollRenderJobs() {
try {
log.debug("[渲染轮询] 开始轮询渲染作业状态");
// 查询待轮询的记录(包含MP4_COMPOSING状态)
List<String> pendingStatuses = Arrays.asList(
TaskRenderJobMappingEntity.STATUS_PENDING,
TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING
);
List<TaskRenderJobMappingEntity> mappings = mappingMapper.selectPendingForPolling(
pendingStatuses,
POLL_INTERVAL_SECONDS,
BATCH_SIZE
);
if (mappings.isEmpty()) {
log.debug("[渲染轮询] 无待处理记录");
return;
}
log.info("[渲染轮询] 查询到 {} 条待处理记录", mappings.size());
// 处理每条记录
for (TaskRenderJobMappingEntity mapping : mappings) {
try {
processMapping(mapping);
} catch (Exception e) {
log.error("[渲染轮询] 处理失败, mappingId: {}, taskId: {}, renderJobId: {}, error: {}",
mapping.getId(), mapping.getTaskId(), mapping.getRenderJobId(), e.getMessage(), e);
handleProcessError(mapping, e);
}
}
log.debug("[渲染轮询] 轮询完成");
} catch (Exception e) {
log.error("[渲染轮询] 轮询异常", e);
}
}
/**
* 处理单条mapping记录
*/
@Transactional(rollbackFor = Exception.class)
public void processMapping(TaskRenderJobMappingEntity mapping) {
Long renderJobId = mapping.getRenderJobId();
Long taskId = mapping.getTaskId();
String currentStatus = mapping.getRenderStatus();
log.debug("[渲染轮询] 处理记录: mappingId={}, taskId={}, renderJobId={}, currentStatus={}",
mapping.getId(), taskId, renderJobId, currentStatus);
// 查询渲染作业状态
JobStatusResponse jobStatus;
try {
jobStatus = renderJobService.getJobStatus(renderJobId);
} catch (Exception e) {
log.warn("[渲染轮询] 查询作业状态失败, renderJobId: {}, error: {}", renderJobId, e.getMessage());
// 注:此处不调用incrementRetryCount,因为@Transactional会回滚
// 外层handleProcessError会负责增加重试次数
throw e;
}
// 检查作业状态
String status = jobStatus.getStatus();
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
String mp4Url = jobStatus.getMp4Url();
log.info("[渲染轮询] 作业状态: taskId={}, status={}, publishedCount={}/{}, playUrl={}, mp4Url={}",
taskId, status, publishedCount, segmentCount, playUrl, mp4Url);
// 处理失败状态
if ("FAILED".equals(status) || "CANCELED".equals(status)) {
handleJobFailed(mapping, jobStatus);
return;
}
// 状态流转处理
switch (currentStatus) {
case TaskRenderJobMappingEntity.STATUS_PENDING:
handlePendingStatus(mapping, jobStatus, taskId);
break;
case TaskRenderJobMappingEntity.STATUS_PREVIEW_READY:
handlePreviewReadyStatus(mapping, jobStatus, taskId);
break;
case TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING:
handleMp4ComposingStatus(mapping, jobStatus, taskId);
break;
default:
log.warn("[渲染轮询] 未知状态: {}", currentStatus);
}
}
/**
* 处理PENDING状态
* PENDING → PREVIEW_READY:当publishedCount >= 2时
*/
private void handlePendingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
if (publishedCount != null && publishedCount >= TaskRenderJobMappingEntity.MIN_PUBLISHED_FOR_PREVIEW) {
log.info("[渲染轮询] 预览就绪: taskId={}, publishedCount={}/{}, playUrl={}",
taskId, publishedCount, segmentCount, playUrl);
// 更新mapping状态为PREVIEW_READY
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
// 更新task的videoUrl为预览地址
if (StringUtils.isNotBlank(playUrl)) {
TaskEntity task = new TaskEntity();
task.setId(taskId);
task.setVideoUrl(playUrl);
task.setStatus(1); // 设置为完成状态
taskMapper.update(task);
videoTaskRepository.clearTaskCache(taskId);
log.info("[渲染轮询] 已更新task预览URL和状态: taskId={}, playUrl={}, status=1", taskId, playUrl);
// 处理video记录(类似taskSuccess逻辑)
try {
handleVideoRecordForPreview(taskId, playUrl);
} catch (Exception e) {
log.warn("[渲染轮询] 处理video记录失败: taskId={}, error={}", taskId, e.getMessage(), e);
}
// 异步发送视频生成通知(仅记录日志,实际通知可能需要在MP4完成后)
log.info("[渲染轮询] 预览视频已就绪,可发送通知: taskId={}", taskId);
}
} else {
// 更新片段信息
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PENDING,
publishedCount, segmentCount, null, null);
}
}
/**
* 处理PREVIEW_READY状态
* PREVIEW_READY → MP4_COMPOSING:当所有片段都已发布时,调用finalize-mp4接口
*/
private void handlePreviewReadyStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
Long renderJobId = mapping.getRenderJobId();
// 检查是否所有片段都已发布
if (publishedCount != null && segmentCount != null && publishedCount.equals(segmentCount) && segmentCount > 0) {
log.info("[渲染轮询] 所有片段已发布,开始创建MP4合成任务: taskId={}, renderJobId={}, publishedCount={}/{}",
taskId, renderJobId, publishedCount, segmentCount);
try {
// 调用finalize-mp4接口创建MP4合成任务
FinalizeMP4Response response = renderJobService.createFinalizeMP4Task(renderJobId);
log.info("[渲染轮询] MP4合成任务创建成功: taskId={}, renderJobId={}, mp4TaskId={}, status={}",
taskId, renderJobId, response.getTaskId(), response.getStatus());
// 更新mapping状态为MP4_COMPOSING
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
} catch (Exception e) {
// 409表示任务已存在,直接进入MP4_COMPOSING状态
if (e.getMessage() != null && e.getMessage().contains("409")) {
log.info("[渲染轮询] MP4合成任务已存在,继续等待: taskId={}, renderJobId={}", taskId, renderJobId);
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
} else {
log.warn("[渲染轮询] 创建MP4合成任务失败: taskId={}, renderJobId={}, error={}",
taskId, renderJobId, e.getMessage());
// 不改变状态,下次轮询重试
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
}
}
} else {
// 更新片段信息
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
}
}
/**
* 处理MP4_COMPOSING状态
* MP4_COMPOSING → COMPLETED:当mp4Url有值时
*/
private void handleMp4ComposingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
String mp4Url = jobStatus.getMp4Url();
if (StringUtils.isNotBlank(mp4Url)) {
log.info("[渲染轮询] MP4合成完成: taskId={}, mp4Url={}", taskId, mp4Url);
// 更新mapping状态为COMPLETED
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_COMPLETED,
publishedCount, segmentCount, playUrl, mp4Url);
// 更新task的videoUrl为最终MP4地址
TaskEntity task = new TaskEntity();
task.setId(taskId);
task.setVideoUrl(mp4Url);
taskMapper.update(task);
videoTaskRepository.clearTaskCache(taskId);
log.info("[渲染轮询] 已更新task最终MP4 URL: taskId={}, mp4Url={}", taskId, mp4Url);
// 更新video记录的videoUrl为最终MP4地址
try {
handleVideoRecordForMP4(taskId, mp4Url);
} catch (Exception e) {
log.warn("[渲染轮询] 更新video的MP4 URL失败: taskId={}, error={}", taskId, e.getMessage(), e);
}
} else {
// MP4还在合成中,更新片段信息
log.debug("[渲染轮询] MP4合成中: taskId={}, 等待下次轮询", taskId);
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
}
}
/**
* 处理作业失败
*/
private void handleJobFailed(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus) {
String errorCode = jobStatus.getErrorCode();
String errorMessage = jobStatus.getErrorMessage();
log.warn("[渲染轮询] 作业失败: taskId={}, status={}, errorCode={}, errorMessage={}",
mapping.getTaskId(), jobStatus.getStatus(), errorCode, errorMessage);
mappingMapper.updateToFailed(
mapping.getId(),
errorCode,
errorMessage,
new Date()
);
}
/**
* 更新mapping状态
*/
private void updateMappingStatus(Long id, String renderStatus, Integer publishedCount,
Integer segmentCount, String previewUrl, String mp4Url) {
mappingMapper.updateRenderStatus(
id,
renderStatus,
publishedCount,
segmentCount,
previewUrl,
mp4Url,
new Date()
);
}
/**
* 处理异常
*/
private void handleProcessError(TaskRenderJobMappingEntity mapping, Exception e) {
try {
mappingMapper.incrementRetryCount(mapping.getId());
// 超过最大重试次数,标记为失败
if (mapping.getRetryCount() != null &&
mapping.getRetryCount() >= TaskRenderJobMappingEntity.MAX_RETRY_COUNT - 1) {
mappingMapper.updateToFailed(
mapping.getId(),
"MAX_RETRY",
"超过最大重试次数: " + e.getMessage(),
new Date()
);
}
} catch (Exception ex) {
log.error("[渲染轮询] 处理错误失败", ex);
}
}
/**
* 处理video记录(预览就绪时)
* 类似taskSuccess的逻辑,但简化版本
*/
private void handleVideoRecordForPreview(Long taskId, String videoUrl) {
try {
var taskResp = taskMapper.getById(taskId);
if (taskResp == null) {
log.warn("[渲染轮询] task不存在: taskId={}", taskId);
return;
}
VideoEntity video = videoMapper.findByTaskId(taskId);
if (video != null) {
// 更新已有video记录
video.setVideoUrl(videoUrl);
videoMapper.update(video);
videoRepository.clearVideoCache(video.getId());
log.info("[渲染轮询] 已更新video预览URL: taskId={}, videoId={}, videoUrl={}",
taskId, video.getId(), videoUrl);
} else {
// 创建新video记录
video = new VideoEntity();
video.setId(SnowFlakeUtil.getLongId());
video.setScenicId(taskResp.getScenicId());
video.setTemplateId(taskResp.getTemplateId());
video.setTaskId(taskId);
video.setFaceId(taskResp.getFaceId());
video.setVideoUrl(videoUrl);
video.setCreateTime(new Date());
videoMapper.add(video);
log.info("[渲染轮询] 已创建video预览记录: taskId={}, videoId={}, videoUrl={}",
taskId, video.getId(), videoUrl);
}
// 更新member_video关联表(isBuy=0,预览阶段未购买)
videoMapper.updateRelationWhenTaskSuccess(taskId, video.getId(), 0);
memberRelationRepository.clearVCacheByFace(taskResp.getFaceId());
log.info("[渲染轮询] 已更新member_video关联: taskId={}, videoId={}", taskId, video.getId());
} catch (Exception e) {
log.error("[渲染轮询] 处理video记录失败: taskId={}", taskId, e);
throw e;
}
}
/**
* 更新video记录的MP4地址(MP4合成完成时)
*/
private void handleVideoRecordForMP4(Long taskId, String mp4Url) {
try {
VideoEntity video = videoMapper.findByTaskId(taskId);
if (video != null) {
video.setVideoUrl(mp4Url);
videoMapper.update(video);
videoRepository.clearVideoCache(video.getId());
log.info("[渲染轮询] 已更新video最终MP4 URL: taskId={}, videoId={}, mp4Url={}",
taskId, video.getId(), mp4Url);
} else {
log.warn("[渲染轮询] video不存在,无法更新MP4 URL: taskId={}", taskId);
}
} catch (Exception e) {
log.error("[渲染轮询] 更新video的MP4 URL失败: taskId={}", taskId, e);
throw e;
}
}
}

View File

@@ -13,6 +13,8 @@ import com.ycwl.basic.integration.render.dto.job.CreatePreviewRequest;
import com.ycwl.basic.integration.render.dto.job.CreatePreviewResponse;
import com.ycwl.basic.integration.render.dto.job.MaterialDTO;
import com.ycwl.basic.integration.render.service.RenderJobIntegrationService;
import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.SourceRepository;
import com.ycwl.basic.utils.JacksonUtil;
@@ -69,6 +71,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -124,6 +127,8 @@ public class TaskTaskServiceImpl implements TaskService {
private FaceStatusManager faceStatusManager;
@Autowired
private RenderJobIntegrationService renderJobIntegrationService;
@Autowired
private TaskRenderJobMappingMapper taskRenderJobMappingMapper;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey();
@@ -224,26 +229,11 @@ public class TaskTaskServiceImpl implements TaskService {
} else {
updTemplateList = templateRepository.getAllEnabledTemplateList();
}
RenderWorkerConfigManager configManager = repository.getWorkerConfigManager(worker.getId());
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
List<TaskRespVO> taskList;
if (Strings.isNotBlank(configManager.getString("scenic_only"))) {
taskList = taskMapper.selectNotRunningByScenicList(configManager.getString("scenic_only"));
} else {
var _taskList = taskMapper.selectNotRunning();
taskList = _taskList.stream().filter(task -> {
boolean workerSelfHostedScenic = isWorkerSelfHostedScenic(task.getScenicId());
return !workerSelfHostedScenic;
}).limit(1).toList();
}
resp.setTasks(taskList);
resp.setTasks(Collections.emptyList());
resp.setTemplates(updTemplateList);
taskList.forEach(task -> {
taskMapper.assignToWorker(task.getId(), worker.getId());
videoTaskRepository.clearTaskCache(task.getId());
});
} finally {
lock.unlock();
}
@@ -552,10 +542,6 @@ public class TaskTaskServiceImpl implements TaskService {
videoMapper.add(video);
}
ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(task.getScenicId());
IStorageAdapter adapter = scenicService.getScenicTmpStorageAdapter(task.getScenicId());
String hash = MD5.create().digestHex(task.getTaskParams() + task.getFaceId().toString());
String filename = StorageUtil.joinPath(StorageConstant.VLOG_PATH, task.getTemplateId().toString() + "_" + hash + "_" + task.getScenicId() + ".mp4");
adapter.setAcl(StorageAcl.PUBLIC_READ, filename);
int isBuy = 0;
FaceEntity face = faceRepository.getFace(task.getFaceId());
if (face != null) {
@@ -729,6 +715,21 @@ public class TaskTaskServiceImpl implements TaskService {
CreatePreviewResponse response = renderJobIntegrationService.createPreview(request);
log.info("[灰度测试] 渲染预览任务创建成功, taskId: {}, renderJobId: {}, playUrl: {}",
taskId, response.getJobId(), response.getPlayUrl());
// 写入mapping表,供轮询服务处理
try {
TaskRenderJobMappingEntity mapping = new TaskRenderJobMappingEntity();
mapping.setTaskId(taskId);
mapping.setRenderJobId(response.getJobId());
mapping.setRenderStatus(TaskRenderJobMappingEntity.STATUS_PENDING);
mapping.setPublishedCount(0);
mapping.setSegmentCount(0);
mapping.setRetryCount(0);
taskRenderJobMappingMapper.insert(mapping);
log.info("[灰度测试] 写入mapping成功, taskId: {}, renderJobId: {}", taskId, response.getJobId());
} catch (Exception ex) {
log.warn("[灰度测试] 写入mapping失败,不影响主流程, taskId: {}, error: {}", taskId, ex.getMessage());
}
} catch (Exception e) {
// 灰度测试:不管返回什么或者报错,都不影响现有流程
log.warn("[灰度测试] 渲染预览任务创建失败,不影响主流程, taskId: {}, templateId: {}, error: {}",

View File

@@ -1,167 +0,0 @@
package com.ycwl.basic.watchdog;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
@Profile("prod")
public class TaskWatchDog {
@Autowired
private TaskMapper taskMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
// 异常通知计数器
private final Map<String, Integer> notificationCounters = new HashMap<>();
// 配置参数
private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次
// 异常类型标识
private static final String TASK_BACKLOG = "task_backlog";
private static final String FAILED_TASKS = "failed_tasks";
private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀
@Scheduled(fixedDelay = 1000 * 60L)
public void scanTaskStatus() {
List<TaskEntity> allNotRunningTaskList = taskMapper.selectAllNotRunning();
List<TaskEntity> allFailedTaskList = taskMapper.selectAllFailed();
List<TaskEntity> allRunningTaskList = taskMapper.selectAllRunning();
// 检查任务积压
checkTaskBacklog(allNotRunningTaskList);
// 检查失败任务
checkFailedTasks(allFailedTaskList);
// 检查长时间运行任务
checkLongRunningTasks(allRunningTaskList);
}
/**
* 检查任务积压
*/
private void checkTaskBacklog(List<TaskEntity> notRunningTasks) {
if (notRunningTasks.size() > 10) {
if (shouldSendNotification(TASK_BACKLOG)) {
String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size());
sendNotification("任务堆积警告", content, TASK_BACKLOG);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(TASK_BACKLOG);
}
}
/**
* 检查失败任务
*/
private void checkFailedTasks(List<TaskEntity> failedTasks) {
if (failedTasks.size() > 5) {
if (shouldSendNotification(FAILED_TASKS)) {
String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size());
sendNotification("任务失败警告", content, FAILED_TASKS);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(FAILED_TASKS);
}
}
/**
* 检查长时间运行任务
*/
private void checkLongRunningTasks(List<TaskEntity> runningTasks) {
Set<String> currentLongRunningTasks = new HashSet<>();
for (TaskEntity taskEntity : runningTasks) {
if (taskEntity.getStartTime() == null) {
continue;
}
// startTime已经过去3分钟了
if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) {
String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId();
currentLongRunningTasks.add(taskKey);
if (shouldSendNotification(taskKey)) {
String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!",
taskEntity.getWorkerId(), taskEntity.getId());
sendNotification("长时间运行任务警告", content, taskKey);
}
}
}
// 清理已恢复正常的长时运行任务的计数器
cleanupLongRunningTaskCounters(currentLongRunningTasks);
}
/**
* 清理已恢复正常的长时运行任务的计数器
*/
private void cleanupLongRunningTaskCounters(Set<String> currentLongRunningTasks) {
Set<String> keysToRemove = new HashSet<>();
for (String key : notificationCounters.keySet()) {
if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) {
if (!currentLongRunningTasks.contains(key)) {
keysToRemove.add(key);
}
}
}
// 移除已恢复任务的计数器
for (String key : keysToRemove) {
notificationCounters.remove(key);
}
}
/**
* 判断是否应该发送通知
*/
private boolean shouldSendNotification(String abnormalType) {
int count = notificationCounters.getOrDefault(abnormalType, 0);
return count < MAX_NOTIFICATION_COUNT;
}
/**
* 发送通知并更新计数器
*/
private void sendNotification(String title, String content, String abnormalType) {
ZtMessage ztMessage = ZtMessage.of(
"serverchan",
title,
content,
"system"
);
ztMessage.setSendReason("任务监控");
ztMessage.setSendBiz("系统监控");
ztMessageProducerService.send(ztMessage);
// 更新通知计数器
int currentCount = notificationCounters.getOrDefault(abnormalType, 0);
notificationCounters.put(abnormalType, currentCount + 1);
}
/**
* 重置通知计数器(异常恢复时调用)
*/
private void resetNotificationCounter(String abnormalType) {
notificationCounters.remove(abnormalType);
}
}

View File

@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper">
<resultMap id="BaseResultMap" type="com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity">
<id column="id" property="id"/>
<result column="task_id" property="taskId"/>
<result column="render_job_id" property="renderJobId"/>
<result column="render_status" property="renderStatus"/>
<result column="published_count" property="publishedCount"/>
<result column="segment_count" property="segmentCount"/>
<result column="preview_url" property="previewUrl"/>
<result column="mp4_url" property="mp4Url"/>
<result column="error_code" property="errorCode"/>
<result column="error_message" property="errorMessage"/>
<result column="retry_count" property="retryCount"/>
<result column="last_check_time" property="lastCheckTime"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
<sql id="Base_Column_List">
id, task_id, render_job_id, render_status, published_count, segment_count,
preview_url, mp4_url, error_code, error_message, retry_count, last_check_time,
create_time, update_time
</sql>
<select id="selectByTaskId" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE task_id = #{taskId}
</select>
<select id="selectByRenderJobId" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE render_job_id = #{renderJobId}
</select>
<select id="selectPendingForPolling" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE render_status IN
<foreach collection="statuses" item="status" open="(" separator="," close=")">
#{status}
</foreach>
AND (
last_check_time IS NULL
OR last_check_time &lt; DATE_SUB(NOW(), INTERVAL #{checkIntervalSeconds} SECOND)
)
AND retry_count &lt; 10
ORDER BY last_check_time ASC, create_time ASC
LIMIT #{limit}
</select>
<update id="updateRenderStatus">
UPDATE task_render_job_mapping
SET render_status = #{renderStatus},
published_count = #{publishedCount},
segment_count = #{segmentCount},
<if test="previewUrl != null">
preview_url = #{previewUrl},
</if>
<if test="mp4Url != null">
mp4_url = #{mp4Url},
</if>
last_check_time = #{lastCheckTime},
update_time = NOW()
WHERE id = #{id}
</update>
<update id="updateToFailed">
UPDATE task_render_job_mapping
SET render_status = 'FAILED',
error_code = #{errorCode},
error_message = #{errorMessage},
last_check_time = #{lastCheckTime},
update_time = NOW()
WHERE id = #{id}
</update>
<update id="incrementRetryCount">
UPDATE task_render_job_mapping
SET retry_count = retry_count + 1,
update_time = NOW()
WHERE id = #{id}
</update>
</mapper>