You've already forked FrameTour-RenderWorker
242 lines
6.8 KiB
Python
242 lines
6.8 KiB
Python
"""测试辅助工具"""
|
|
|
|
import json
|
|
import tempfile
|
|
import subprocess
|
|
from typing import Dict, Any, List, Optional
|
|
from pathlib import Path
|
|
|
|
from entity.render_task import RenderTask
|
|
from entity.effects.base import EffectProcessor
|
|
from config.settings import FFmpegConfig
|
|
|
|
|
|
class MockRenderTask:
    """Stand-in render task for tests.

    The constructor only assigns plain attributes; it performs no
    validation and no I/O.
    """

    def __init__(
        self,
        input_files: Optional[List[str]] = None,
        output_file: str = "test_output.mp4",
        effects: Optional[List[str]] = None,
        ext_data: Optional[Dict[str, Any]] = None,
        frame_rate: int = 25,
    ):
        """Store the given values and fill every other field with a default.

        Args:
            input_files: Input paths; any falsy value (including ``None``)
                becomes a fresh empty list.
            output_file: Output path stored verbatim.
            effects: Effect strings; any falsy value becomes a fresh
                empty list.
            ext_data: Extension data; any falsy value becomes a fresh
                empty dict.
            frame_rate: Frame rate stored verbatim.
        """
        # --- core task description ---
        self.input_files = input_files if input_files else []
        self.output_file = output_file
        # Per the original author's note this mirrors TaskType.COPY;
        # that enum is not visible from this file.
        self.task_type = "copy"

        # --- optional rendering settings ---
        self.resolution = None
        self.frame_rate = frame_rate
        self.speed = 1.0
        self.mute = True
        self.annexb = False

        # --- cut parameters (both unset) ---
        self.zoom_cut = self.center_cut = None

        # --- resource lists; each attribute gets its own list object ---
        self.subtitles: List[str] = []
        self.luts: List[str] = []
        self.audios: List[str] = []
        self.overlays: List[str] = []
        self.effects = effects if effects else []

        # --- extension data ---
        self.ext_data = ext_data if ext_data else {}

        # --- legacy-compatibility fields ---
        self.task_id = "test_task"
        self.template_id = "test_template"
        self.use_center_cut = self.use_zoom_cut = False
        self.audio_file = None
|
|
|
|
|
|
class FFmpegValidator:
    """Heuristic checks for FFmpeg filter strings and command lines.

    Nothing here runs FFmpeg; every check is a plain inspection of the
    given string or argument list.
    """

    # Filter names accepted by validate_filter_syntax when the string
    # carries no bracketed stream label.
    _COMMON_FILTERS = ("zoompan", "setpts", "trim", "scale", "crop")

    @staticmethod
    def validate_filter_syntax(filter_str: str) -> bool:
        """Return True when ``filter_str`` looks like a filter expression.

        A string passes when it contains both ``[`` and ``]``, or when it
        contains one of the names in ``_COMMON_FILTERS``. Empty strings
        and non-string values are rejected.
        """
        # Explicit type guard instead of a blanket try/except: the checks
        # below can only fail on non-string input.
        if not isinstance(filter_str, str) or not filter_str:
            return False

        # Any bracketed stream label is taken as evidence of a filter.
        if "[" in filter_str and "]" in filter_str:
            return True

        return any(name in filter_str for name in FFmpegValidator._COMMON_FILTERS)

    @staticmethod
    def validate_stream_identifier(stream_id: str) -> bool:
        """Return True when ``stream_id`` is wrapped in square brackets.

        Empty values are rejected.
        """
        if not stream_id:
            return False
        return stream_id.startswith("[") and stream_id.endswith("]")

    @staticmethod
    def validate_ffmpeg_command(command: List[str]) -> Dict[str, Any]:
        """Inspect an FFmpeg argument list and report what it contains.

        Args:
            command: Argument list whose first element must be the
                literal string ``"ffmpeg"``.

        Returns:
            A dict with the keys ``valid``, ``has_input``, ``has_output``,
            ``has_filter`` and ``errors`` (a list of messages). ``valid``
            is True only when an input and an output were found and no
            error was recorded.
        """
        result: Dict[str, Any] = {
            "valid": False,
            "has_input": False,
            "has_output": False,
            "has_filter": False,
            "errors": [],
        }

        if not command or command[0] != "ffmpeg":
            result["errors"].append("Command must start with 'ffmpeg'")
            return result

        # "-i" only counts as an input when an operand follows it, so a
        # trailing "-i" is ignored by searching everything but the last
        # token.
        if "-i" in command[:-1]:
            result["has_input"] = True
        else:
            result["errors"].append("No input file specified")

        # The output is the final token. It must not be an option, and it
        # must not be the operand of "-i" (otherwise the input path would
        # be mistaken for the output).
        has_trailing_operand = len(command) > 1 and not command[-1].startswith("-")
        if has_trailing_operand and command[-2] != "-i":
            result["has_output"] = True
        else:
            result["errors"].append("No output file specified")

        if "-filter_complex" in command or "-vf" in command:
            result["has_filter"] = True

        result["valid"] = (
            result["has_input"] and result["has_output"] and not result["errors"]
        )
        return result
|
|
|
|
|
|
class EffectTestHelper:
    """Helpers for exercising effect processors in tests."""

    @staticmethod
    def create_test_effect(
        effect_class, params: str = "", ext_data: Dict[str, Any] = None
    ):
        """Instantiate ``effect_class`` by calling it with ``(params, ext_data)``."""
        instance = effect_class(params, ext_data)
        return instance

    @staticmethod
    def test_effect_params_validation(
        effect: EffectProcessor, test_cases: List[Dict[str, Any]]
    ):
        """Run ``effect.validate_params()`` once per test case.

        Each case is a dict read with ``.get``: ``params`` (default ``""``),
        ``ext_data`` (default ``{}``), ``expected`` (default ``True``) and
        ``description`` (default ``""``). The case's ``params`` and
        ``ext_data`` are assigned onto ``effect`` before validating, so the
        effect is left holding the values of the last case.

        Returns:
            One dict per case with the keys ``params``, ``expected``,
            ``actual``, ``passed`` and ``description``.
        """

        def run_case(case):
            # Configure the shared effect instance for this case.
            effect.params = case.get("params", "")
            effect.ext_data = case.get("ext_data", {})

            actual = effect.validate_params()
            wanted = case.get("expected", True)

            return {
                "params": effect.params,
                "expected": wanted,
                "actual": actual,
                "passed": actual == wanted,
                "description": case.get("description", ""),
            }

        return [run_case(case) for case in test_cases]

    @staticmethod
    def test_filter_generation(
        effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1
    ):
        """Call ``effect.generate_filter_args`` and summarise what it returned.

        Returns:
            A dict with ``success``, ``filters``, ``output_stream``,
            ``filter_count``, ``valid_syntax`` and ``valid_output``. When
            anything inside the attempt raises, ``success`` is False, an
            ``error`` key holds ``str(exception)`` and the remaining keys
            carry empty/False placeholders.
        """
        try:
            filters, output_stream = effect.generate_filter_args(
                video_input, effect_index
            )

            count = len(filters)
            syntax_ok = all(
                FFmpegValidator.validate_filter_syntax(item) for item in filters
            )
            # The output label is only checked when the effect produced a
            # stream different from its input.
            if output_stream != video_input:
                output_ok = FFmpegValidator.validate_stream_identifier(output_stream)
            else:
                output_ok = True

            return {
                "success": True,
                "filters": filters,
                "output_stream": output_stream,
                "filter_count": count,
                "valid_syntax": syntax_ok,
                "valid_output": output_ok,
            }
        except Exception as exc:
            return {
                "success": False,
                "error": str(exc),
                "filters": [],
                "output_stream": "",
                "filter_count": 0,
                "valid_syntax": False,
                "valid_output": False,
            }
|
|
|
|
|
|
def create_test_video_file(
    output_path: str,
    duration: int = 5,
    resolution: str = "640x480",
    frame_rate: int = 25,
) -> bool:
    """Render a synthetic clip with FFmpeg's ``testsrc`` lavfi source.

    Args:
        output_path: Where ffmpeg writes the clip; an existing file is
            overwritten (``-y``).
        duration: Value for the ``duration`` option of ``testsrc``.
        resolution: Value for the ``size`` option of ``testsrc``.
        frame_rate: Value for the ``rate`` option of ``testsrc``.

    Returns:
        True when ffmpeg exits with status 0; False when it exits with any
        other status or when the process cannot be run.
    """
    cmd = [
        "ffmpeg",
        "-y",  # overwrite the output file
        "-f",
        "lavfi",  # read from a libavfilter source
        "-i",
        f"testsrc=duration={duration}:size={resolution}:rate={frame_rate}",
        "-c:v",
        "libx264",
        "-preset",
        "ultrafast",
        "-crf",
        "23",
        output_path,
    ]

    try:
        completed = subprocess.run(
            cmd,
            # Keep ffmpeg away from the caller's stdin.
            stdin=subprocess.DEVNULL,
            # Only the exit status is used. Discarding the output avoids
            # buffering it and avoids a text-decoding error being
            # misreported as an ffmpeg failure.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except Exception:
        # Deliberate best-effort: callers only get a bool, e.g. when the
        # ffmpeg executable is not installed.
        return False

    return completed.returncode == 0
|
|
|
|
|
|
def create_sample_template_data() -> Dict[str, Any]:
    """Build a sample template description for tests.

    Every call returns a newly constructed dict holding one video part and
    a settings section.
    """
    video_part = {
        "id": "part1",
        "type": "video",
        "duration": 10.0,
        "effects": ["zoom:0,2.0,3.0", "ospeed:1.5"],
    }
    settings = {"width": 1920, "height": 1080, "frameRate": 25}

    template: Dict[str, Any] = {
        "templateId": "test_template_001",
        "name": "测试模板",
        "parts": [video_part],
        "settings": settings,
    }
    return template
|