From 88aa3adca1b2bf32307a7896fe3adf33d57f7b6c Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sat, 7 Feb 2026 00:38:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(base):=20=E6=B7=BB=E5=8A=A0=E5=8D=95?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=86=85=E6=96=87=E4=BB=B6=E4=BC=A0=E8=BE=93?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 引入 ThreadPoolExecutor 实现并行下载和上传 - 新增 download_files_parallel 和 upload_files_parallel 方法 - 添加任务传输并发数配置选项 TASK_DOWNLOAD_CONCURRENCY 和 TASK_UPLOAD_CONCURRENCY - 实现并发数配置的环境变量解析和验证逻辑 - 在多个处理器中应用并行下载优化文件获取性能 - 更新 .env.example 配置文件模板 - 移除 FFmpeg 命令日志长度限制 --- .env.example | 2 + handlers/base.py | 261 ++++++++++++++++++++++++++++++++- handlers/compose_transition.py | 38 +++-- handlers/finalize_mp4.py | 18 ++- handlers/package_ts.py | 36 +++-- handlers/prepare_audio.py | 74 +++++++--- handlers/render_video.py | 94 +++++++----- 7 files changed, 435 insertions(+), 88 deletions(-) diff --git a/.env.example b/.env.example index 3692fac..7bf6354 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,8 @@ TEMP_DIR=tmp/ #FFMPEG_TIMEOUT=3600 # FFmpeg 执行超时(秒) #DOWNLOAD_TIMEOUT=300 # 下载超时(秒) #UPLOAD_TIMEOUT=600 # 上传超时(秒) +#TASK_DOWNLOAD_CONCURRENCY=4 # 单任务内并行下载数(1-16) +#TASK_UPLOAD_CONCURRENCY=2 # 单任务内并行上传数(1-16) # =================== # 硬件加速与多显卡 diff --git a/handlers/base.py b/handlers/base.py index 48ed890..22cf223 100644 --- a/handlers/base.py +++ b/handlers/base.py @@ -12,6 +12,7 @@ import shutil import tempfile import subprocess import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from abc import ABC from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING @@ -23,7 +24,13 @@ from domain.result import TaskResult, ErrorCode from domain.config import WorkerConfig from services import storage from services.cache import MaterialCache -from util.tracing import get_current_task_context, mark_span_error, start_span +from util.tracing import ( + bind_trace_context, + capture_otel_context, + get_current_task_context, + mark_span_error, + start_span, +) from constant import ( HW_ACCEL_NONE, HW_ACCEL_QSV, HW_ACCEL_CUDA, VIDEO_ENCODE_PARAMS, VIDEO_ENCODE_PARAMS_QSV, VIDEO_ENCODE_PARAMS_CUDA @@ -274,6 +281,9 @@ class BaseHandler(TaskHandler, ABC): # 线程本地存储:用于存储当前线程的 GPU 设备索引 _thread_local = threading.local() + DEFAULT_TASK_DOWNLOAD_CONCURRENCY = 4 + DEFAULT_TASK_UPLOAD_CONCURRENCY = 2 + MAX_TASK_TRANSFER_CONCURRENCY = 16 def __init__(self, config: WorkerConfig, api_client: 'APIClientV2'): """ @@ -290,6 +300,251 @@ class BaseHandler(TaskHandler, ABC): enabled=config.cache_enabled, max_size_gb=config.cache_max_size_gb ) + self.task_download_concurrency = self._resolve_task_transfer_concurrency( + "TASK_DOWNLOAD_CONCURRENCY", + self.DEFAULT_TASK_DOWNLOAD_CONCURRENCY + ) + self.task_upload_concurrency = self._resolve_task_transfer_concurrency( + "TASK_UPLOAD_CONCURRENCY", + self.DEFAULT_TASK_UPLOAD_CONCURRENCY + ) + + def _resolve_task_transfer_concurrency(self, env_name: str, default_value: int) -> int: + """读取并规范化任务内传输并发数配置。""" + raw_value = os.getenv(env_name) + if raw_value is None or not raw_value.strip(): + return default_value + + try: + parsed_value = int(raw_value.strip()) + except ValueError: + logger.warning( + f"Invalid {env_name} value '{raw_value}', using default {default_value}" + ) + return default_value + + if parsed_value < 1: + logger.warning(f"{env_name} must be >= 1, forcing to 1") + return 1 + + if parsed_value > self.MAX_TASK_TRANSFER_CONCURRENCY: + logger.warning( + f"{env_name}={parsed_value} exceeds limit {self.MAX_TASK_TRANSFER_CONCURRENCY}, " + f"using {self.MAX_TASK_TRANSFER_CONCURRENCY}" + ) + return self.MAX_TASK_TRANSFER_CONCURRENCY + + return parsed_value + + def download_files_parallel( + self, + download_jobs: List[Dict[str, Any]], + timeout: Optional[int] = None + ) -> Dict[str, Dict[str, Any]]: + """ + 单任务内并行下载多个文件。 + + Args: + download_jobs: 下载任务列表。每项字段: + - key: 唯一标识 + - url: 下载地址 + - dest: 目标文件路径 + - required: 是否关键文件(可选,默认 True) + - use_cache: 是否使用缓存(可选,默认 True) + timeout: 单文件下载超时(秒) + + Returns: + key -> 结果字典: + - success: 是否成功 + - url: 原始 URL + - dest: 目标文件路径 + - required: 是否关键文件 + """ + if not download_jobs: + return {} + + normalized_jobs: List[Dict[str, Any]] = [] + seen_keys = set() + for download_job in download_jobs: + job_key = str(download_job.get("key", "")).strip() + job_url = str(download_job.get("url", "")).strip() + job_dest = str(download_job.get("dest", "")).strip() + if not job_key or not job_url or not job_dest: + raise ValueError("Each download job must include non-empty key/url/dest") + if job_key in seen_keys: + raise ValueError(f"Duplicate download job key: {job_key}") + seen_keys.add(job_key) + normalized_jobs.append({ + "key": job_key, + "url": job_url, + "dest": job_dest, + "required": bool(download_job.get("required", True)), + "use_cache": bool(download_job.get("use_cache", True)), + }) + + if timeout is None: + timeout = self.config.download_timeout + + parent_otel_context = capture_otel_context() + task_context = get_current_task_context() + task_prefix = f"[task:{task_context.task_id}] " if task_context else "" + results: Dict[str, Dict[str, Any]] = {} + + def _run_download_job(download_job: Dict[str, Any]) -> bool: + with bind_trace_context(parent_otel_context, task_context): + return self.download_file( + download_job["url"], + download_job["dest"], + timeout=timeout, + use_cache=download_job["use_cache"], + ) + + max_workers = min(self.task_download_concurrency, len(normalized_jobs)) + if max_workers <= 1: + for download_job in normalized_jobs: + is_success = _run_download_job(download_job) + results[download_job["key"]] = { + "success": is_success, + "url": download_job["url"], + "dest": download_job["dest"], + "required": download_job["required"], + } + else: + with ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="TaskDownload", + ) as executor: + future_to_job = { + executor.submit(_run_download_job, download_job): download_job + for download_job in normalized_jobs + } + for completed_future in as_completed(future_to_job): + download_job = future_to_job[completed_future] + is_success = False + try: + is_success = bool(completed_future.result()) + except Exception as exc: + logger.error( + f"{task_prefix}Parallel download raised exception for " + f"key={download_job['key']}: {exc}" + ) + results[download_job["key"]] = { + "success": is_success, + "url": download_job["url"], + "dest": download_job["dest"], + "required": download_job["required"], + } + + success_count = sum(1 for item in results.values() if item["success"]) + logger.debug( + f"{task_prefix}Parallel download completed: {success_count}/{len(normalized_jobs)}" + ) + return results + + def upload_files_parallel( + self, + upload_jobs: List[Dict[str, Any]] + ) -> Dict[str, Dict[str, Any]]: + """ + 单任务内并行上传多个文件。 + + Args: + upload_jobs: 上传任务列表。每项字段: + - key: 唯一标识 + - task_id: 任务 ID + - file_type: 文件类型(video/audio/ts/mp4) + - file_path: 本地文件路径 + - file_name: 文件名(可选) + - required: 是否关键文件(可选,默认 True) + + Returns: + key -> 结果字典: + - success: 是否成功 + - url: 上传后的访问 URL(失败为 None) + - file_path: 本地文件路径 + - required: 是否关键文件 + """ + if not upload_jobs: + return {} + + normalized_jobs: List[Dict[str, Any]] = [] + seen_keys = set() + for upload_job in upload_jobs: + job_key = str(upload_job.get("key", "")).strip() + task_id = str(upload_job.get("task_id", "")).strip() + file_type = str(upload_job.get("file_type", "")).strip() + file_path = str(upload_job.get("file_path", "")).strip() + if not job_key or not task_id or not file_type or not file_path: + raise ValueError( + "Each upload job must include non-empty key/task_id/file_type/file_path" + ) + if job_key in seen_keys: + raise ValueError(f"Duplicate upload job key: {job_key}") + seen_keys.add(job_key) + normalized_jobs.append({ + "key": job_key, + "task_id": task_id, + "file_type": file_type, + "file_path": file_path, + "file_name": upload_job.get("file_name"), + "required": bool(upload_job.get("required", True)), + }) + + parent_otel_context = capture_otel_context() + task_context = get_current_task_context() + task_prefix = f"[task:{task_context.task_id}] " if task_context else "" + results: Dict[str, Dict[str, Any]] = {} + + def _run_upload_job(upload_job: Dict[str, Any]) -> Optional[str]: + with bind_trace_context(parent_otel_context, task_context): + return self.upload_file( + upload_job["task_id"], + upload_job["file_type"], + upload_job["file_path"], + upload_job.get("file_name") + ) + + max_workers = min(self.task_upload_concurrency, len(normalized_jobs)) + if max_workers <= 1: + for upload_job in normalized_jobs: + result_url = _run_upload_job(upload_job) + results[upload_job["key"]] = { + "success": bool(result_url), + "url": result_url, + "file_path": upload_job["file_path"], + "required": upload_job["required"], + } + else: + with ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="TaskUpload", + ) as executor: + future_to_job = { + executor.submit(_run_upload_job, upload_job): upload_job + for upload_job in normalized_jobs + } + for completed_future in as_completed(future_to_job): + upload_job = future_to_job[completed_future] + result_url = None + try: + result_url = completed_future.result() + except Exception as exc: + logger.error( + f"{task_prefix}Parallel upload raised exception for " + f"key={upload_job['key']}: {exc}" + ) + results[upload_job["key"]] = { + "success": bool(result_url), + "url": result_url, + "file_path": upload_job["file_path"], + "required": upload_job["required"], + } + + success_count = sum(1 for item in results.values() if item["success"]) + logger.debug( + f"{task_prefix}Parallel upload completed: {success_count}/{len(normalized_jobs)}" + ) + return results # ========== GPU 设备管理 ========== @@ -538,10 +793,8 @@ class BaseHandler(TaskHandler, ABC): if cmd_to_run and cmd_to_run[0] == 'ffmpeg' and '-loglevel' not in cmd_to_run: cmd_to_run[1:1] = ['-loglevel', FFMPEG_LOGLEVEL] - # 日志记录命令(限制长度) + # 日志记录命令(不限制长度) cmd_str = ' '.join(cmd_to_run) - if len(cmd_str) > 500: - cmd_str = cmd_str[:500] + '...' logger.info(f"[task:{task_id}] FFmpeg: {cmd_str}") with start_span( diff --git a/handlers/compose_transition.py b/handlers/compose_transition.py index cf8dba6..edd0076 100644 --- a/handlers/compose_transition.py +++ b/handlers/compose_transition.py @@ -91,23 +91,37 @@ class ComposeTransitionHandler(BaseHandler): f"overlap_tail={overlap_tail_ms}ms, overlap_head={overlap_head_ms}ms" ) - # 1. 下载前一个片段视频 + # 1. 并行下载前后片段视频 prev_video_file = os.path.join(work_dir, 'prev_segment.mp4') - if not self.download_file(prev_segment['videoUrl'], prev_video_file): + next_video_file = os.path.join(work_dir, 'next_segment.mp4') + download_results = self.download_files_parallel([ + { + 'key': 'prev_video', + 'url': prev_segment['videoUrl'], + 'dest': prev_video_file, + 'required': True + }, + { + 'key': 'next_video', + 'url': next_segment['videoUrl'], + 'dest': next_video_file, + 'required': True + } + ]) + prev_result = download_results.get('prev_video') + if not prev_result or not prev_result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download prev segment video: {prev_segment['videoUrl']}" ) - - # 2. 下载后一个片段视频 - next_video_file = os.path.join(work_dir, 'next_segment.mp4') - if not self.download_file(next_segment['videoUrl'], next_video_file): + next_result = download_results.get('next_video') + if not next_result or not next_result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download next segment video: {next_segment['videoUrl']}" ) - # 3. 获取前一个片段的实际时长 + # 2. 获取前一个片段的实际时长 prev_duration = self.probe_duration(prev_video_file) if not prev_duration: return TaskResult.fail( @@ -115,7 +129,7 @@ class ComposeTransitionHandler(BaseHandler): "Failed to probe prev segment duration" ) - # 4. 构建转场合成命令 + # 3. 构建转场合成命令 output_file = os.path.join(work_dir, 'transition.mp4') cmd = self._build_command( prev_video_file=prev_video_file, @@ -128,25 +142,25 @@ class ComposeTransitionHandler(BaseHandler): output_spec=output_spec ) - # 5. 执行 FFmpeg + # 4. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "FFmpeg transition composition failed" ) - # 6. 验证输出文件 + # 5. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Transition output file is missing or too small" ) - # 7. 获取实际时长 + # 6. 获取实际时长 actual_duration = self.probe_duration(output_file) actual_duration_ms = int(actual_duration * 1000) if actual_duration else transition_duration_ms - # 8. 上传产物 + # 7. 上传产物 transition_video_url = self.upload_file(task.task_id, 'video', output_file) if not transition_video_url: return TaskResult.fail( diff --git a/handlers/finalize_mp4.py b/handlers/finalize_mp4.py index d494d18..e5471cc 100644 --- a/handlers/finalize_mp4.py +++ b/handlers/finalize_mp4.py @@ -110,16 +110,26 @@ class FinalizeMp4Handler(BaseHandler): Returns: TaskResult """ - # 1. 下载所有 TS 分片 + # 1. 并行下载所有 TS 分片 + download_jobs = [] + for i, ts_url in enumerate(ts_list): + download_jobs.append({ + 'key': str(i), + 'url': ts_url, + 'dest': os.path.join(work_dir, f'seg_{i}.ts'), + 'required': True + }) + download_results = self.download_files_parallel(download_jobs) + ts_files = [] for i, ts_url in enumerate(ts_list): - ts_file = os.path.join(work_dir, f'seg_{i}.ts') - if not self.download_file(ts_url, ts_file): + result = download_results.get(str(i)) + if not result or not result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download TS segment {i}: {ts_url}" ) - ts_files.append(ts_file) + ts_files.append(result['dest']) logger.info(f"[task:{task.task_id}] Downloaded {len(ts_files)} TS segments") diff --git a/handlers/package_ts.py b/handlers/package_ts.py index 0d6864a..51772c9 100644 --- a/handlers/package_ts.py +++ b/handlers/package_ts.py @@ -81,29 +81,43 @@ class PackageSegmentTsHandler(BaseHandler): start_sec = start_time_ms / 1000.0 duration_sec = duration_ms / 1000.0 - # 1. 下载视频片段 + # 1. 并行下载视频片段与全局音频 video_file = os.path.join(work_dir, 'video.mp4') - if not self.download_file(video_url, video_file): + audio_file = os.path.join(work_dir, 'audio.aac') + download_results = self.download_files_parallel([ + { + 'key': 'video', + 'url': video_url, + 'dest': video_file, + 'required': True + }, + { + 'key': 'audio', + 'url': audio_url, + 'dest': audio_file, + 'required': True + } + ]) + video_result = download_results.get('video') + if not video_result or not video_result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download video: {video_url}" ) - - # 2. 下载全局音频 - audio_file = os.path.join(work_dir, 'audio.aac') - if not self.download_file(audio_url, audio_file): + audio_result = download_results.get('audio') + if not audio_result or not audio_result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download audio: {audio_url}" ) - # 3. 判断是否需要精确裁剪视频 + # 2. 判断是否需要精确裁剪视频 needs_video_trim = not is_transition_segment and ( (trim_head and trim_head_ms > 0) or (trim_tail and trim_tail_ms > 0) ) - # 4. 如果需要裁剪,先重编码裁剪视频 + # 3. 如果需要裁剪,先重编码裁剪视频 processed_video_file = video_file if needs_video_trim: processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4') @@ -129,7 +143,7 @@ class PackageSegmentTsHandler(BaseHandler): "Trimmed video file is missing or too small" ) - # 5. 构建 TS 封装命令 + # 4. 构建 TS 封装命令 output_file = os.path.join(work_dir, 'segment.ts') cmd = self._build_package_command( video_file=processed_video_file, @@ -139,14 +153,14 @@ class PackageSegmentTsHandler(BaseHandler): duration_sec=duration_sec ) - # 6. 执行 FFmpeg + # 5. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "TS packaging failed" ) - # 7. 验证输出文件 + # 6. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, diff --git a/handlers/prepare_audio.py b/handlers/prepare_audio.py index 27d1263..eaa311f 100644 --- a/handlers/prepare_audio.py +++ b/handlers/prepare_audio.py @@ -55,32 +55,60 @@ class PrepareJobAudioHandler(BaseHandler): bgm_url = task.get_bgm_url() segments = task.get_segments() - # 1. 下载 BGM(如有) - bgm_file = None + # 1. 并行下载 BGM 与叠加音效 + bgm_file = os.path.join(work_dir, 'bgm.mp3') if bgm_url else None + download_jobs = [] + if bgm_url and bgm_file: + download_jobs.append({ + 'key': 'bgm', + 'url': bgm_url, + 'dest': bgm_file, + 'required': False + }) + + sfx_download_candidates = [] + for i, seg in enumerate(segments): + audio_spec_data = seg.get('audioSpecJson') + if not audio_spec_data: + continue + audio_spec = AudioSpec.from_dict(audio_spec_data) + if not audio_spec or not audio_spec.audio_url: + continue + sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3') + job_key = f'sfx_{i}' + sfx_download_candidates.append({ + 'key': job_key, + 'file': sfx_file, + 'spec': audio_spec, + 'segment': seg + }) + download_jobs.append({ + 'key': job_key, + 'url': audio_spec.audio_url, + 'dest': sfx_file, + 'required': False + }) + + download_results = self.download_files_parallel(download_jobs) if bgm_url: - bgm_file = os.path.join(work_dir, 'bgm.mp3') - if not self.download_file(bgm_url, bgm_file): + bgm_result = download_results.get('bgm') + if not bgm_result or not bgm_result['success']: logger.warning(f"[task:{task.task_id}] Failed to download BGM") bgm_file = None - # 2. 下载叠加音效 sfx_files = [] - for i, seg in enumerate(segments): - audio_spec_data = seg.get('audioSpecJson') - if audio_spec_data: - audio_spec = AudioSpec.from_dict(audio_spec_data) - if audio_spec and audio_spec.audio_url: - sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3') - if self.download_file(audio_spec.audio_url, sfx_file): - sfx_files.append({ - 'file': sfx_file, - 'spec': audio_spec, - 'segment': seg - }) - else: - logger.warning(f"[task:{task.task_id}] Failed to download SFX {i}") + for sfx_candidate in sfx_download_candidates: + sfx_result = download_results.get(sfx_candidate['key']) + if sfx_result and sfx_result['success']: + sfx_files.append({ + 'file': sfx_candidate['file'], + 'spec': sfx_candidate['spec'], + 'segment': sfx_candidate['segment'] + }) + else: + logger.warning(f"[task:{task.task_id}] Failed to download SFX {sfx_candidate['key']}") - # 3. 构建音频混音命令 + # 2. 构建音频混音命令 output_file = os.path.join(work_dir, 'audio_full.aac') cmd = self._build_audio_command( bgm_file=bgm_file, @@ -90,21 +118,21 @@ class PrepareJobAudioHandler(BaseHandler): audio_profile=audio_profile ) - # 4. 执行 FFmpeg + # 3. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Audio mixing failed" ) - # 5. 验证输出文件 + # 4. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Audio output file is missing or too small" ) - # 6. 上传产物 + # 5. 上传产物 audio_url = self.upload_file(task.task_id, 'audio', output_file) if not audio_url: return TaskResult.fail( diff --git a/handlers/render_video.py b/handlers/render_video.py index f277730..d3d7749 100644 --- a/handlers/render_video.py +++ b/handlers/render_video.py @@ -85,13 +85,63 @@ class RenderSegmentVideoHandler(BaseHandler): else: input_file = os.path.join(work_dir, 'input.mp4') - # 2. 下载素材 - if not self.download_file(material_url, input_file): + # 2. 构建并行下载任务(主素材 + 可选 LUT + 可选叠加层) + lut_file = os.path.join(work_dir, 'lut.cube') if render_spec.lut_url else None + overlay_file = None + if render_spec.overlay_url: + # 根据 URL 后缀确定文件扩展名 + overlay_url_lower = render_spec.overlay_url.lower() + if overlay_url_lower.endswith('.jpg') or overlay_url_lower.endswith('.jpeg'): + overlay_ext = '.jpg' + elif overlay_url_lower.endswith('.mov'): + overlay_ext = '.mov' + else: + overlay_ext = '.png' + overlay_file = os.path.join(work_dir, f'overlay{overlay_ext}') + + download_jobs = [ + { + 'key': 'material', + 'url': material_url, + 'dest': input_file, + 'required': True + } + ] + if render_spec.lut_url and lut_file: + download_jobs.append({ + 'key': 'lut', + 'url': render_spec.lut_url, + 'dest': lut_file, + 'required': False + }) + if render_spec.overlay_url and overlay_file: + download_jobs.append({ + 'key': 'overlay', + 'url': render_spec.overlay_url, + 'dest': overlay_file, + 'required': False + }) + download_results = self.download_files_parallel(download_jobs) + + material_result = download_results.get('material') + if not material_result or not material_result['success']: return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download material: {material_url}" ) + if render_spec.lut_url: + lut_result = download_results.get('lut') + if not lut_result or not lut_result['success']: + logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it") + lut_file = None + + if render_spec.overlay_url: + overlay_result = download_results.get('overlay') + if not overlay_result or not overlay_result['success']: + logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") + overlay_file = None + # 3. 图片素材转换为视频 if is_image: video_input_file = os.path.join(work_dir, 'input_video.mp4') @@ -111,31 +161,7 @@ class RenderSegmentVideoHandler(BaseHandler): input_file = video_input_file logger.info(f"[task:{task.task_id}] Image converted to video successfully") - # 4. 下载 LUT(如有) - lut_file = None - if render_spec.lut_url: - lut_file = os.path.join(work_dir, 'lut.cube') - if not self.download_file(render_spec.lut_url, lut_file): - logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it") - lut_file = None - - # 5. 下载叠加层(如有) - overlay_file = None - if render_spec.overlay_url: - # 根据 URL 后缀确定文件扩展名 - url_lower = render_spec.overlay_url.lower() - if url_lower.endswith('.jpg') or url_lower.endswith('.jpeg'): - ext = '.jpg' - elif url_lower.endswith('.mov'): - ext = '.mov' - else: - ext = '.png' # 默认 - overlay_file = os.path.join(work_dir, f'overlay{ext}') - if not self.download_file(render_spec.overlay_url, overlay_file): - logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") - overlay_file = None - - # 6. 探测源视频时长(仅对视频素材) + # 4. 探测源视频时长(仅对视频素材) # 用于检测时长不足并通过冻结最后一帧补足 source_duration_sec = None if not is_image: @@ -158,13 +184,13 @@ class RenderSegmentVideoHandler(BaseHandler): f"will freeze last frame for {shortage_sec:.2f}s" ) - # 7. 计算 overlap 时长(用于转场帧冻结) + # 5. 计算 overlap 时长(用于转场帧冻结) # 头部 overlap: 来自前一片段的出场转场 overlap_head_ms = task.get_overlap_head_ms() # 尾部 overlap: 当前片段的出场转场 overlap_tail_ms = task.get_overlap_tail_ms_v2() - # 8. 构建 FFmpeg 命令 + # 6. 构建 FFmpeg 命令 output_file = os.path.join(work_dir, 'output.mp4') cmd = self._build_command( input_file=input_file, @@ -179,25 +205,25 @@ class RenderSegmentVideoHandler(BaseHandler): source_duration_sec=source_duration_sec ) - # 9. 执行 FFmpeg + # 7. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "FFmpeg rendering failed" ) - # 10. 验证输出文件 + # 8. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=4096): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Output file is missing or too small" ) - # 11. 获取实际时长 + # 9. 获取实际时长 actual_duration = self.probe_duration(output_file) actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms - # 12. 上传产物 + # 10. 上传产物 video_url = self.upload_file(task.task_id, 'video', output_file) if not video_url: return TaskResult.fail( @@ -205,7 +231,7 @@ class RenderSegmentVideoHandler(BaseHandler): "Failed to upload video" ) - # 13. 构建结果(包含 overlap 信息) + # 11. 构建结果(包含 overlap 信息) result_data = { 'videoUrl': video_url, 'actualDurationMs': actual_duration_ms,