diff --git a/src/main/java/com/ycwl/basic/integration/render/client/RenderJobV2Client.java b/src/main/java/com/ycwl/basic/integration/render/client/RenderJobV2Client.java index c8b1e19d..6ffe16ab 100644 --- a/src/main/java/com/ycwl/basic/integration/render/client/RenderJobV2Client.java +++ b/src/main/java/com/ycwl/basic/integration/render/client/RenderJobV2Client.java @@ -47,6 +47,21 @@ public interface RenderJobV2Client { @PostMapping("/jobs/{jobId}/cancel") CommonResponse cancelJob(@PathVariable("jobId") Long jobId); + /** + * 创建FINALIZE_MP4任务 + * 将所有已发布的TS片段合成为MP4文件 + * + * 前置条件: + * 1. 作业存在且状态为RUNNING + * 2. 所有片段都已发布(PublishedCount == SegmentCount) + * 3. 不存在已有的FINALIZE_MP4任务 + * + * @param jobId 作业ID + * @return 任务创建结果 + */ + @PostMapping("/jobs/{jobId}/finalize-mp4") + CommonResponse createFinalizeMP4Task(@PathVariable("jobId") Long jobId); + // ==================== 管理端接口 ==================== /** diff --git a/src/main/java/com/ycwl/basic/integration/render/dto/job/FinalizeMP4Response.java b/src/main/java/com/ycwl/basic/integration/render/dto/job/FinalizeMP4Response.java new file mode 100644 index 00000000..821a6baa --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/render/dto/job/FinalizeMP4Response.java @@ -0,0 +1,20 @@ +package com.ycwl.basic.integration.render.dto.job; + +import lombok.Data; + +/** + * 创建FINALIZE_MP4任务响应 + */ +@Data +public class FinalizeMP4Response { + + /** + * 任务ID + */ + private Long taskId; + + /** + * 任务状态 + */ + private String status; +} diff --git a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java index d37652f3..562b8e9d 100644 --- a/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java +++ b/src/main/java/com/ycwl/basic/integration/render/service/RenderJobIntegrationService.java @@ -41,15 +41,8 @@ public class RenderJobIntegrationService { */ public JobStatusResponse getJobStatus(Long jobId) { log.debug("获取作业状态, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:status:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobStatus(jobId); - return handleResponse(response, "获取作业状态失败"); - }, - JobStatusResponse.class - ); + CommonResponse response = renderJobV2Client.getJobStatus(jobId); + return handleResponse(response, "获取作业状态失败"); } /** @@ -71,15 +64,8 @@ public class RenderJobIntegrationService { */ public PlaylistInfoDTO getPlaylistInfo(Long jobId) { log.debug("获取播放列表信息, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:playlist-info:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); - return handleResponse(response, "获取播放列表信息失败"); - }, - PlaylistInfoDTO.class - ); + CommonResponse response = renderJobV2Client.getPlaylistInfo(jobId); + return handleResponse(response, "获取播放列表信息失败"); } /** @@ -91,6 +77,20 @@ public class RenderJobIntegrationService { handleVoidResponse(response, "取消作业失败"); } + /** + * 创建FINALIZE_MP4任务(直接调用,不降级) + * 将所有已发布的TS片段合成为MP4文件 + * + * @param jobId 作业ID + * @return 任务创建结果 + * @throws IntegrationException 当前置条件不满足时抛出异常 + */ + public FinalizeMP4Response createFinalizeMP4Task(Long jobId) { + log.debug("创建FINALIZE_MP4任务, jobId: {}", jobId); + CommonResponse response = renderJobV2Client.createFinalizeMP4Task(jobId); + return handleResponse(response, "创建FINALIZE_MP4任务失败"); + } + // ==================== 管理端接口 ==================== /** @@ -110,15 +110,8 @@ public class RenderJobIntegrationService { */ public RenderJobV2DTO getJobDetail(Long jobId) { log.debug("获取作业详情, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:detail:" + jobId, - () -> { - CommonResponse response = renderJobV2Client.getJobDetail(jobId); - return handleResponse(response, "获取作业详情失败"); - }, - RenderJobV2DTO.class - ); + CommonResponse response = renderJobV2Client.getJobDetail(jobId); + return handleResponse(response, "获取作业详情失败"); } /** @@ -127,16 +120,9 @@ public class RenderJobIntegrationService { @SuppressWarnings("unchecked") public List getJobSegments(Long jobId) { log.debug("获取作业片段列表, jobId: {}", jobId); - return fallbackService.executeWithFallback( - SERVICE_NAME, - "job:segments:" + jobId, - () -> { - CommonResponse> response = - renderJobV2Client.getJobSegments(jobId); - return handleResponse(response, "获取作业片段列表失败"); - }, - (Class>) (Class) List.class - ); + CommonResponse> response = + renderJobV2Client.getJobSegments(jobId); + return handleResponse(response, "获取作业片段列表失败"); } // ==================== Helper Methods ==================== diff --git a/src/main/java/com/ycwl/basic/mapper/task/TaskRenderJobMappingMapper.java b/src/main/java/com/ycwl/basic/mapper/task/TaskRenderJobMappingMapper.java new file mode 100644 index 00000000..1bed4dd2 --- /dev/null +++ b/src/main/java/com/ycwl/basic/mapper/task/TaskRenderJobMappingMapper.java @@ -0,0 +1,64 @@ +package com.ycwl.basic.mapper.task; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +/** + * Task与RenderJob关联Mapper + */ +@Mapper +public interface TaskRenderJobMappingMapper extends BaseMapper { + + /** + * 根据taskId查询mapping + */ + TaskRenderJobMappingEntity selectByTaskId(@Param("taskId") Long taskId); + + /** + * 根据renderJobId查询mapping + */ + TaskRenderJobMappingEntity selectByRenderJobId(@Param("renderJobId") Long renderJobId); + + /** + * 查询需要轮询的记录 + * 条件:状态为PENDING或PREVIEW_READY,且最后检查时间超过指定间隔 + */ + List selectPendingForPolling( + @Param("statuses") List statuses, + @Param("checkIntervalSeconds") int checkIntervalSeconds, + @Param("limit") int limit + ); + + /** + * 更新渲染状态和片段信息 + */ + int updateRenderStatus( + @Param("id") Long id, + @Param("renderStatus") String renderStatus, + @Param("publishedCount") Integer publishedCount, + @Param("segmentCount") Integer segmentCount, + @Param("previewUrl") String previewUrl, + @Param("mp4Url") String mp4Url, + @Param("lastCheckTime") Date lastCheckTime + ); + + /** + * 更新为失败状态 + */ + int updateToFailed( + @Param("id") Long id, + @Param("errorCode") String errorCode, + @Param("errorMessage") String errorMessage, + @Param("lastCheckTime") Date lastCheckTime + ); + + /** + * 增加重试次数 + */ + int incrementRetryCount(@Param("id") Long id); +} diff --git a/src/main/java/com/ycwl/basic/model/task/entity/TaskRenderJobMappingEntity.java b/src/main/java/com/ycwl/basic/model/task/entity/TaskRenderJobMappingEntity.java new file mode 100644 index 00000000..e83fd07a --- /dev/null +++ b/src/main/java/com/ycwl/basic/model/task/entity/TaskRenderJobMappingEntity.java @@ -0,0 +1,98 @@ +package com.ycwl.basic.model.task.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.util.Date; + +/** + * Task与RenderJob关联实体 + * 用于跟踪task和zt-render-worker服务中渲染作业的关联 + */ +@Data +@TableName("task_render_job_mapping") +public class TaskRenderJobMappingEntity { + + @TableId(type = IdType.AUTO) + private Long id; + + /** + * 任务ID(task表的id) + */ + private Long taskId; + + /** + * 渲染作业ID(zt-render-worker返回的jobId) + */ + private Long renderJobId; + + /** + * 渲染状态 + * PENDING-等待中, PREVIEW_READY-预览就绪, COMPLETED-已完成, FAILED-失败 + */ + private String renderStatus; + + /** + * 已发布片段数 + */ + private Integer publishedCount; + + /** + * 总片段数 + */ + private Integer segmentCount; + + /** + * 预览播放地址(HLS) + */ + private String previewUrl; + + /** + * 最终MP4地址 + */ + private String mp4Url; + + /** + * 错误码 + */ + private String errorCode; + + /** + * 错误信息 + */ + private String errorMessage; + + /** + * 重试次数 + */ + private Integer retryCount; + + /** + * 最后检查时间 + */ + private Date lastCheckTime; + + private Date createTime; + private Date updateTime; + + /** + * 渲染状态常量 + */ + public static final String STATUS_PENDING = "PENDING"; + public static final String STATUS_PREVIEW_READY = "PREVIEW_READY"; + public static final String STATUS_MP4_COMPOSING = "MP4_COMPOSING"; + public static final String STATUS_COMPLETED = "COMPLETED"; + public static final String STATUS_FAILED = "FAILED"; + + /** + * 预览就绪所需的最小已发布片段数 + */ + public static final int MIN_PUBLISHED_FOR_PREVIEW = 2; + + /** + * 最大重试次数 + */ + public static final int MAX_RETRY_COUNT = 10; +} diff --git a/src/main/java/com/ycwl/basic/service/task/RenderJobPollingService.java b/src/main/java/com/ycwl/basic/service/task/RenderJobPollingService.java new file mode 100644 index 00000000..8444443f --- /dev/null +++ b/src/main/java/com/ycwl/basic/service/task/RenderJobPollingService.java @@ -0,0 +1,416 @@ +package com.ycwl.basic.service.task; + +import com.ycwl.basic.integration.render.dto.job.FinalizeMP4Response; +import com.ycwl.basic.integration.render.dto.job.JobStatusResponse; +import com.ycwl.basic.integration.render.service.RenderJobIntegrationService; +import com.ycwl.basic.mapper.TaskMapper; +import com.ycwl.basic.mapper.VideoMapper; +import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper; +import com.ycwl.basic.model.pc.task.entity.TaskEntity; +import com.ycwl.basic.model.pc.video.entity.VideoEntity; +import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity; +import com.ycwl.basic.repository.MemberRelationRepository; +import com.ycwl.basic.repository.VideoRepository; +import com.ycwl.basic.repository.VideoTaskRepository; +import com.ycwl.basic.utils.SnowFlakeUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.context.annotation.Profile; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * 渲染作业轮询服务 + * 定时查询zt-render-worker服务中的渲染作业状态,并更新本地task状态 + * + * 状态流转: + * PENDING → PREVIEW_READY → MP4_COMPOSING → COMPLETED + * │ │ │ │ + * └────────────┴──────────────┴──────────────┴──→ FAILED + */ +@Slf4j +@Service +@EnableScheduling +@RequiredArgsConstructor +@Profile({"dev", "prod"}) // 开发和生产环境启用 +public class RenderJobPollingService { + + private final TaskRenderJobMappingMapper mappingMapper; + private final RenderJobIntegrationService renderJobService; + private final TaskMapper taskMapper; + private final VideoMapper videoMapper; + private final VideoTaskRepository videoTaskRepository; + private final VideoRepository videoRepository; + private final MemberRelationRepository memberRelationRepository; + + /** + * 定时轮询间隔:4秒 + */ + private static final int POLL_INTERVAL_SECONDS = 4; + + /** + * 每次查询的最大记录数 + */ + private static final int BATCH_SIZE = 50; + + /** + * 定时轮询渲染作业状态 + * 每3秒执行一次 + */ + @Scheduled(fixedDelay = 3000) + public void pollRenderJobs() { + try { + log.debug("[渲染轮询] 开始轮询渲染作业状态"); + + // 查询待轮询的记录(包含MP4_COMPOSING状态) + List pendingStatuses = Arrays.asList( + TaskRenderJobMappingEntity.STATUS_PENDING, + TaskRenderJobMappingEntity.STATUS_PREVIEW_READY, + TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING + ); + + List mappings = mappingMapper.selectPendingForPolling( + pendingStatuses, + POLL_INTERVAL_SECONDS, + BATCH_SIZE + ); + + if (mappings.isEmpty()) { + log.debug("[渲染轮询] 无待处理记录"); + return; + } + + log.info("[渲染轮询] 查询到 {} 条待处理记录", mappings.size()); + + // 处理每条记录 + for (TaskRenderJobMappingEntity mapping : mappings) { + try { + processMapping(mapping); + } catch (Exception e) { + log.error("[渲染轮询] 处理失败, mappingId: {}, taskId: {}, renderJobId: {}, error: {}", + mapping.getId(), mapping.getTaskId(), mapping.getRenderJobId(), e.getMessage(), e); + handleProcessError(mapping, e); + } + } + + log.debug("[渲染轮询] 轮询完成"); + + } catch (Exception e) { + log.error("[渲染轮询] 轮询异常", e); + } + } + + /** + * 处理单条mapping记录 + */ + @Transactional(rollbackFor = Exception.class) + public void processMapping(TaskRenderJobMappingEntity mapping) { + Long renderJobId = mapping.getRenderJobId(); + Long taskId = mapping.getTaskId(); + String currentStatus = mapping.getRenderStatus(); + + log.debug("[渲染轮询] 处理记录: mappingId={}, taskId={}, renderJobId={}, currentStatus={}", + mapping.getId(), taskId, renderJobId, currentStatus); + + // 查询渲染作业状态 + JobStatusResponse jobStatus; + try { + jobStatus = renderJobService.getJobStatus(renderJobId); + } catch (Exception e) { + log.warn("[渲染轮询] 查询作业状态失败, renderJobId: {}, error: {}", renderJobId, e.getMessage()); + // 注:此处不调用incrementRetryCount,因为@Transactional会回滚 + // 外层handleProcessError会负责增加重试次数 + throw e; + } + + // 检查作业状态 + String status = jobStatus.getStatus(); + Integer publishedCount = jobStatus.getPublishedCount(); + Integer segmentCount = jobStatus.getSegmentCount(); + String playUrl = jobStatus.getPlayUrl(); + String mp4Url = jobStatus.getMp4Url(); + + log.info("[渲染轮询] 作业状态: taskId={}, status={}, publishedCount={}/{}, playUrl={}, mp4Url={}", + taskId, status, publishedCount, segmentCount, playUrl, mp4Url); + + // 处理失败状态 + if ("FAILED".equals(status) || "CANCELED".equals(status)) { + handleJobFailed(mapping, jobStatus); + return; + } + + // 状态流转处理 + switch (currentStatus) { + case TaskRenderJobMappingEntity.STATUS_PENDING: + handlePendingStatus(mapping, jobStatus, taskId); + break; + case TaskRenderJobMappingEntity.STATUS_PREVIEW_READY: + handlePreviewReadyStatus(mapping, jobStatus, taskId); + break; + case TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING: + handleMp4ComposingStatus(mapping, jobStatus, taskId); + break; + default: + log.warn("[渲染轮询] 未知状态: {}", currentStatus); + } + } + + /** + * 处理PENDING状态 + * PENDING → PREVIEW_READY:当publishedCount >= 2时 + */ + private void handlePendingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) { + Integer publishedCount = jobStatus.getPublishedCount(); + Integer segmentCount = jobStatus.getSegmentCount(); + String playUrl = jobStatus.getPlayUrl(); + + if (publishedCount != null && publishedCount >= TaskRenderJobMappingEntity.MIN_PUBLISHED_FOR_PREVIEW) { + log.info("[渲染轮询] 预览就绪: taskId={}, publishedCount={}/{}, playUrl={}", + taskId, publishedCount, segmentCount, playUrl); + + // 更新mapping状态为PREVIEW_READY + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY, + publishedCount, segmentCount, playUrl, null); + + // 更新task的videoUrl为预览地址 + if (StringUtils.isNotBlank(playUrl)) { + TaskEntity task = new TaskEntity(); + task.setId(taskId); + task.setVideoUrl(playUrl); + task.setStatus(1); // 设置为完成状态 + taskMapper.update(task); + videoTaskRepository.clearTaskCache(taskId); + log.info("[渲染轮询] 已更新task预览URL和状态: taskId={}, playUrl={}, status=1", taskId, playUrl); + + // 处理video记录(类似taskSuccess逻辑) + try { + handleVideoRecordForPreview(taskId, playUrl); + } catch (Exception e) { + log.warn("[渲染轮询] 处理video记录失败: taskId={}, error={}", taskId, e.getMessage(), e); + } + + // 异步发送视频生成通知(仅记录日志,实际通知可能需要在MP4完成后) + log.info("[渲染轮询] 预览视频已就绪,可发送通知: taskId={}", taskId); + } + } else { + // 更新片段信息 + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PENDING, + publishedCount, segmentCount, null, null); + } + } + + /** + * 处理PREVIEW_READY状态 + * PREVIEW_READY → MP4_COMPOSING:当所有片段都已发布时,调用finalize-mp4接口 + */ + private void handlePreviewReadyStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) { + Integer publishedCount = jobStatus.getPublishedCount(); + Integer segmentCount = jobStatus.getSegmentCount(); + String playUrl = jobStatus.getPlayUrl(); + Long renderJobId = mapping.getRenderJobId(); + + // 检查是否所有片段都已发布 + if (publishedCount != null && segmentCount != null && publishedCount.equals(segmentCount) && segmentCount > 0) { + log.info("[渲染轮询] 所有片段已发布,开始创建MP4合成任务: taskId={}, renderJobId={}, publishedCount={}/{}", + taskId, renderJobId, publishedCount, segmentCount); + + try { + // 调用finalize-mp4接口创建MP4合成任务 + FinalizeMP4Response response = renderJobService.createFinalizeMP4Task(renderJobId); + log.info("[渲染轮询] MP4合成任务创建成功: taskId={}, renderJobId={}, mp4TaskId={}, status={}", + taskId, renderJobId, response.getTaskId(), response.getStatus()); + + // 更新mapping状态为MP4_COMPOSING + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING, + publishedCount, segmentCount, playUrl, null); + + } catch (Exception e) { + // 409表示任务已存在,直接进入MP4_COMPOSING状态 + if (e.getMessage() != null && e.getMessage().contains("409")) { + log.info("[渲染轮询] MP4合成任务已存在,继续等待: taskId={}, renderJobId={}", taskId, renderJobId); + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING, + publishedCount, segmentCount, playUrl, null); + } else { + log.warn("[渲染轮询] 创建MP4合成任务失败: taskId={}, renderJobId={}, error={}", + taskId, renderJobId, e.getMessage()); + // 不改变状态,下次轮询重试 + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY, + publishedCount, segmentCount, playUrl, null); + } + } + } else { + // 更新片段信息 + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY, + publishedCount, segmentCount, playUrl, null); + } + } + + /** + * 处理MP4_COMPOSING状态 + * MP4_COMPOSING → COMPLETED:当mp4Url有值时 + */ + private void handleMp4ComposingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) { + Integer publishedCount = jobStatus.getPublishedCount(); + Integer segmentCount = jobStatus.getSegmentCount(); + String playUrl = jobStatus.getPlayUrl(); + String mp4Url = jobStatus.getMp4Url(); + + if (StringUtils.isNotBlank(mp4Url)) { + log.info("[渲染轮询] MP4合成完成: taskId={}, mp4Url={}", taskId, mp4Url); + + // 更新mapping状态为COMPLETED + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_COMPLETED, + publishedCount, segmentCount, playUrl, mp4Url); + + // 更新task的videoUrl为最终MP4地址 + TaskEntity task = new TaskEntity(); + task.setId(taskId); + task.setVideoUrl(mp4Url); + taskMapper.update(task); + videoTaskRepository.clearTaskCache(taskId); + log.info("[渲染轮询] 已更新task最终MP4 URL: taskId={}, mp4Url={}", taskId, mp4Url); + + // 更新video记录的videoUrl为最终MP4地址 + try { + handleVideoRecordForMP4(taskId, mp4Url); + } catch (Exception e) { + log.warn("[渲染轮询] 更新video的MP4 URL失败: taskId={}, error={}", taskId, e.getMessage(), e); + } + } else { + // MP4还在合成中,更新片段信息 + log.debug("[渲染轮询] MP4合成中: taskId={}, 等待下次轮询", taskId); + updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING, + publishedCount, segmentCount, playUrl, null); + } + } + + /** + * 处理作业失败 + */ + private void handleJobFailed(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus) { + String errorCode = jobStatus.getErrorCode(); + String errorMessage = jobStatus.getErrorMessage(); + + log.warn("[渲染轮询] 作业失败: taskId={}, status={}, errorCode={}, errorMessage={}", + mapping.getTaskId(), jobStatus.getStatus(), errorCode, errorMessage); + + mappingMapper.updateToFailed( + mapping.getId(), + errorCode, + errorMessage, + new Date() + ); + } + + /** + * 更新mapping状态 + */ + private void updateMappingStatus(Long id, String renderStatus, Integer publishedCount, + Integer segmentCount, String previewUrl, String mp4Url) { + mappingMapper.updateRenderStatus( + id, + renderStatus, + publishedCount, + segmentCount, + previewUrl, + mp4Url, + new Date() + ); + } + + /** + * 处理异常 + */ + private void handleProcessError(TaskRenderJobMappingEntity mapping, Exception e) { + try { + mappingMapper.incrementRetryCount(mapping.getId()); + + // 超过最大重试次数,标记为失败 + if (mapping.getRetryCount() != null && + mapping.getRetryCount() >= TaskRenderJobMappingEntity.MAX_RETRY_COUNT - 1) { + mappingMapper.updateToFailed( + mapping.getId(), + "MAX_RETRY", + "超过最大重试次数: " + e.getMessage(), + new Date() + ); + } + } catch (Exception ex) { + log.error("[渲染轮询] 处理错误失败", ex); + } + } + + /** + * 处理video记录(预览就绪时) + * 类似taskSuccess的逻辑,但简化版本 + */ + private void handleVideoRecordForPreview(Long taskId, String videoUrl) { + try { + var taskResp = taskMapper.getById(taskId); + if (taskResp == null) { + log.warn("[渲染轮询] task不存在: taskId={}", taskId); + return; + } + + VideoEntity video = videoMapper.findByTaskId(taskId); + if (video != null) { + // 更新已有video记录 + video.setVideoUrl(videoUrl); + videoMapper.update(video); + videoRepository.clearVideoCache(video.getId()); + log.info("[渲染轮询] 已更新video预览URL: taskId={}, videoId={}, videoUrl={}", + taskId, video.getId(), videoUrl); + } else { + // 创建新video记录 + video = new VideoEntity(); + video.setId(SnowFlakeUtil.getLongId()); + video.setScenicId(taskResp.getScenicId()); + video.setTemplateId(taskResp.getTemplateId()); + video.setTaskId(taskId); + video.setFaceId(taskResp.getFaceId()); + video.setVideoUrl(videoUrl); + video.setCreateTime(new Date()); + videoMapper.add(video); + log.info("[渲染轮询] 已创建video预览记录: taskId={}, videoId={}, videoUrl={}", + taskId, video.getId(), videoUrl); + } + + // 更新member_video关联表(isBuy=0,预览阶段未购买) + videoMapper.updateRelationWhenTaskSuccess(taskId, video.getId(), 0); + memberRelationRepository.clearVCacheByFace(taskResp.getFaceId()); + log.info("[渲染轮询] 已更新member_video关联: taskId={}, videoId={}", taskId, video.getId()); + + } catch (Exception e) { + log.error("[渲染轮询] 处理video记录失败: taskId={}", taskId, e); + throw e; + } + } + + /** + * 更新video记录的MP4地址(MP4合成完成时) + */ + private void handleVideoRecordForMP4(Long taskId, String mp4Url) { + try { + VideoEntity video = videoMapper.findByTaskId(taskId); + if (video != null) { + video.setVideoUrl(mp4Url); + videoMapper.update(video); + videoRepository.clearVideoCache(video.getId()); + log.info("[渲染轮询] 已更新video最终MP4 URL: taskId={}, videoId={}, mp4Url={}", + taskId, video.getId(), mp4Url); + } else { + log.warn("[渲染轮询] video不存在,无法更新MP4 URL: taskId={}", taskId); + } + } catch (Exception e) { + log.error("[渲染轮询] 更新video的MP4 URL失败: taskId={}", taskId, e); + throw e; + } + } +} diff --git a/src/main/java/com/ycwl/basic/service/task/impl/TaskTaskServiceImpl.java b/src/main/java/com/ycwl/basic/service/task/impl/TaskTaskServiceImpl.java index 9f8b2c17..76298775 100644 --- a/src/main/java/com/ycwl/basic/service/task/impl/TaskTaskServiceImpl.java +++ b/src/main/java/com/ycwl/basic/service/task/impl/TaskTaskServiceImpl.java @@ -13,6 +13,8 @@ import com.ycwl.basic.integration.render.dto.job.CreatePreviewRequest; import com.ycwl.basic.integration.render.dto.job.CreatePreviewResponse; import com.ycwl.basic.integration.render.dto.job.MaterialDTO; import com.ycwl.basic.integration.render.service.RenderJobIntegrationService; +import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper; +import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity; import com.ycwl.basic.repository.MemberRelationRepository; import com.ycwl.basic.repository.SourceRepository; import com.ycwl.basic.utils.JacksonUtil; @@ -69,6 +71,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -124,6 +127,8 @@ public class TaskTaskServiceImpl implements TaskService { private FaceStatusManager faceStatusManager; @Autowired private RenderJobIntegrationService renderJobIntegrationService; + @Autowired + private TaskRenderJobMappingMapper taskRenderJobMappingMapper; private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) { String accessKey = req.getAccessKey(); @@ -224,26 +229,11 @@ public class TaskTaskServiceImpl implements TaskService { } else { updTemplateList = templateRepository.getAllEnabledTemplateList(); } - RenderWorkerConfigManager configManager = repository.getWorkerConfigManager(worker.getId()); try { if (lock.tryLock(2, TimeUnit.SECONDS)) { try { - List taskList; - if (Strings.isNotBlank(configManager.getString("scenic_only"))) { - taskList = taskMapper.selectNotRunningByScenicList(configManager.getString("scenic_only")); - } else { - var _taskList = taskMapper.selectNotRunning(); - taskList = _taskList.stream().filter(task -> { - boolean workerSelfHostedScenic = isWorkerSelfHostedScenic(task.getScenicId()); - return !workerSelfHostedScenic; - }).limit(1).toList(); - } - resp.setTasks(taskList); + resp.setTasks(Collections.emptyList()); resp.setTemplates(updTemplateList); - taskList.forEach(task -> { - taskMapper.assignToWorker(task.getId(), worker.getId()); - videoTaskRepository.clearTaskCache(task.getId()); - }); } finally { lock.unlock(); } @@ -552,10 +542,6 @@ public class TaskTaskServiceImpl implements TaskService { videoMapper.add(video); } ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(task.getScenicId()); - IStorageAdapter adapter = scenicService.getScenicTmpStorageAdapter(task.getScenicId()); - String hash = MD5.create().digestHex(task.getTaskParams() + task.getFaceId().toString()); - String filename = StorageUtil.joinPath(StorageConstant.VLOG_PATH, task.getTemplateId().toString() + "_" + hash + "_" + task.getScenicId() + ".mp4"); - adapter.setAcl(StorageAcl.PUBLIC_READ, filename); int isBuy = 0; FaceEntity face = faceRepository.getFace(task.getFaceId()); if (face != null) { @@ -729,6 +715,21 @@ public class TaskTaskServiceImpl implements TaskService { CreatePreviewResponse response = renderJobIntegrationService.createPreview(request); log.info("[灰度测试] 渲染预览任务创建成功, taskId: {}, renderJobId: {}, playUrl: {}", taskId, response.getJobId(), response.getPlayUrl()); + + // 写入mapping表,供轮询服务处理 + try { + TaskRenderJobMappingEntity mapping = new TaskRenderJobMappingEntity(); + mapping.setTaskId(taskId); + mapping.setRenderJobId(response.getJobId()); + mapping.setRenderStatus(TaskRenderJobMappingEntity.STATUS_PENDING); + mapping.setPublishedCount(0); + mapping.setSegmentCount(0); + mapping.setRetryCount(0); + taskRenderJobMappingMapper.insert(mapping); + log.info("[灰度测试] 写入mapping成功, taskId: {}, renderJobId: {}", taskId, response.getJobId()); + } catch (Exception ex) { + log.warn("[灰度测试] 写入mapping失败,不影响主流程, taskId: {}, error: {}", taskId, ex.getMessage()); + } } catch (Exception e) { // 灰度测试:不管返回什么或者报错,都不影响现有流程 log.warn("[灰度测试] 渲染预览任务创建失败,不影响主流程, taskId: {}, templateId: {}, error: {}", diff --git a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java b/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java deleted file mode 100644 index 098cfd13..00000000 --- a/src/main/java/com/ycwl/basic/watchdog/TaskWatchDog.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.ycwl.basic.watchdog; - -import com.ycwl.basic.integration.message.dto.ZtMessage; -import com.ycwl.basic.integration.message.service.ZtMessageProducerService; -import com.ycwl.basic.mapper.TaskMapper; -import com.ycwl.basic.model.pc.task.entity.TaskEntity; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Profile; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@Component -@Profile("prod") -public class TaskWatchDog { - - @Autowired - private TaskMapper taskMapper; - - @Autowired - private ZtMessageProducerService ztMessageProducerService; - - // 异常通知计数器 - private final Map notificationCounters = new HashMap<>(); - - // 配置参数 - private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次 - - // 异常类型标识 - private static final String TASK_BACKLOG = "task_backlog"; - private static final String FAILED_TASKS = "failed_tasks"; - private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀 - - @Scheduled(fixedDelay = 1000 * 60L) - public void scanTaskStatus() { - List allNotRunningTaskList = taskMapper.selectAllNotRunning(); - List allFailedTaskList = taskMapper.selectAllFailed(); - List allRunningTaskList = taskMapper.selectAllRunning(); - - // 检查任务积压 - checkTaskBacklog(allNotRunningTaskList); - - // 检查失败任务 - checkFailedTasks(allFailedTaskList); - - // 检查长时间运行任务 - checkLongRunningTasks(allRunningTaskList); - } - - /** - * 检查任务积压 - */ - private void checkTaskBacklog(List notRunningTasks) { - if (notRunningTasks.size() > 10) { - if (shouldSendNotification(TASK_BACKLOG)) { - String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size()); - sendNotification("任务堆积警告", content, TASK_BACKLOG); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(TASK_BACKLOG); - } - } - - /** - * 检查失败任务 - */ - private void checkFailedTasks(List failedTasks) { - if (failedTasks.size() > 5) { - if (shouldSendNotification(FAILED_TASKS)) { - String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size()); - sendNotification("任务失败警告", content, FAILED_TASKS); - } - } else { - // 异常已恢复,重置计数器 - resetNotificationCounter(FAILED_TASKS); - } - } - - /** - * 检查长时间运行任务 - */ - private void checkLongRunningTasks(List runningTasks) { - Set currentLongRunningTasks = new HashSet<>(); - - for (TaskEntity taskEntity : runningTasks) { - if (taskEntity.getStartTime() == null) { - continue; - } - // startTime已经过去3分钟了 - if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) { - String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId(); - currentLongRunningTasks.add(taskKey); - - if (shouldSendNotification(taskKey)) { - String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!", - taskEntity.getWorkerId(), taskEntity.getId()); - sendNotification("长时间运行任务警告", content, taskKey); - } - } - } - - // 清理已恢复正常的长时运行任务的计数器 - cleanupLongRunningTaskCounters(currentLongRunningTasks); - } - - /** - * 清理已恢复正常的长时运行任务的计数器 - */ - private void cleanupLongRunningTaskCounters(Set currentLongRunningTasks) { - Set keysToRemove = new HashSet<>(); - - for (String key : notificationCounters.keySet()) { - if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) { - if (!currentLongRunningTasks.contains(key)) { - keysToRemove.add(key); - } - } - } - - // 移除已恢复任务的计数器 - for (String key : keysToRemove) { - notificationCounters.remove(key); - } - } - - /** - * 判断是否应该发送通知 - */ - private boolean shouldSendNotification(String abnormalType) { - int count = notificationCounters.getOrDefault(abnormalType, 0); - return count < MAX_NOTIFICATION_COUNT; - } - - /** - * 发送通知并更新计数器 - */ - private void sendNotification(String title, String content, String abnormalType) { - ZtMessage ztMessage = ZtMessage.of( - "serverchan", - title, - content, - "system" - ); - ztMessage.setSendReason("任务监控"); - ztMessage.setSendBiz("系统监控"); - - ztMessageProducerService.send(ztMessage); - - // 更新通知计数器 - int currentCount = notificationCounters.getOrDefault(abnormalType, 0); - notificationCounters.put(abnormalType, currentCount + 1); - } - - /** - * 重置通知计数器(异常恢复时调用) - */ - private void resetNotificationCounter(String abnormalType) { - notificationCounters.remove(abnormalType); - } -} diff --git a/src/main/resources/mapper/TaskRenderJobMappingMapper.xml b/src/main/resources/mapper/TaskRenderJobMappingMapper.xml new file mode 100644 index 00000000..4481e0a0 --- /dev/null +++ b/src/main/resources/mapper/TaskRenderJobMappingMapper.xml @@ -0,0 +1,89 @@ + + + + + + + + + + + + + + + + + + + + + + + id, task_id, render_job_id, render_status, published_count, segment_count, + preview_url, mp4_url, error_code, error_message, retry_count, last_check_time, + create_time, update_time + + + + + + + + + + UPDATE task_render_job_mapping + SET render_status = #{renderStatus}, + published_count = #{publishedCount}, + segment_count = #{segmentCount}, + + preview_url = #{previewUrl}, + + + mp4_url = #{mp4Url}, + + last_check_time = #{lastCheckTime}, + update_time = NOW() + WHERE id = #{id} + + + + UPDATE task_render_job_mapping + SET render_status = 'FAILED', + error_code = #{errorCode}, + error_message = #{errorMessage}, + last_check_time = #{lastCheckTime}, + update_time = NOW() + WHERE id = #{id} + + + + UPDATE task_render_job_mapping + SET retry_count = retry_count + 1, + update_time = NOW() + WHERE id = #{id} + + +