from entity.ffmpeg import FfmpegTask import logging from util import ffmpeg logger = logging.getLogger('biz/ffmpeg') def parse_ffmpeg_task(task_info, template_info): tasks = [] # 中间片段 for part in template_info.get("video_parts"): sub_ffmpeg_task = parse_video_part(part, task_info) if not sub_ffmpeg_task: continue tasks.append(sub_ffmpeg_task) task = FfmpegTask(tasks, output_file="test.mp4") task.correct_task_type() overall = template_info.get("overall_template") task.add_lut(*overall.get('luts', [])) task.add_audios(*overall.get('audios', [])) task.add_overlay(*overall.get('overlays', [])) return task def parse_video_part(video_part, task_info): source = select_video_if_needed(video_part.get('source'), task_info) if not source: logger.warning("no video found for part: " + str(video_part)) return None task = FfmpegTask(source) task.add_lut(*video_part.get('luts', [])) task.add_audios(*video_part.get('audios', [])) task.add_overlay(*video_part.get('overlays', [])) return task def select_video_if_needed(source, task_info): if source.startswith('PLACEHOLDER_'): placeholder_id = source.replace('PLACEHOLDER_', '') new_sources = task_info.get('user_videos', {}).get(placeholder_id, []) if type(new_sources) is list: if len(new_sources) == 0: logger.debug("no video found for placeholder: " + placeholder_id) return None else: # TODO: Random Pick / Policy Pick new_sources = new_sources[0] return new_sources return source def start_ffmpeg_task(ffmpeg_task): for task in ffmpeg_task.analyze_input_render_tasks(): start_ffmpeg_task(task) ffmpeg.start_render(ffmpeg_task)