Files
FrameTour-RenderWorker/biz/ffmpeg.py
Jerry Yan 8de0564fef feat(biz): 更新FFmpeg任务启动功能以支持环境变量配置最大工作线程数
- 修改start_ffmpeg_task函数参数max_workers默认值为None
- 添加环境变量FFMPEG_MAX_WORKERS读取逻辑
- 当max_workers为None时从环境变量获取默认值,否则使用传入值
- 保持原有tracer和任务分析功能不变
2026-01-10 18:28:00 +08:00

193 lines
8.5 KiB
Python

import json
import os.path
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from opentelemetry.trace import Status, StatusCode
from entity.ffmpeg import FfmpegTask
import logging
from util import ffmpeg, oss
from util.ffmpeg import fade_out_audio
from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg')
def parse_ffmpeg_task(task_info, template_info):
tracer = get_tracer(__name__)
with tracer.start_as_current_span("parse_ffmpeg_task") as span:
tasks = []
# 中间片段
task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
task_params: dict = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
# 统计only_if占位符的使用次数
only_if_usage_count = {}
with tracer.start_as_current_span("parse_ffmpeg_task.download_all") as sub_span:
with ThreadPoolExecutor(max_workers=8) as executor:
param_list: list[dict]
for param_list in task_params.values():
for param in param_list:
url = param.get("url")
if url.startswith("http"):
_, fn = os.path.split(url)
executor.submit(oss.download_from_oss, url, fn, True)
executor.shutdown(wait=True)
for part in template_info.get("video_parts"):
source, ext_data = parse_video(part.get('source'), task_params, template_info)
if not source:
logger.warning("no video found for part: " + str(part))
continue
only_if = part.get('only_if', '')
if only_if:
only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1
required_count = only_if_usage_count.get(only_if)
if not check_placeholder_exist_with_count(only_if, task_params_orig, required_count):
logger.info("because only_if exist, placeholder: %s insufficient (need %d), skip part: %s", only_if, required_count, part)
continue
sub_ffmpeg_task = FfmpegTask(source)
sub_ffmpeg_task.resolution = template_info.get("video_size", "")
sub_ffmpeg_task.annexb = True
sub_ffmpeg_task.ext_data = ext_data or {}
sub_ffmpeg_task.frame_rate = template_info.get("frame_rate", 25)
sub_ffmpeg_task.center_cut = part.get("crop_mode", None)
sub_ffmpeg_task.zoom_cut = part.get("zoom_cut", None)
for effect in part.get('effects', []):
sub_ffmpeg_task.add_effect(effect)
for lut in part.get('luts', []):
sub_ffmpeg_task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in part.get('audios', []):
sub_ffmpeg_task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in part.get('overlays', []):
sub_ffmpeg_task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
tasks.append(sub_ffmpeg_task)
output_file = "out_" + str(time.time()) + ".mp4"
task = FfmpegTask(tasks, output_file=output_file)
task.resolution = template_info.get("video_size", "")
overall = template_info.get("overall_template")
task.center_cut = template_info.get("crop_mode", None)
task.zoom_cut = template_info.get("zoom_cut", None)
task.frame_rate = template_info.get("frame_rate", 25)
# if overall.get('source', ''):
# source, ext_data = parse_video(overall.get('source'), task_params, template_info)
# task.add_inputs(source)
# task.ext_data = ext_data or {}
for effect in overall.get('effects', []):
task.add_effect(effect)
for lut in overall.get('luts', []):
task.add_lut(os.path.join(template_info.get("local_path"), lut).replace("\\", "/"))
for audio in overall.get('audios', []):
task.add_audios(os.path.join(template_info.get("local_path"), audio))
for overlay in overall.get('overlays', []):
task.add_overlay(os.path.join(template_info.get("local_path"), overlay))
return task
def parse_video(source, task_params, template_info):
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
_pick_source = {}
if type(new_sources) is list:
if len(new_sources) == 0:
logger.debug("no video found for placeholder: " + placeholder_id)
return None, _pick_source
else:
_pick_source = new_sources.pop(0)
new_sources = _pick_source.get("url")
if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name, True)
return source_name, _pick_source
return new_sources, _pick_source
return os.path.join(template_info.get("local_path"), source), None
def check_placeholder_exist(placeholder_id, task_params):
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
if len(new_sources) == 0:
return False
else:
return True
return True
return False
def check_placeholder_exist_with_count(placeholder_id, task_params, required_count=1):
"""检查占位符是否存在足够数量的片段"""
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if type(new_sources) is list:
return len(new_sources) >= required_count
return required_count <= 1
return False
def start_ffmpeg_task(ffmpeg_task, max_workers=None):
if max_workers is None:
max_workers = int(os.environ.get("FFMPEG_MAX_WORKERS", 4))
tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span:
sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
if sub_tasks:
span.set_attribute("sub_tasks.count", len(sub_tasks))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
for task in sub_tasks}
for future in as_completed(futures):
try:
if not future.result():
# 快速失败:取消剩余任务
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
except Exception as e:
logger.error("子任务执行失败: %s", e)
for f in futures:
f.cancel()
span.set_status(Status(StatusCode.ERROR))
return False
ffmpeg_task.correct_task_type()
span.set_attribute("task.type", ffmpeg_task.task_type)
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate)
span.set_attribute("task.resolution", str(ffmpeg_task.resolution))
span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data))
result = ffmpeg.start_render(ffmpeg_task)
if not result:
span.set_status(Status(StatusCode.ERROR))
return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task):
for task in ffmpeg_task.analyze_input_render_tasks():
clear_task_tmp_file(task)
try:
if os.getenv("TEMPLATE_DIR") not in ffmpeg_task.get_output_file():
os.remove(ffmpeg_task.get_output_file())
logger.info("delete tmp file: " + ffmpeg_task.get_output_file())
else:
logger.info("skip delete template file: " + ffmpeg_task.get_output_file())
except OSError:
logger.warning("delete tmp file failed: " + ffmpeg_task.get_output_file())
return False
return True
def probe_video_info(ffmpeg_task):
# 获取视频长度宽度和时长
return ffmpeg.probe_video_info(ffmpeg_task.get_output_file())