feat(render): 添加视频渲染作业轮询服务

- 在RenderJobV2Client中新增createFinalizeMP4Task接口用于创建MP4合成任务
- 在RenderJobIntegrationService中实现createFinalizeMP4Task方法
- 创建TaskRenderJobMappingEntity实体类用于跟踪任务与渲染作业关联
- 创建TaskRenderJobMappingMapper接口及对应XML映射文件
- 在TaskTaskServiceImpl中添加mapping表写入逻辑
- 新增RenderJobPollingService定时轮询服务处理渲染状态流转
- 实现从PENDING到PREVIEW_READY再到MP4_COMPOSING最后到COMPLETED的状态转换
- 添加MP4合成任务创建及状态更新功能
This commit is contained in:
2026-01-24 21:20:09 +08:00
parent d25d09cb66
commit ad3741fd15
8 changed files with 637 additions and 0 deletions

View File

@@ -47,6 +47,21 @@ public interface RenderJobV2Client {
@PostMapping("/jobs/{jobId}/cancel")
CommonResponse<Void> cancelJob(@PathVariable("jobId") Long jobId);
/**
* 创建FINALIZE_MP4任务
* 将所有已发布的TS片段合成为MP4文件
*
* 前置条件:
* 1. 作业存在且状态为RUNNING
* 2. 所有片段都已发布(PublishedCount == SegmentCount)
* 3. 不存在已有的FINALIZE_MP4任务
*
* @param jobId 作业ID
* @return 任务创建结果
*/
@PostMapping("/jobs/{jobId}/finalize-mp4")
CommonResponse<FinalizeMP4Response> createFinalizeMP4Task(@PathVariable("jobId") Long jobId);
// ==================== 管理端接口 ====================
/**

View File

@@ -0,0 +1,20 @@
package com.ycwl.basic.integration.render.dto.job;
import lombok.Data;
/**
* 创建FINALIZE_MP4任务响应
*/
@Data
public class FinalizeMP4Response {
/**
* 任务ID
*/
private Long taskId;
/**
* 任务状态
*/
private String status;
}

View File

@@ -91,6 +91,20 @@ public class RenderJobIntegrationService {
handleVoidResponse(response, "取消作业失败");
}
/**
* 创建FINALIZE_MP4任务(直接调用,不降级)
* 将所有已发布的TS片段合成为MP4文件
*
* @param jobId 作业ID
* @return 任务创建结果
* @throws IntegrationException 当前置条件不满足时抛出异常
*/
public FinalizeMP4Response createFinalizeMP4Task(Long jobId) {
log.debug("创建FINALIZE_MP4任务, jobId: {}", jobId);
CommonResponse<FinalizeMP4Response> response = renderJobV2Client.createFinalizeMP4Task(jobId);
return handleResponse(response, "创建FINALIZE_MP4任务失败");
}
// ==================== 管理端接口 ====================
/**

View File

@@ -0,0 +1,64 @@
package com.ycwl.basic.mapper.task;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
* Task与RenderJob关联Mapper
*/
@Mapper
public interface TaskRenderJobMappingMapper extends BaseMapper<TaskRenderJobMappingEntity> {
/**
* 根据taskId查询mapping
*/
TaskRenderJobMappingEntity selectByTaskId(@Param("taskId") Long taskId);
/**
* 根据renderJobId查询mapping
*/
TaskRenderJobMappingEntity selectByRenderJobId(@Param("renderJobId") Long renderJobId);
/**
* 查询需要轮询的记录
* 条件:状态为PENDING或PREVIEW_READY,且最后检查时间超过指定间隔
*/
List<TaskRenderJobMappingEntity> selectPendingForPolling(
@Param("statuses") List<String> statuses,
@Param("checkIntervalSeconds") int checkIntervalSeconds,
@Param("limit") int limit
);
/**
* 更新渲染状态和片段信息
*/
int updateRenderStatus(
@Param("id") Long id,
@Param("renderStatus") String renderStatus,
@Param("publishedCount") Integer publishedCount,
@Param("segmentCount") Integer segmentCount,
@Param("previewUrl") String previewUrl,
@Param("mp4Url") String mp4Url,
@Param("lastCheckTime") Date lastCheckTime
);
/**
* 更新为失败状态
*/
int updateToFailed(
@Param("id") Long id,
@Param("errorCode") String errorCode,
@Param("errorMessage") String errorMessage,
@Param("lastCheckTime") Date lastCheckTime
);
/**
* 增加重试次数
*/
int incrementRetryCount(@Param("id") Long id);
}

View File

@@ -0,0 +1,98 @@
package com.ycwl.basic.model.task.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* Task与RenderJob关联实体
* 用于跟踪task和zt-render-worker服务中渲染作业的关联
*/
@Data
@TableName("task_render_job_mapping")
public class TaskRenderJobMappingEntity {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 任务ID(task表的id)
*/
private Long taskId;
/**
* 渲染作业ID(zt-render-worker返回的jobId)
*/
private Long renderJobId;
/**
* 渲染状态
* PENDING-等待中, PREVIEW_READY-预览就绪, COMPLETED-已完成, FAILED-失败
*/
private String renderStatus;
/**
* 已发布片段数
*/
private Integer publishedCount;
/**
* 总片段数
*/
private Integer segmentCount;
/**
* 预览播放地址(HLS)
*/
private String previewUrl;
/**
* 最终MP4地址
*/
private String mp4Url;
/**
* 错误码
*/
private String errorCode;
/**
* 错误信息
*/
private String errorMessage;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 最后检查时间
*/
private Date lastCheckTime;
private Date createTime;
private Date updateTime;
/**
* 渲染状态常量
*/
public static final String STATUS_PENDING = "PENDING";
public static final String STATUS_PREVIEW_READY = "PREVIEW_READY";
public static final String STATUS_MP4_COMPOSING = "MP4_COMPOSING";
public static final String STATUS_COMPLETED = "COMPLETED";
public static final String STATUS_FAILED = "FAILED";
/**
* 预览就绪所需的最小已发布片段数
*/
public static final int MIN_PUBLISHED_FOR_PREVIEW = 2;
/**
* 最大重试次数
*/
public static final int MAX_RETRY_COUNT = 10;
}

View File

@@ -0,0 +1,318 @@
package com.ycwl.basic.service.task;
import com.ycwl.basic.integration.render.dto.job.FinalizeMP4Response;
import com.ycwl.basic.integration.render.dto.job.JobStatusResponse;
import com.ycwl.basic.integration.render.service.RenderJobIntegrationService;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper;
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* 渲染作业轮询服务
* 定时查询zt-render-worker服务中的渲染作业状态,并更新本地task状态
*
* 状态流转:
* PENDING → PREVIEW_READY → MP4_COMPOSING → COMPLETED
* │ │ │ │
* └────────────┴──────────────┴──────────────┴──→ FAILED
*/
@Slf4j
@Service
@EnableScheduling
@RequiredArgsConstructor
@Profile({"dev", "prod"}) // 开发和生产环境启用
public class RenderJobPollingService {
private final TaskRenderJobMappingMapper mappingMapper;
private final RenderJobIntegrationService renderJobService;
private final TaskMapper taskMapper;
/**
* 定时轮询间隔:5秒
*/
private static final int POLL_INTERVAL_SECONDS = 5;
/**
* 每次查询的最大记录数
*/
private static final int BATCH_SIZE = 50;
/**
* 定时轮询渲染作业状态
* 每5秒执行一次
*/
@Scheduled(fixedDelay = 5000, initialDelay = 10000)
public void pollRenderJobs() {
try {
log.debug("[渲染轮询] 开始轮询渲染作业状态");
// 查询待轮询的记录(包含MP4_COMPOSING状态)
List<String> pendingStatuses = Arrays.asList(
TaskRenderJobMappingEntity.STATUS_PENDING,
TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING
);
List<TaskRenderJobMappingEntity> mappings = mappingMapper.selectPendingForPolling(
pendingStatuses,
POLL_INTERVAL_SECONDS,
BATCH_SIZE
);
if (mappings.isEmpty()) {
log.debug("[渲染轮询] 无待处理记录");
return;
}
log.info("[渲染轮询] 查询到 {} 条待处理记录", mappings.size());
// 处理每条记录
for (TaskRenderJobMappingEntity mapping : mappings) {
try {
processMapping(mapping);
} catch (Exception e) {
log.error("[渲染轮询] 处理失败, mappingId: {}, taskId: {}, renderJobId: {}, error: {}",
mapping.getId(), mapping.getTaskId(), mapping.getRenderJobId(), e.getMessage(), e);
handleProcessError(mapping, e);
}
}
log.debug("[渲染轮询] 轮询完成");
} catch (Exception e) {
log.error("[渲染轮询] 轮询异常", e);
}
}
/**
* 处理单条mapping记录
*/
@Transactional(rollbackFor = Exception.class)
public void processMapping(TaskRenderJobMappingEntity mapping) {
Long renderJobId = mapping.getRenderJobId();
Long taskId = mapping.getTaskId();
String currentStatus = mapping.getRenderStatus();
log.debug("[渲染轮询] 处理记录: mappingId={}, taskId={}, renderJobId={}, currentStatus={}",
mapping.getId(), taskId, renderJobId, currentStatus);
// 查询渲染作业状态
JobStatusResponse jobStatus;
try {
jobStatus = renderJobService.getJobStatus(renderJobId);
} catch (Exception e) {
log.warn("[渲染轮询] 查询作业状态失败, renderJobId: {}, error: {}", renderJobId, e.getMessage());
mappingMapper.incrementRetryCount(mapping.getId());
throw e;
}
// 检查作业状态
String status = jobStatus.getStatus();
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
String mp4Url = jobStatus.getMp4Url();
log.info("[渲染轮询] 作业状态: taskId={}, status={}, publishedCount={}/{}, playUrl={}, mp4Url={}",
taskId, status, publishedCount, segmentCount, playUrl, mp4Url);
// 处理失败状态
if ("FAILED".equals(status) || "CANCELED".equals(status)) {
handleJobFailed(mapping, jobStatus);
return;
}
// 状态流转处理
switch (currentStatus) {
case TaskRenderJobMappingEntity.STATUS_PENDING:
handlePendingStatus(mapping, jobStatus, taskId);
break;
case TaskRenderJobMappingEntity.STATUS_PREVIEW_READY:
handlePreviewReadyStatus(mapping, jobStatus, taskId);
break;
case TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING:
handleMp4ComposingStatus(mapping, jobStatus, taskId);
break;
default:
log.warn("[渲染轮询] 未知状态: {}", currentStatus);
}
}
/**
* 处理PENDING状态
* PENDING → PREVIEW_READY:当publishedCount >= 2时
*/
private void handlePendingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
if (publishedCount != null && publishedCount >= TaskRenderJobMappingEntity.MIN_PUBLISHED_FOR_PREVIEW) {
log.info("[渲染轮询] 预览就绪: taskId={}, publishedCount={}/{}, playUrl={}",
taskId, publishedCount, segmentCount, playUrl);
// 更新mapping状态为PREVIEW_READY
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
// 更新task的videoUrl为预览地址
if (StringUtils.isNotBlank(playUrl)) {
TaskEntity task = new TaskEntity();
task.setId(taskId);
task.setVideoUrl(playUrl);
taskMapper.update(task);
log.info("[渲染轮询] 已更新task预览URL: taskId={}, playUrl={}", taskId, playUrl);
}
} else {
// 更新片段信息
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PENDING,
publishedCount, segmentCount, null, null);
}
}
/**
* 处理PREVIEW_READY状态
* PREVIEW_READY → MP4_COMPOSING:当所有片段都已发布时,调用finalize-mp4接口
*/
private void handlePreviewReadyStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
Long renderJobId = mapping.getRenderJobId();
// 检查是否所有片段都已发布
if (publishedCount != null && segmentCount != null && publishedCount.equals(segmentCount) && segmentCount > 0) {
log.info("[渲染轮询] 所有片段已发布,开始创建MP4合成任务: taskId={}, renderJobId={}, publishedCount={}/{}",
taskId, renderJobId, publishedCount, segmentCount);
try {
// 调用finalize-mp4接口创建MP4合成任务
FinalizeMP4Response response = renderJobService.createFinalizeMP4Task(renderJobId);
log.info("[渲染轮询] MP4合成任务创建成功: taskId={}, renderJobId={}, mp4TaskId={}, status={}",
taskId, renderJobId, response.getTaskId(), response.getStatus());
// 更新mapping状态为MP4_COMPOSING
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
} catch (Exception e) {
// 409表示任务已存在,直接进入MP4_COMPOSING状态
if (e.getMessage() != null && e.getMessage().contains("409")) {
log.info("[渲染轮询] MP4合成任务已存在,继续等待: taskId={}, renderJobId={}", taskId, renderJobId);
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
} else {
log.warn("[渲染轮询] 创建MP4合成任务失败: taskId={}, renderJobId={}, error={}",
taskId, renderJobId, e.getMessage());
// 不改变状态,下次轮询重试
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
}
}
} else {
// 更新片段信息
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_PREVIEW_READY,
publishedCount, segmentCount, playUrl, null);
}
}
/**
* 处理MP4_COMPOSING状态
* MP4_COMPOSING → COMPLETED:当mp4Url有值时
*/
private void handleMp4ComposingStatus(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus, Long taskId) {
Integer publishedCount = jobStatus.getPublishedCount();
Integer segmentCount = jobStatus.getSegmentCount();
String playUrl = jobStatus.getPlayUrl();
String mp4Url = jobStatus.getMp4Url();
if (StringUtils.isNotBlank(mp4Url)) {
log.info("[渲染轮询] MP4合成完成: taskId={}, mp4Url={}", taskId, mp4Url);
// 更新mapping状态为COMPLETED
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_COMPLETED,
publishedCount, segmentCount, playUrl, mp4Url);
// 更新task的videoUrl为最终MP4地址
TaskEntity task = new TaskEntity();
task.setId(taskId);
task.setVideoUrl(mp4Url);
taskMapper.update(task);
log.info("[渲染轮询] 已更新task最终MP4 URL: taskId={}, mp4Url={}", taskId, mp4Url);
} else {
// MP4还在合成中,更新片段信息
log.debug("[渲染轮询] MP4合成中: taskId={}, 等待下次轮询", taskId);
updateMappingStatus(mapping.getId(), TaskRenderJobMappingEntity.STATUS_MP4_COMPOSING,
publishedCount, segmentCount, playUrl, null);
}
}
/**
* 处理作业失败
*/
private void handleJobFailed(TaskRenderJobMappingEntity mapping, JobStatusResponse jobStatus) {
String errorCode = jobStatus.getErrorCode();
String errorMessage = jobStatus.getErrorMessage();
log.warn("[渲染轮询] 作业失败: taskId={}, status={}, errorCode={}, errorMessage={}",
mapping.getTaskId(), jobStatus.getStatus(), errorCode, errorMessage);
mappingMapper.updateToFailed(
mapping.getId(),
errorCode,
errorMessage,
new Date()
);
}
/**
* 更新mapping状态
*/
private void updateMappingStatus(Long id, String renderStatus, Integer publishedCount,
Integer segmentCount, String previewUrl, String mp4Url) {
mappingMapper.updateRenderStatus(
id,
renderStatus,
publishedCount,
segmentCount,
previewUrl,
mp4Url,
new Date()
);
}
/**
* 处理异常
*/
private void handleProcessError(TaskRenderJobMappingEntity mapping, Exception e) {
try {
mappingMapper.incrementRetryCount(mapping.getId());
// 超过最大重试次数,标记为失败
if (mapping.getRetryCount() != null &&
mapping.getRetryCount() >= TaskRenderJobMappingEntity.MAX_RETRY_COUNT - 1) {
mappingMapper.updateToFailed(
mapping.getId(),
"MAX_RETRY",
"超过最大重试次数: " + e.getMessage(),
new Date()
);
}
} catch (Exception ex) {
log.error("[渲染轮询] 处理错误失败", ex);
}
}
}

View File

@@ -13,6 +13,8 @@ import com.ycwl.basic.integration.render.dto.job.CreatePreviewRequest;
import com.ycwl.basic.integration.render.dto.job.CreatePreviewResponse;
import com.ycwl.basic.integration.render.dto.job.MaterialDTO;
import com.ycwl.basic.integration.render.service.RenderJobIntegrationService;
import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper;
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.SourceRepository;
import com.ycwl.basic.utils.JacksonUtil;
@@ -124,6 +126,8 @@ public class TaskTaskServiceImpl implements TaskService {
private FaceStatusManager faceStatusManager;
@Autowired
private RenderJobIntegrationService renderJobIntegrationService;
@Autowired
private TaskRenderJobMappingMapper taskRenderJobMappingMapper;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey();
@@ -729,6 +733,21 @@ public class TaskTaskServiceImpl implements TaskService {
CreatePreviewResponse response = renderJobIntegrationService.createPreview(request);
log.info("[灰度测试] 渲染预览任务创建成功, taskId: {}, renderJobId: {}, playUrl: {}",
taskId, response.getJobId(), response.getPlayUrl());
// 写入mapping表,供轮询服务处理
try {
TaskRenderJobMappingEntity mapping = new TaskRenderJobMappingEntity();
mapping.setTaskId(taskId);
mapping.setRenderJobId(response.getJobId());
mapping.setRenderStatus(TaskRenderJobMappingEntity.STATUS_PENDING);
mapping.setPublishedCount(0);
mapping.setSegmentCount(0);
mapping.setRetryCount(0);
taskRenderJobMappingMapper.insert(mapping);
log.info("[灰度测试] 写入mapping成功, taskId: {}, renderJobId: {}", taskId, response.getJobId());
} catch (Exception ex) {
log.warn("[灰度测试] 写入mapping失败,不影响主流程, taskId: {}, error: {}", taskId, ex.getMessage());
}
} catch (Exception e) {
// 灰度测试:不管返回什么或者报错,都不影响现有流程
log.warn("[灰度测试] 渲染预览任务创建失败,不影响主流程, taskId: {}, templateId: {}, error: {}",

View File

@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper">
<resultMap id="BaseResultMap" type="com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity">
<id column="id" property="id"/>
<result column="task_id" property="taskId"/>
<result column="render_job_id" property="renderJobId"/>
<result column="render_status" property="renderStatus"/>
<result column="published_count" property="publishedCount"/>
<result column="segment_count" property="segmentCount"/>
<result column="preview_url" property="previewUrl"/>
<result column="mp4_url" property="mp4Url"/>
<result column="error_code" property="errorCode"/>
<result column="error_message" property="errorMessage"/>
<result column="retry_count" property="retryCount"/>
<result column="last_check_time" property="lastCheckTime"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>
<sql id="Base_Column_List">
id, task_id, render_job_id, render_status, published_count, segment_count,
preview_url, mp4_url, error_code, error_message, retry_count, last_check_time,
create_time, update_time
</sql>
<select id="selectByTaskId" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE task_id = #{taskId}
</select>
<select id="selectByRenderJobId" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE render_job_id = #{renderJobId}
</select>
<select id="selectPendingForPolling" resultMap="BaseResultMap">
SELECT <include refid="Base_Column_List"/>
FROM task_render_job_mapping
WHERE render_status IN
<foreach collection="statuses" item="status" open="(" separator="," close=")">
#{status}
</foreach>
AND (
last_check_time IS NULL
OR last_check_time &lt; DATE_SUB(NOW(), INTERVAL #{checkIntervalSeconds} SECOND)
)
AND retry_count &lt; 10
ORDER BY last_check_time ASC, create_time ASC
LIMIT #{limit}
</select>
<update id="updateRenderStatus">
UPDATE task_render_job_mapping
SET render_status = #{renderStatus},
published_count = #{publishedCount},
segment_count = #{segmentCount},
<if test="previewUrl != null">
preview_url = #{previewUrl},
</if>
<if test="mp4Url != null">
mp4_url = #{mp4Url},
</if>
last_check_time = #{lastCheckTime},
update_time = NOW()
WHERE id = #{id}
</update>
<update id="updateToFailed">
UPDATE task_render_job_mapping
SET render_status = 'FAILED',
error_code = #{errorCode},
error_message = #{errorMessage},
last_check_time = #{lastCheckTime},
update_time = NOW()
WHERE id = #{id}
</update>
<update id="incrementRetryCount">
UPDATE task_render_job_mapping
SET retry_count = retry_count + 1,
update_time = NOW()
WHERE id = #{id}
</update>
</mapper>