diff --git a/.gitignore b/.gitignore index c2044f4..6054df3 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,11 @@ __pycache__/ *.so .Python build/ +dist/ +*.mp4 +*.ts +rand*.ts +tmp_concat_*.txt *.egg-info/ *.egg *.manifest diff --git a/biz/ffmpeg.py b/biz/ffmpeg.py index 927637c..b58c86a 100644 --- a/biz/ffmpeg.py +++ b/biz/ffmpeg.py @@ -20,6 +20,11 @@ def parse_ffmpeg_task(task_info, template_info): if not source: logger.warning("no video found for part: " + str(part)) continue + only_if = part.get('only_if', '') + if only_if: + if not check_placeholder_exist(only_if, task_params): + logger.info("because only_if exist, placeholder: %s not exist, skip part: %s", only_if, part) + continue sub_ffmpeg_task = FfmpegTask(source) sub_ffmpeg_task.annexb = True sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25) @@ -33,6 +38,7 @@ def parse_ffmpeg_task(task_info, template_info): output_file = "out_" + str(time.time()) + ".mp4" task = FfmpegTask(tasks, output_file=output_file) overall = template_info.get("overall_template") + task.center_cut = template_info.get("crop_mode", None) task.frame_rate = template_info.get("frame_rate", 25) if overall.get('source', ''): source = parse_video(overall.get('source'), task_params, template_info) @@ -66,9 +72,22 @@ def parse_video(source, task_params, template_info): return os.path.join(template_info.get("local_path"), source) +def check_placeholder_exist(placeholder_id, task_params): + if placeholder_id in task_params: + new_sources = task_params.get(placeholder_id, []) + if type(new_sources) is list: + if len(new_sources) == 0: + return False + else: + return True + return True + return False + def start_ffmpeg_task(ffmpeg_task): for task in ffmpeg_task.analyze_input_render_tasks(): - start_ffmpeg_task(task) + result = start_ffmpeg_task(task) + if not result: + return False ffmpeg_task.correct_task_type() return ffmpeg.start_render(ffmpeg_task) diff --git a/entity/ffmpeg.py b/entity/ffmpeg.py index 89f411c..71a3c18 100644 --- a/entity/ffmpeg.py +++ b/entity/ffmpeg.py @@ -1,6 +1,9 @@ import time import uuid +ENCODER_ARGS = ("-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1",) +PROFILE_LEVEL_ARGS = ("-profile:v", "high", "-level:v", "4") + class FfmpegTask(object): @@ -14,6 +17,9 @@ class FfmpegTask(object): self.input_file = input_file else: self.input_file = [] + self.zoom_cut = None + self.center_cut = None + self.ext_data = {} self.task_type = task_type self.output_file = output_file self.mute = True @@ -101,6 +107,10 @@ class FfmpegTask(object): return False if self.speed != 1: return False + if self.zoom_cut is not None: + return False + if self.center_cut is not None: + return False return True def check_can_copy(self): @@ -116,6 +126,10 @@ class FfmpegTask(object): return False if len(self.input_file) > 1: return False + if self.zoom_cut is not None: + return False + if self.center_cut is not None: + return False return True def check_audio_track(self): @@ -125,16 +139,16 @@ class FfmpegTask(object): def get_ffmpeg_args(self): args = ['-y', '-hide_banner'] if self.task_type == 'encode': - # args += ('-hwaccel', 'qsv', '-hwaccel_output_format', 'qsv') input_args = [] filter_args = [] - output_args = ["-shortest", "-c:v", "h264_qsv", "-global_quality", "28", "-look_ahead", "1"] + output_args = ["-profile", "high", "-level", "4","-shortest", *ENCODER_ARGS] if self.annexb: output_args.append("-bsf:v") output_args.append("h264_mp4toannexb") + output_args.append("-reset_timestamps") + output_args.append("1") video_output_str = "[0:v]" audio_output_str = "[0:v]" - video_input_count = 0 audio_input_count = 0 for input_file in self.input_file: input_args.append("-i") @@ -142,8 +156,14 @@ class FfmpegTask(object): input_args.append(input_file) elif isinstance(input_file, FfmpegTask): input_args.append(input_file.get_output_file()) + if self.center_cut == 1: + pos_json = self.ext_data.get('posJson', {}) + _v_w = pos_json.get('imgWidth', 1) + _f_x = pos_json.get('ltX', 0) + _x = f'{float(_f_x/_v_w) :.5f}*iw' + filter_args.append(f"[{video_output_str}]crop=x={_x}:y=0:w=ih*ih/iw:h=ih[{video_output_str}]") for lut in self.luts: - filter_args.append("[0:v]lut3d=file=" + lut + "[0:v]") + filter_args.append(f"[{video_output_str}]lut3d=file={lut}[{video_output_str}]") for overlay in self.overlays: input_index = input_args.count("-i") input_args.append("-i") @@ -202,9 +222,8 @@ class FfmpegTask(object): output_args.append("copy") output_args.append("-f") output_args.append("mp4") - output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1") return args + input_args + output_args + [self.get_output_file()] - output_args += ("-c:v", "h264_qsv", "-r", "25", "-global_quality", "28", "-look_ahead", "1") + output_args += ["-r", f"{self.frame_rate}", *PROFILE_LEVEL_ARGS, *ENCODER_ARGS] filter_args = [] video_output_str = "[0:v]" audio_output_str = "[0:a]" diff --git a/index.py b/index.py index 09197b7..84e4958 100644 --- a/index.py +++ b/index.py @@ -5,15 +5,36 @@ import config from template import load_local_template from util import api -load_local_template() +import os +import glob +load_local_template() +import logging + +LOGGER = logging.getLogger(__name__) while True: # print(get_sys_info()) print("waiting for task...") - task_list = api.sync_center() + try: + task_list = api.sync_center() + except Exception as e: + LOGGER.error("sync_center error", exc_info=e) + sleep(5) + continue if len(task_list) == 0: + # 删除当前文件夹下所有以.mp4、.ts结尾的文件 + for file_globs in ['*.mp4', '*.ts', 'tmp_concat*.txt']: + for file_path in glob.glob(file_globs): + try: + os.remove(file_path) + print(f"Deleted file: {file_path}") + except Exception as e: + LOGGER.error(f"Error deleting file {file_path}", exc_info=e) sleep(5) for task in task_list: print("start task:", task) - biz.task.start_task(task) + try: + biz.task.start_task(task) + except Exception as e: + LOGGER.error("task_start error", exc_info=e) diff --git a/template/__init__.py b/template/__init__.py index 381860a..1f9bbfa 100644 --- a/template/__init__.py +++ b/template/__init__.py @@ -88,8 +88,8 @@ def download_template(template_id): new_fp = os.path.join(template_info['local_path'], _fn) oss.download_from_oss(_template['source'], new_fp) if _fn.endswith(".mp4"): - from util.ffmpeg import to_annexb - new_fp = to_annexb(new_fp) + from util.ffmpeg import re_encode_and_annexb + new_fp = re_encode_and_annexb(new_fp) _template['source'] = os.path.relpath(new_fp, template_info['local_path']) if 'overlays' in _template: for i in range(len(_template['overlays'])): diff --git a/util/api.py b/util/api.py index 46c698e..f264490 100644 --- a/util/api.py +++ b/util/api.py @@ -73,7 +73,7 @@ def get_template_info(template_id): 'scenic_name': remote_template_info.get('scenicName', '景区'), 'name': remote_template_info.get('name', '模版'), 'video_size': '1920x1080', - 'frame_rate': 30, + 'frame_rate': 25, 'overall_duration': 30, 'video_parts': [ @@ -90,6 +90,7 @@ def get_template_info(template_id): # 占位符 _template['source'] = "PLACEHOLDER_" + template_info.get('sourceUrl', '') _template['mute'] = template_info.get('mute', True) + _template['crop_mode'] = template_info.get('cropEnable', None) else: _template['source'] = None _overlays = template_info.get('overlays', '') @@ -101,6 +102,9 @@ def get_template_info(template_id): _luts = template_info.get('luts', '') if _luts: _template['luts'] = _luts.split(",") + _only_if = template_info.get('onlyIf', '') + if _only_if: + _template['only_if'] = _only_if return _template # outer template definition @@ -165,7 +169,7 @@ def upload_task_file(task_info, ffmpeg_task): logger.info("开始上传文件: %s 至 %s", task_info.get("id"), url) try: with open(ffmpeg_task.get_output_file(), 'rb') as f: - requests.put(url, data=f) + requests.put(url, data=f, headers={"Content-Type": "video/mp4"}) except requests.RequestException as e: logger.error("上传失败!", e) return False diff --git a/util/ffmpeg.py b/util/ffmpeg.py index 1065570..f572f78 100644 --- a/util/ffmpeg.py +++ b/util/ffmpeg.py @@ -4,7 +4,7 @@ import subprocess from datetime import datetime from typing import Optional, IO -from entity.ffmpeg import FfmpegTask +from entity.ffmpeg import FfmpegTask, ENCODER_ARGS, PROFILE_LEVEL_ARGS logger = logging.getLogger(__name__) @@ -21,6 +21,19 @@ def to_annexb(file): else: return file +def re_encode_and_annexb(file): + if not os.path.exists(file): + return file + logger.info("ReEncodeAndAnnexb: %s", file) + ffmpeg_process = subprocess.run(["ffmpeg", "-y", "-hide_banner", "-i", file, *PROFILE_LEVEL_ARGS, *ENCODER_ARGS, "-bsf:v", "h264_mp4toannexb", + "-f", "mpegts", file +".ts"]) + logger.info("ReEncodeAndAnnexb: %s, returned: %s", file, ffmpeg_process.returncode) + if ffmpeg_process.returncode == 0: + os.remove(file) + return file+".ts" + else: + return file + def start_render(ffmpeg_task: FfmpegTask): logger.info(ffmpeg_task) if not ffmpeg_task.need_run(): @@ -34,7 +47,18 @@ def start_render(ffmpeg_task: FfmpegTask): ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], **subprocess_args(True)) logger.info("FINISH TASK, OUTPUT IS %s", handle_ffmpeg_output(ffmpeg_process.stdout)) code = ffmpeg_process.returncode - return code == 0 + if code != 0: + logger.error("FFMPEG ERROR: %s", ffmpeg_process.stderr) + return False + try: + out_file_stat = os.stat(ffmpeg_task.output_file) + if out_file_stat.st_size < 4096: + logger.error("FFMPEG ERROR: OUTPUT FILE IS TOO SMALL") + return False + except OSError: + logger.error("FFMPEG ERROR: OUTPUT FILE NOT FOUND") + return False + return True def handle_ffmpeg_output(stdout: Optional[bytes]) -> str: out_time = "0:0:0.0" @@ -69,6 +93,8 @@ def probe_video_info(video_file): stderr=subprocess.STDOUT, **subprocess_args(True) ) + if result.returncode != 0: + return 0, 0, 0 all_result = result.stdout.decode('utf-8').strip() wh, duration = all_result.split('\n') width, height = wh.strip().split('x') diff --git a/util/oss.py b/util/oss.py index 140dba1..e41f6d1 100644 --- a/util/oss.py +++ b/util/oss.py @@ -13,13 +13,22 @@ def upload_to_oss(url, file_path): :param str file_path: 文件路径 :return bool: 是否成功 """ - with open(file_path, 'rb') as f: + max_retries = 5 + retries = 0 + while retries < max_retries: try: - response = requests.put(url, data=f) - return response.status_code == 200 + with open(file_path, 'rb') as f: + response = requests.put(url, data=f, timeout=60) # 设置超时时间为1分钟 + if response.status_code == 200: + return True + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") except Exception as e: - print(e) - return False + logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...") + retries += 1 + return False + def download_from_oss(url, file_path): """ @@ -33,11 +42,18 @@ def download_from_oss(url, file_path): if file_dir: if not os.path.exists(file_dir): os.makedirs(file_dir) - try: - response = requests.get(url) - with open(file_path, 'wb') as f: - f.write(response.content) - return True - except Exception as e: - print(e) - return False + max_retries = 5 + retries = 0 + while retries < max_retries: + try: + response = requests.get(url, timeout=15) # 设置超时时间 + with open(file_path, 'wb') as f: + f.write(response.content) + return True + except requests.exceptions.Timeout: + retries += 1 + logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...") + except Exception as e: + logger.warning(f"Download failed. Retrying {retries}/{max_retries}...") + retries += 1 + return False