# -*- coding: utf-8 -*- """ 全局音频准备处理器 处理 PREPARE_JOB_AUDIO 任务,生成整个视频的连续音频轨道。 """ import os import logging from typing import List, Dict, Optional from handlers.base import BaseHandler, AUDIO_ENCODE_ARGS from domain.task import Task, TaskType, AudioSpec, AudioProfile from domain.result import TaskResult, ErrorCode logger = logging.getLogger(__name__) class PrepareJobAudioHandler(BaseHandler): """ 全局音频准备处理器 职责: - 下载全局 BGM - 下载各片段叠加音效 - 构建复杂混音命令 - 执行混音 - 上传音频产物 关键约束: - 全局 BGM 连续生成一次,贯穿整个时长 - 禁止使用 amix normalize=1 - 只对叠加音轨做极短淡入淡出(5-20ms) - 不对 BGM 做边界 fade """ def get_supported_type(self) -> TaskType: return TaskType.PREPARE_JOB_AUDIO def handle(self, task: Task) -> TaskResult: """处理音频准备任务""" work_dir = self.create_work_dir(task.task_id) try: # 解析参数 total_duration_ms = task.get_total_duration_ms() if total_duration_ms <= 0: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Invalid totalDurationMs" ) total_duration_sec = total_duration_ms / 1000.0 audio_profile = task.get_audio_profile() bgm_url = task.get_bgm_url() segments = task.get_segments() # 1. 下载 BGM(如有) bgm_file = None if bgm_url: bgm_file = os.path.join(work_dir, 'bgm.mp3') if not self.download_file(bgm_url, bgm_file): logger.warning(f"[task:{task.task_id}] Failed to download BGM") bgm_file = None # 2. 下载叠加音效 sfx_files = [] for i, seg in enumerate(segments): audio_spec_data = seg.get('audioSpecJson') if audio_spec_data: audio_spec = AudioSpec.from_dict(audio_spec_data) if audio_spec and audio_spec.audio_url: sfx_file = os.path.join(work_dir, f'sfx_{i}.mp3') if self.download_file(audio_spec.audio_url, sfx_file): sfx_files.append({ 'file': sfx_file, 'spec': audio_spec, 'segment': seg }) else: logger.warning(f"[task:{task.task_id}] Failed to download SFX {i}") # 3. 构建音频混音命令 output_file = os.path.join(work_dir, 'audio_full.aac') cmd = self._build_audio_command( bgm_file=bgm_file, sfx_files=sfx_files, output_file=output_file, total_duration_sec=total_duration_sec, audio_profile=audio_profile ) # 4. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Audio mixing failed" ) # 5. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Audio output file is missing or too small" ) # 6. 上传产物 audio_url = self.upload_file(task.task_id, 'audio', output_file) if not audio_url: return TaskResult.fail( ErrorCode.E_UPLOAD_FAILED, "Failed to upload audio" ) return TaskResult.ok({ 'audioUrl': audio_url }) except Exception as e: logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) finally: self.cleanup_work_dir(work_dir) def _build_audio_command( self, bgm_file: Optional[str], sfx_files: List[Dict], output_file: str, total_duration_sec: float, audio_profile: AudioProfile ) -> List[str]: """ 构建音频混音命令 Args: bgm_file: BGM 文件路径(可选) sfx_files: 叠加音效列表 output_file: 输出文件路径 total_duration_sec: 总时长(秒) audio_profile: 音频配置 Returns: FFmpeg 命令参数列表 """ sample_rate = audio_profile.sample_rate channels = audio_profile.channels # 情况1:无 BGM 也无叠加音效 -> 生成静音 if not bgm_file and not sfx_files: return [ 'ffmpeg', '-y', '-hide_banner', '-f', 'lavfi', '-i', f'anullsrc=r={sample_rate}:cl=stereo', '-t', str(total_duration_sec), '-c:a', 'aac', '-b:a', '128k', output_file ] # 情况2:仅 BGM,无叠加音效 if not sfx_files: return [ 'ffmpeg', '-y', '-hide_banner', '-i', bgm_file, '-t', str(total_duration_sec), '-c:a', 'aac', '-b:a', '128k', '-ar', str(sample_rate), '-ac', str(channels), output_file ] # 情况3:BGM + 叠加音效 -> 复杂滤镜 inputs = [] if bgm_file: inputs.extend(['-i', bgm_file]) for sfx in sfx_files: inputs.extend(['-i', sfx['file']]) filter_parts = [] input_idx = 0 # BGM 处理(或生成静音底轨) if bgm_file: filter_parts.append( f"[0:a]atrim=0:{total_duration_sec},asetpts=PTS-STARTPTS," f"apad=whole_dur={total_duration_sec}[bgm]" ) input_idx = 1 else: filter_parts.append( f"anullsrc=r={sample_rate}:cl=stereo," f"atrim=0:{total_duration_sec}[bgm]" ) input_idx = 0 # 叠加音效处理 sfx_labels = [] for i, sfx in enumerate(sfx_files): idx = input_idx + i spec = sfx['spec'] seg = sfx['segment'] # 计算时间参数 start_time_ms = seg.get('startTimeMs', 0) duration_ms = seg.get('durationMs', 5000) delay_ms = start_time_ms + spec.delay_ms delay_sec = delay_ms / 1000.0 duration_sec = duration_ms / 1000.0 # 淡入淡出参数(极短,5-20ms) fade_in_sec = spec.fade_in_ms / 1000.0 fade_out_sec = spec.fade_out_ms / 1000.0 # 音量 volume = spec.volume label = f"sfx{i}" sfx_labels.append(f"[{label}]") # 构建滤镜:延迟 + 淡入淡出 + 音量 # 注意:只对叠加音轨做淡入淡出,不对 BGM 做 sfx_filter = ( f"[{idx}:a]" f"adelay={int(delay_ms)}|{int(delay_ms)}," f"afade=t=in:st={delay_sec}:d={fade_in_sec}," f"afade=t=out:st={delay_sec + duration_sec - fade_out_sec}:d={fade_out_sec}," f"volume={volume}" f"[{label}]" ) filter_parts.append(sfx_filter) # 混音(关键:normalize=0,禁止归一化) # dropout_transition=0 表示输入结束时不做渐变 mix_inputs = "[bgm]" + "".join(sfx_labels) num_inputs = 1 + len(sfx_files) filter_parts.append( f"{mix_inputs}amix=inputs={num_inputs}:duration=first:" f"dropout_transition=0:normalize=0[out]" ) filter_complex = ';'.join(filter_parts) cmd = ['ffmpeg', '-y', '-hide_banner'] + inputs + [ '-filter_complex', filter_complex, '-map', '[out]', '-c:a', 'aac', '-b:a', '128k', '-ar', str(sample_rate), '-ac', str(channels), output_file ] return cmd