# -*- coding: utf-8 -*- """ TS 分片封装处理器 处理 PACKAGE_SEGMENT_TS 任务,将视频片段和对应时间区间的音频封装为 TS 分片。 支持转场相关的 overlap 裁剪和转场分片封装。 """ import os import logging from typing import List, Optional from handlers.base import BaseHandler, VIDEO_ENCODE_ARGS from domain.task import Task, TaskType from domain.result import TaskResult, ErrorCode logger = logging.getLogger(__name__) class PackageSegmentTsHandler(BaseHandler): """ TS 分片封装处理器 职责: - 下载视频片段 - 下载全局音频 - 截取对应时间区间的音频 - 封装为 TS 分片 - 上传 TS 产物 关键约束: - TS 必须包含音视频同轨 - 使用 output_ts_offset 保证时间戳连续 - 输出 extinfDurationSec 供 m3u8 使用 转场相关: - 普通片段 TS:需要裁剪掉 overlap 区域(已被转场分片使用) - 转场分片 TS:直接封装转场视频产物,无需裁剪 - 无转场时:走原有逻辑,不做裁剪 精确裁剪: - 当需要裁剪 overlap 区域时,必须使用重编码方式(-vf trim)才能精确切割 - 使用 -c copy 只能从关键帧切割,会导致不精确 """ def get_supported_type(self) -> TaskType: return TaskType.PACKAGE_SEGMENT_TS def handle(self, task: Task) -> TaskResult: """处理 TS 封装任务""" work_dir = self.create_work_dir(task.task_id) try: # 解析参数 video_url = task.get_video_url() audio_url = task.get_audio_url() start_time_ms = task.get_start_time_ms() duration_ms = task.get_duration_ms() output_spec = task.get_output_spec() # 转场相关参数 is_transition_segment = task.is_transition_segment() trim_head = task.should_trim_head() trim_tail = task.should_trim_tail() trim_head_ms = task.get_trim_head_ms() trim_tail_ms = task.get_trim_tail_ms() if not video_url: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing videoUrl" ) if not audio_url: return TaskResult.fail( ErrorCode.E_SPEC_INVALID, "Missing audioUrl" ) # 计算时间参数 start_sec = start_time_ms / 1000.0 duration_sec = duration_ms / 1000.0 # 1. 下载视频片段 video_file = os.path.join(work_dir, 'video.mp4') if not self.download_file(video_url, video_file): return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download video: {video_url}" ) # 2. 下载全局音频 audio_file = os.path.join(work_dir, 'audio.aac') if not self.download_file(audio_url, audio_file): return TaskResult.fail( ErrorCode.E_INPUT_UNAVAILABLE, f"Failed to download audio: {audio_url}" ) # 3. 判断是否需要精确裁剪视频 needs_video_trim = not is_transition_segment and ( (trim_head and trim_head_ms > 0) or (trim_tail and trim_tail_ms > 0) ) # 4. 如果需要裁剪,先重编码裁剪视频 processed_video_file = video_file if needs_video_trim: processed_video_file = os.path.join(work_dir, 'trimmed_video.mp4') trim_cmd = self._build_trim_command( video_file=video_file, output_file=processed_video_file, trim_head_ms=trim_head_ms if trim_head else 0, trim_tail_ms=trim_tail_ms if trim_tail else 0, output_spec=output_spec ) logger.info(f"[task:{task.task_id}] Trimming video: head={trim_head_ms}ms, tail={trim_tail_ms}ms") if not self.run_ffmpeg(trim_cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Video trim failed" ) if not self.ensure_file_exists(processed_video_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "Trimmed video file is missing or too small" ) # 5. 构建 TS 封装命令 output_file = os.path.join(work_dir, 'segment.ts') cmd = self._build_package_command( video_file=processed_video_file, audio_file=audio_file, output_file=output_file, start_sec=start_sec, duration_sec=duration_sec ) # 6. 执行 FFmpeg if not self.run_ffmpeg(cmd, task.task_id): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "TS packaging failed" ) # 7. 验证输出文件 if not self.ensure_file_exists(output_file, min_size=1024): return TaskResult.fail( ErrorCode.E_FFMPEG_FAILED, "TS output file is missing or too small" ) # 8. 获取实际时长(用于 EXTINF) actual_duration = self.probe_duration(output_file) extinf_duration = actual_duration if actual_duration else duration_sec # 9. 上传产物 ts_url = self.upload_file(task.task_id, 'ts', output_file) if not ts_url: return TaskResult.fail( ErrorCode.E_UPLOAD_FAILED, "Failed to upload TS" ) return TaskResult.ok({ 'tsUrl': ts_url, 'extinfDurationSec': extinf_duration }) except Exception as e: logger.error(f"[task:{task.task_id}] Unexpected error: {e}", exc_info=True) return TaskResult.fail(ErrorCode.E_UNKNOWN, str(e)) finally: self.cleanup_work_dir(work_dir) def _build_trim_command( self, video_file: str, output_file: str, trim_head_ms: int, trim_tail_ms: int, output_spec ) -> List[str]: """ 构建视频精确裁剪命令(重编码方式) 使用 trim 滤镜进行精确帧级裁剪,而非 -ss/-t 参数的关键帧裁剪。 Args: video_file: 输入视频路径 output_file: 输出视频路径 trim_head_ms: 头部裁剪时长(毫秒) trim_tail_ms: 尾部裁剪时长(毫秒) output_spec: 输出规格 Returns: FFmpeg 命令参数列表 """ # 获取原视频时长 original_duration = self.probe_duration(video_file) if not original_duration: original_duration = 10.0 # 默认值,避免除零 trim_head_sec = trim_head_ms / 1000.0 trim_tail_sec = trim_tail_ms / 1000.0 # 计算裁剪后的起止时间 start_time = trim_head_sec end_time = original_duration - trim_tail_sec # 构建 trim 滤镜 vf_filter = f"trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS" cmd = [ 'ffmpeg', '-y', '-hide_banner', '-i', video_file, '-vf', vf_filter, ] # 编码参数 cmd.extend(VIDEO_ENCODE_ARGS) # 帧率 fps = output_spec.fps cmd.extend(['-r', str(fps)]) # 计算输出视频帧数,动态调整 GOP output_duration_sec = end_time - start_time total_frames = int(output_duration_sec * fps) # 动态 GOP:短视频使用较小的 GOP if total_frames <= 1: gop_size = 1 elif total_frames < fps: gop_size = total_frames else: gop_size = fps # 每秒一个关键帧 cmd.extend(['-g', str(gop_size)]) cmd.extend(['-keyint_min', str(min(gop_size, fps // 2 or 1))]) # 强制第一帧为关键帧 cmd.extend(['-force_key_frames', 'expr:eq(n,0)']) # 无音频(音频单独处理) cmd.append('-an') cmd.append(output_file) return cmd def _build_package_command( self, video_file: str, audio_file: str, output_file: str, start_sec: float, duration_sec: float ) -> List[str]: """ 构建 TS 封装命令 将视频和对应时间区间的音频封装为 TS 分片。 视频使用 copy 模式(已经过精确裁剪或无需裁剪)。 Args: video_file: 视频文件路径(已处理) audio_file: 音频文件路径 output_file: 输出文件路径 start_sec: 音频开始时间(秒) duration_sec: 音频时长(秒) Returns: FFmpeg 命令参数列表 """ cmd = [ 'ffmpeg', '-y', '-hide_banner', # 视频输入 '-i', video_file, # 音频输入(从 start_sec 开始截取 duration_sec) '-ss', str(start_sec), '-t', str(duration_sec), '-i', audio_file, # 映射流 '-map', '0:v:0', # 使用第一个输入的视频流 '-map', '1:a:0', # 使用第二个输入的音频流 # 复制编码(视频已处理,无需重编码) '-c:v', 'copy', '-c:a', 'copy', # 关键:时间戳偏移,保证整体连续 '-output_ts_offset', str(start_sec), # 复用参数 '-muxdelay', '0', '-muxpreload', '0', # 输出格式 '-f', 'mpegts', output_file ] return cmd