You've already forked FrameTour-BE
feat(task): 优化任务调度和视频处理流程
- 移除渲染工作配置管理器相关逻辑 - 将任务列表设置为空集合,禁用任务分配功能 - 删除景点存储适配器的ACL设置代码 - 添加视频处理相关的mapper和repository依赖 - 在渲染轮询服务中添加视频记录处理逻辑 - 实现预览视频就绪时的video记录创建和更新 - 实现MP4合成完成时的video记录更新功能 - 添加缓存清理机制确保数据一致性 - 增加详细的日志记录便于问题排查
This commit is contained in:
@@ -4,9 +4,15 @@ import com.ycwl.basic.integration.render.dto.job.FinalizeMP4Response;
|
||||
import com.ycwl.basic.integration.render.dto.job.JobStatusResponse;
|
||||
import com.ycwl.basic.integration.render.service.RenderJobIntegrationService;
|
||||
import com.ycwl.basic.mapper.TaskMapper;
|
||||
import com.ycwl.basic.mapper.VideoMapper;
|
||||
import com.ycwl.basic.mapper.task.TaskRenderJobMappingMapper;
|
||||
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
|
||||
import com.ycwl.basic.model.pc.video.entity.VideoEntity;
|
||||
import com.ycwl.basic.model.task.entity.TaskRenderJobMappingEntity;
|
||||
import com.ycwl.basic.repository.MemberRelationRepository;
|
||||
import com.ycwl.basic.repository.VideoRepository;
|
||||
import com.ycwl.basic.repository.VideoTaskRepository;
|
||||
import com.ycwl.basic.utils.SnowFlakeUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -39,6 +45,10 @@ public class RenderJobPollingService {
|
||||
private final TaskRenderJobMappingMapper mappingMapper;
|
||||
private final RenderJobIntegrationService renderJobService;
|
||||
private final TaskMapper taskMapper;
|
||||
private final VideoMapper videoMapper;
|
||||
private final VideoTaskRepository videoTaskRepository;
|
||||
private final VideoRepository videoRepository;
|
||||
private final MemberRelationRepository memberRelationRepository;
|
||||
|
||||
/**
|
||||
* 定时轮询间隔:5秒
|
||||
@@ -173,8 +183,20 @@ public class RenderJobPollingService {
|
||||
TaskEntity task = new TaskEntity();
|
||||
task.setId(taskId);
|
||||
task.setVideoUrl(playUrl);
|
||||
task.setStatus(1); // 设置为完成状态
|
||||
taskMapper.update(task);
|
||||
log.info("[渲染轮询] 已更新task预览URL: taskId={}, playUrl={}", taskId, playUrl);
|
||||
videoTaskRepository.clearTaskCache(taskId);
|
||||
log.info("[渲染轮询] 已更新task预览URL和状态: taskId={}, playUrl={}, status=1", taskId, playUrl);
|
||||
|
||||
// 处理video记录(类似taskSuccess逻辑)
|
||||
try {
|
||||
handleVideoRecordForPreview(taskId, playUrl);
|
||||
} catch (Exception e) {
|
||||
log.warn("[渲染轮询] 处理video记录失败: taskId={}, error={}", taskId, e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 异步发送视频生成通知(仅记录日志,实际通知可能需要在MP4完成后)
|
||||
log.info("[渲染轮询] 预览视频已就绪,可发送通知: taskId={}", taskId);
|
||||
}
|
||||
} else {
|
||||
// 更新片段信息
|
||||
@@ -251,7 +273,15 @@ public class RenderJobPollingService {
|
||||
task.setId(taskId);
|
||||
task.setVideoUrl(mp4Url);
|
||||
taskMapper.update(task);
|
||||
videoTaskRepository.clearTaskCache(taskId);
|
||||
log.info("[渲染轮询] 已更新task最终MP4 URL: taskId={}, mp4Url={}", taskId, mp4Url);
|
||||
|
||||
// 更新video记录的videoUrl为最终MP4地址
|
||||
try {
|
||||
handleVideoRecordForMP4(taskId, mp4Url);
|
||||
} catch (Exception e) {
|
||||
log.warn("[渲染轮询] 更新video的MP4 URL失败: taskId={}, error={}", taskId, e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
// MP4还在合成中,更新片段信息
|
||||
log.debug("[渲染轮询] MP4合成中: taskId={}, 等待下次轮询", taskId);
|
||||
@@ -315,4 +345,71 @@ public class RenderJobPollingService {
|
||||
log.error("[渲染轮询] 处理错误失败", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理video记录(预览就绪时)
|
||||
* 类似taskSuccess的逻辑,但简化版本
|
||||
*/
|
||||
private void handleVideoRecordForPreview(Long taskId, String videoUrl) {
|
||||
try {
|
||||
var taskResp = taskMapper.getById(taskId);
|
||||
if (taskResp == null) {
|
||||
log.warn("[渲染轮询] task不存在: taskId={}", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
VideoEntity video = videoMapper.findByTaskId(taskId);
|
||||
if (video != null) {
|
||||
// 更新已有video记录
|
||||
video.setVideoUrl(videoUrl);
|
||||
videoMapper.update(video);
|
||||
videoRepository.clearVideoCache(video.getId());
|
||||
log.info("[渲染轮询] 已更新video预览URL: taskId={}, videoId={}, videoUrl={}",
|
||||
taskId, video.getId(), videoUrl);
|
||||
} else {
|
||||
// 创建新video记录
|
||||
video = new VideoEntity();
|
||||
video.setId(SnowFlakeUtil.getLongId());
|
||||
video.setScenicId(taskResp.getScenicId());
|
||||
video.setTemplateId(taskResp.getTemplateId());
|
||||
video.setTaskId(taskId);
|
||||
video.setFaceId(taskResp.getFaceId());
|
||||
video.setVideoUrl(videoUrl);
|
||||
video.setCreateTime(new Date());
|
||||
videoMapper.add(video);
|
||||
log.info("[渲染轮询] 已创建video预览记录: taskId={}, videoId={}, videoUrl={}",
|
||||
taskId, video.getId(), videoUrl);
|
||||
}
|
||||
|
||||
// 更新member_video关联表(isBuy=0,预览阶段未购买)
|
||||
videoMapper.updateRelationWhenTaskSuccess(taskId, video.getId(), 0);
|
||||
memberRelationRepository.clearVCacheByFace(taskResp.getFaceId());
|
||||
log.info("[渲染轮询] 已更新member_video关联: taskId={}, videoId={}", taskId, video.getId());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[渲染轮询] 处理video记录失败: taskId={}", taskId, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新video记录的MP4地址(MP4合成完成时)
|
||||
*/
|
||||
private void handleVideoRecordForMP4(Long taskId, String mp4Url) {
|
||||
try {
|
||||
VideoEntity video = videoMapper.findByTaskId(taskId);
|
||||
if (video != null) {
|
||||
video.setVideoUrl(mp4Url);
|
||||
videoMapper.update(video);
|
||||
videoRepository.clearVideoCache(video.getId());
|
||||
log.info("[渲染轮询] 已更新video最终MP4 URL: taskId={}, videoId={}, mp4Url={}",
|
||||
taskId, video.getId(), mp4Url);
|
||||
} else {
|
||||
log.warn("[渲染轮询] video不存在,无法更新MP4 URL: taskId={}", taskId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[渲染轮询] 更新video的MP4 URL失败: taskId={}", taskId, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +71,7 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@@ -228,26 +229,11 @@ public class TaskTaskServiceImpl implements TaskService {
|
||||
} else {
|
||||
updTemplateList = templateRepository.getAllEnabledTemplateList();
|
||||
}
|
||||
RenderWorkerConfigManager configManager = repository.getWorkerConfigManager(worker.getId());
|
||||
try {
|
||||
if (lock.tryLock(2, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
List<TaskRespVO> taskList;
|
||||
if (Strings.isNotBlank(configManager.getString("scenic_only"))) {
|
||||
taskList = taskMapper.selectNotRunningByScenicList(configManager.getString("scenic_only"));
|
||||
} else {
|
||||
var _taskList = taskMapper.selectNotRunning();
|
||||
taskList = _taskList.stream().filter(task -> {
|
||||
boolean workerSelfHostedScenic = isWorkerSelfHostedScenic(task.getScenicId());
|
||||
return !workerSelfHostedScenic;
|
||||
}).limit(1).toList();
|
||||
}
|
||||
resp.setTasks(taskList);
|
||||
resp.setTasks(Collections.emptyList());
|
||||
resp.setTemplates(updTemplateList);
|
||||
taskList.forEach(task -> {
|
||||
taskMapper.assignToWorker(task.getId(), worker.getId());
|
||||
videoTaskRepository.clearTaskCache(task.getId());
|
||||
});
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
@@ -556,10 +542,6 @@ public class TaskTaskServiceImpl implements TaskService {
|
||||
videoMapper.add(video);
|
||||
}
|
||||
ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(task.getScenicId());
|
||||
IStorageAdapter adapter = scenicService.getScenicTmpStorageAdapter(task.getScenicId());
|
||||
String hash = MD5.create().digestHex(task.getTaskParams() + task.getFaceId().toString());
|
||||
String filename = StorageUtil.joinPath(StorageConstant.VLOG_PATH, task.getTemplateId().toString() + "_" + hash + "_" + task.getScenicId() + ".mp4");
|
||||
adapter.setAcl(StorageAcl.PUBLIC_READ, filename);
|
||||
int isBuy = 0;
|
||||
FaceEntity face = faceRepository.getFace(task.getFaceId());
|
||||
if (face != null) {
|
||||
|
||||
Reference in New Issue
Block a user