import json
import logging
import os.path
import time

from opentelemetry.trace import Status, StatusCode

from entity.ffmpeg import FfmpegTask
from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio  # noqa: F401 -- unused here; kept in case other modules import it from this one
from telemetry import get_tracer

logger = logging.getLogger('biz/ffmpeg')

# Prefix that marks a template source as a reference into the task parameters.
_PLACEHOLDER_PREFIX = 'PLACEHOLDER_'


def _apply_template_assets(task, spec, local_path):
    """Attach the effects, LUT filters, audios and overlays listed in *spec* to *task*.

    File-based assets (filters, audios, overlays) are resolved relative to
    *local_path*; effects are passed through unchanged.
    """
    for effect in spec.get('effects', []):
        task.add_effect(effect)
    for lut in spec.get('filters', []):
        task.add_lut(os.path.join(local_path, lut))
    for audio in spec.get('audios', []):
        task.add_audios(os.path.join(local_path, audio))
    for overlay in spec.get('overlays', []):
        task.add_overlay(os.path.join(local_path, overlay))


def parse_ffmpeg_task(task_info, template_info):
    """Build the top-level ``FfmpegTask`` for a render job.

    One sub-task is created per entry of ``template_info["video_parts"]``
    whose source can be resolved; the sub-tasks become the inputs of a final
    task configured from ``template_info["overall_template"]``.

    Args:
        task_info: Mapping holding the job description. ``"taskParams"`` is
            read as a JSON-encoded string; a missing, ``None`` or empty value
            is treated as ``"{}"``.
        template_info: Mapping describing the template (``video_parts``,
            ``overall_template``, ``video_size``, ``frame_rate``,
            ``crop_mode``, ``local_path``).

    Returns:
        The composed ``FfmpegTask``; its output file name is
        ``"out_<timestamp>.mp4"``.
    """
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("parse_ffmpeg_task") as span:
        task_params_str = task_info.get("taskParams") or "{}"
        span.set_attribute("task_params", task_params_str)
        # parse_video() consumes entries from task_params, so a second,
        # untouched copy is kept for the only_if existence checks.
        task_params = json.loads(task_params_str)
        task_params_orig = json.loads(task_params_str)
        local_path = template_info.get("local_path")

        # Middle segments.
        tasks = []
        for part in template_info.get("video_parts") or []:
            # The source is resolved (and a placeholder entry consumed)
            # before only_if is evaluated; this order is intentional and
            # matches the original behaviour.
            source, ext_data = parse_video(part.get('source'), task_params, template_info)
            if not source:
                logger.warning("no video found for part: " + str(part))
                continue
            only_if = part.get('only_if', '')
            if only_if and not check_placeholder_exist(only_if, task_params_orig):
                logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part)
                continue
            sub_ffmpeg_task = FfmpegTask(source)
            sub_ffmpeg_task.resolution = template_info.get("video_size", "")
            sub_ffmpeg_task.annexb = True
            sub_ffmpeg_task.ext_data = ext_data or {}
            sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
            sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
            _apply_template_assets(sub_ffmpeg_task, part, local_path)
            tasks.append(sub_ffmpeg_task)

        output_file = "out_" + str(time.time()) + ".mp4"
        task = FfmpegTask(tasks, output_file=output_file)
        task.resolution = template_info.get("video_size", "")
        # A template without an overall section must not crash the parser.
        overall = template_info.get("overall_template") or {}
        task.center_cut = template_info.get("crop_mode", None)
        task.frame_rate = template_info.get("frame_rate", 25)
        if overall.get('source', ''):
            source, ext_data = parse_video(overall.get('source'), task_params, template_info)
            if source:
                task.add_inputs(source)
            else:
                # Never hand an unresolved (None/empty) input to the task.
                logger.warning("no video found for overall source: %s", overall.get('source'))
            task.ext_data = ext_data or {}
        _apply_template_assets(task, overall, local_path)
        return task


def parse_video(source, task_params, template_info):
    """Resolve a template source reference to a usable video path.

    A source starting with ``PLACEHOLDER_`` is looked up in *task_params*
    under the remainder of the name. When the value is a list, its first
    entry is removed from the list (mutating *task_params*) and the entry's
    ``"url"`` is used; otherwise the value itself is used. URLs starting with
    ``"http"`` are fetched through ``oss.download_from_oss`` and the local
    file name (last path component of the URL) is returned. Any other source
    is joined onto ``template_info["local_path"]``.

    Args:
        source: Source reference from the template.
        task_params: Decoded task parameters; list values are consumed.
        template_info: Template description providing ``local_path``.

    Returns:
        A ``(path, ext_data)`` tuple. ``path`` is ``None`` when nothing could
        be resolved. ``ext_data`` is the consumed list entry (or ``{}``) for
        placeholders and ``None`` for template-local files.
    """
    if not source:
        return None, None
    if not source.startswith(_PLACEHOLDER_PREFIX):
        return os.path.join(template_info.get("local_path"), source), None

    # Strip only the leading marker, not later occurrences inside the id.
    placeholder_id = source[len(_PLACEHOLDER_PREFIX):]
    candidate = task_params.get(placeholder_id, [])
    picked = {}
    if isinstance(candidate, list):
        if not candidate:
            logger.debug("no video found for placeholder: " + placeholder_id)
            return None, picked
        entry = candidate.pop(0)
        if isinstance(entry, dict):
            picked = entry
            candidate = entry.get("url")
        else:
            candidate = None

    if not isinstance(candidate, str) or not candidate:
        # Missing "url", a non-dict entry, or a non-string placeholder value.
        logger.warning("no usable url for placeholder: %s", placeholder_id)
        return None, picked

    if candidate.startswith("http"):
        _, source_name = os.path.split(candidate)
        oss.download_from_oss(candidate, source_name)
        return source_name, picked
    return candidate, picked


def check_placeholder_exist(placeholder_id, task_params):
    """Return whether *task_params* provides a value for *placeholder_id*.

    A missing key or an empty list counts as absent; a non-empty list or any
    non-list value counts as present.
    """
    if placeholder_id not in task_params:
        return False
    value = task_params[placeholder_id]
    if isinstance(value, list):
        return bool(value)
    return True


def start_ffmpeg_task(ffmpeg_task):
    """Render *ffmpeg_task* after recursively rendering its input tasks.

    The tasks yielded by ``analyze_input_render_tasks()`` are rendered first;
    the first failure aborts the whole run. The span status is set to ERROR
    on any failure and to OK on success.

    Returns:
        ``True`` when every render succeeded, ``False`` otherwise.
    """
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("start_ffmpeg_task") as span:
        for task in ffmpeg_task.analyze_input_render_tasks():
            if not start_ffmpeg_task(task):
                # Mark this span as failed too, consistent with the direct
                # render failure below.
                span.set_status(Status(StatusCode.ERROR))
                return False
        ffmpeg_task.correct_task_type()
        span.set_attribute("task.type", ffmpeg_task.task_type)
        span.set_attribute("task.center_cut", ffmpeg_task.center_cut)
        span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate)
        span.set_attribute("task.resolution", ffmpeg_task.resolution)
        span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data))
        if not ffmpeg.start_render(ffmpeg_task):
            span.set_status(Status(StatusCode.ERROR))
            return False
        span.set_status(Status(StatusCode.OK))
        return True


def clear_task_tmp_file(ffmpeg_task):
    """Delete the output file of *ffmpeg_task* and of its input tasks.

    Files whose path contains the ``TEMPLATE_DIR`` environment value are
    never deleted. When ``TEMPLATE_DIR`` is unset, template files cannot be
    told apart from temporary ones, so nothing is deleted for this task.

    Returns:
        ``True`` when the file was deleted or deliberately skipped as a
        template file; ``False`` when deletion failed or ``TEMPLATE_DIR`` is
        unset. Results of the recursive calls are not propagated.
    """
    for task in ffmpeg_task.analyze_input_render_tasks():
        clear_task_tmp_file(task)

    output_file = ffmpeg_task.get_output_file()
    template_dir = os.getenv("TEMPLATE_DIR")
    if template_dir is None:
        # `None in str` would raise TypeError, which the OSError handler
        # below does not catch.
        logger.warning("TEMPLATE_DIR is not set, skip delete tmp file: %s", output_file)
        return False
    if template_dir in output_file:
        logger.info("skip delete template file: " + output_file)
        return True
    try:
        os.remove(output_file)
    except OSError:
        logger.warning("delete tmp file failed: " + output_file)
        return False
    logger.info("delete tmp file: " + output_file)
    return True


def probe_video_info(ffmpeg_task):
    """Probe the task's output file via ``ffmpeg.probe_video_info``.

    The original comment describes the result as the video's width, height
    and duration; the exact return shape is defined by ``util.ffmpeg``.
    """
    return ffmpeg.probe_video_info(ffmpeg_task.get_output_file())