diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..755f26e --- /dev/null +++ b/.flake8 @@ -0,0 +1,15 @@ +[flake8] +max-line-length = 88 +ignore = + # Line too long - handled by black + E501, + # Line break before binary operator - handled by black + W503 +exclude = + .git, + __pycache__, + .venv, + venv, + tests, + .claude, + .serena \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile index 154bf22..759f02d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -7,6 +7,8 @@ pipeline { VENV_NAME = 'venv' TEST_REPORTS_DIR = 'test-reports' COVERAGE_DIR = 'coverage-reports' + // 设置Python模块路径 + PYTHONPATH = "${WORKSPACE}" } stages { @@ -39,6 +41,10 @@ pipeline { # 激活虚拟环境并安装依赖 . ${VENV_NAME}/bin/activate + pip config set global.index-url https://mirrors.ustc.edu.cn/pypi/simple + + # 设置PYTHONPATH + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 升级pip pip install --upgrade pip @@ -64,12 +70,13 @@ pipeline { echo 'Running code linting...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH - # 运行flake8检查 + # 运行flake8检查(使用项目配置.flake8) flake8 entity/ services/ --output-file=${TEST_REPORTS_DIR}/flake8-report.txt --tee || true # 运行black格式检查 - black --check --diff entity/ services/ > ${TEST_REPORTS_DIR}/black-report.txt || true + black --check --diff --line-length 88 entity/ services/ > ${TEST_REPORTS_DIR}/black-report.txt || true """ } } @@ -94,6 +101,7 @@ pipeline { echo 'Running type checking...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 运行mypy类型检查 mypy entity/ services/ --html-report ${TEST_REPORTS_DIR}/mypy-html --txt-report ${TEST_REPORTS_DIR}/mypy-txt || true @@ -110,6 +118,7 @@ pipeline { echo 'Running unit tests...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 运行单元测试 pytest tests/test_effects/ tests/test_ffmpeg_builder/ \\ @@ -158,6 +167,7 @@ pipeline { echo 'Running integration tests...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 运行集成测试 pytest tests/test_integration/ \\ @@ -198,6 +208,7 @@ pipeline { echo 'Running complete test suite with coverage...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 运行完整测试套件 pytest tests/ \\ @@ -262,6 +273,7 @@ pipeline { echo 'Running performance tests...' sh """ . ${VENV_NAME}/bin/activate + export PYTHONPATH=\${PWD}:\$PYTHONPATH # 运行性能测试 RUN_STRESS_TESTS=1 pytest tests/test_integration/test_ffmpeg_execution.py::TestFFmpegExecution::test_stress_test_large_effects_chain \\ diff --git a/entity/effects/camera_shot.py b/entity/effects/camera_shot.py index 20036e4..57f9fd5 100644 --- a/entity/effects/camera_shot.py +++ b/entity/effects/camera_shot.py @@ -1,4 +1,4 @@ -from typing import List, Dict, Any +from typing import List from .base import EffectProcessor @@ -63,20 +63,24 @@ class CameraShotEffect(EffectProcessor): # 选择开始部分帧 filter_args.append( - f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}" + f"{start_out_str}select=lt(n\\," + f"{int(start * self.frame_rate)}){start_out_str}" ) # 选择结束部分帧 filter_args.append( - f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}" + f"{end_out_str}select=gt(n\\," + f"{int(start * self.frame_rate)}){end_out_str}" ) # 选择中间特定帧并扩展 filter_args.append( - f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}" + f"{mid_out_str}select=eq(n\\," + f"{int(start * self.frame_rate)}){mid_out_str}" ) filter_args.append( - f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}" + f"{mid_out_str}tpad=start_mode=clone:" + f"start_duration={duration:.4f}{mid_out_str}" ) # 如果需要旋转 @@ -85,7 +89,8 @@ class CameraShotEffect(EffectProcessor): # 连接三部分 filter_args.append( - f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}" + f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0," + f"setpts=N/{self.frame_rate}/TB{final_output}" ) return filter_args, final_output diff --git a/entity/effects/tail.py b/entity/effects/tail.py index befc098..ec1c144 100644 --- a/entity/effects/tail.py +++ b/entity/effects/tail.py @@ -35,7 +35,8 @@ class TailEffect(EffectProcessor): # 使用reverse+trim+reverse的方法来精确获取最后N秒 filter_args = [ f"{video_input}reverse[v_rev{effect_index}]", - f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]", + f"[v_rev{effect_index}]trim=duration={tail_seconds}" + f"[v_trim{effect_index}]", f"[v_trim{effect_index}]reverse{output_stream}", ] diff --git a/entity/effects/zoom.py b/entity/effects/zoom.py index 714f8ac..5776981 100644 --- a/entity/effects/zoom.py +++ b/entity/effects/zoom.py @@ -1,5 +1,4 @@ from typing import List -import json from .base import EffectProcessor diff --git a/entity/ffmpeg_command_builder.py b/entity/ffmpeg_command_builder.py index fe5332b..daa1080 100644 --- a/entity/ffmpeg_command_builder.py +++ b/entity/ffmpeg_command_builder.py @@ -1,4 +1,3 @@ -import os import time from typing import List, Optional @@ -10,8 +9,6 @@ from util.ffmpeg import probe_video_info, probe_video_audio from util.ffmpeg_utils import ( build_base_ffmpeg_args, build_null_audio_input, - build_amix_filter, - build_overlay_scale_filter, get_annexb_filter, build_standard_output_args, ) @@ -209,8 +206,6 @@ class FFmpegCommandBuilder: pos_json = self.task.ext_data.get("posJson", "{}") pos_data = safe_json_loads(pos_json, {}) - _v_w = pos_data.get("imgWidth", 1) - _v_h = pos_data.get("imgHeight", 1) _f_x = pos_data.get("ltX", 0) _f_x2 = pos_data.get("rbX", 0) _f_y = pos_data.get("ltY", 0) diff --git a/entity/render_task.py b/entity/render_task.py index e4f132f..52d3266 100644 --- a/entity/render_task.py +++ b/entity/render_task.py @@ -1,10 +1,8 @@ -import os import uuid from typing import List, Optional, Dict, Any from dataclasses import dataclass, field from enum import Enum -from config.settings import get_ffmpeg_config from util.exceptions import TaskValidationError, EffectError from entity.effects import registry as effect_registry diff --git a/run_tests.py b/run_tests.py index 4eb96f7..1dfcb3d 100644 --- a/run_tests.py +++ b/run_tests.py @@ -16,11 +16,7 @@ def run_command(cmd, cwd=None): print(f"Running: {' '.join(cmd)}") try: result = subprocess.run( - cmd, - cwd=cwd, - capture_output=True, - text=True, - check=False + cmd, cwd=cwd, capture_output=True, text=True, check=False ) if result.stdout: print(result.stdout) @@ -39,7 +35,9 @@ def check_dependencies(): # 检查pytest if not run_command([sys.executable, "-c", "import pytest"]): print("pytest not found. Installing test dependencies...") - if not run_command([sys.executable, "-m", "pip", "install", "-r", "requirements-test.txt"]): + if not run_command( + [sys.executable, "-m", "pip", "install", "-r", "requirements-test.txt"] + ): print("Failed to install test dependencies", file=sys.stderr) return False @@ -56,22 +54,27 @@ def run_unit_tests(args): print("\n=== Running Unit Tests ===") cmd = [ - sys.executable, "-m", "pytest", + sys.executable, + "-m", + "pytest", "tests/test_effects/", "tests/test_ffmpeg_builder/", "-v", - "-m", "not integration" + "-m", + "not integration", ] if args.coverage: - cmd.extend([ - "--cov=entity", - "--cov=services", - "--cov-report=xml:coverage.xml", - "--cov-report=html:htmlcov", - "--cov-report=term-missing", - "--cov-branch" - ]) + cmd.extend( + [ + "--cov=entity", + "--cov=services", + "--cov-report=xml:coverage.xml", + "--cov-report=html:htmlcov", + "--cov-report=term-missing", + "--cov-branch", + ] + ) if args.xml_report: cmd.extend(["--junitxml=unit-tests.xml"]) @@ -92,21 +95,26 @@ def run_integration_tests(args): return True cmd = [ - sys.executable, "-m", "pytest", + sys.executable, + "-m", + "pytest", "tests/test_integration/", "-v", - "-m", "integration", - "--timeout=300" + "-m", + "integration", + "--timeout=300", ] if args.coverage: - cmd.extend([ - "--cov=entity", - "--cov=services", - "--cov-report=xml:integration-coverage.xml", - "--cov-report=html:integration-htmlcov", - "--cov-branch" - ]) + cmd.extend( + [ + "--cov=entity", + "--cov=services", + "--cov-report=xml:integration-coverage.xml", + "--cov-report=html:integration-htmlcov", + "--cov-branch", + ] + ) if args.xml_report: cmd.extend(["--junitxml=integration-tests.xml"]) @@ -121,21 +129,19 @@ def run_all_tests(args): """运行所有测试""" print("\n=== Running All Tests ===") - cmd = [ - sys.executable, "-m", "pytest", - "tests/", - "-v" - ] + cmd = [sys.executable, "-m", "pytest", "tests/", "-v"] if args.coverage: - cmd.extend([ - "--cov=entity", - "--cov=services", - "--cov-report=xml:coverage.xml", - "--cov-report=html:htmlcov", - "--cov-report=term-missing", - "--cov-branch" - ]) + cmd.extend( + [ + "--cov=entity", + "--cov=services", + "--cov-report=xml:coverage.xml", + "--cov-report=html:htmlcov", + "--cov-report=term-missing", + "--cov-branch", + ] + ) if args.fail_under: cmd.extend([f"--cov-fail-under={args.fail_under}"]) @@ -154,17 +160,15 @@ def run_effect_tests(effect_name=None): if effect_name: print(f"\n=== Running {effect_name} Effect Tests ===") cmd = [ - sys.executable, "-m", "pytest", + sys.executable, + "-m", + "pytest", f"tests/test_effects/test_{effect_name}_effect.py", - "-v" + "-v", ] else: print("\n=== Running All Effect Tests ===") - cmd = [ - sys.executable, "-m", "pytest", - "tests/test_effects/", - "-v" - ] + cmd = [sys.executable, "-m", "pytest", "tests/test_effects/", "-v"] return run_command(cmd) @@ -174,14 +178,17 @@ def run_stress_tests(): print("\n=== Running Stress Tests ===") env = os.environ.copy() - env['RUN_STRESS_TESTS'] = '1' + env["RUN_STRESS_TESTS"] = "1" cmd = [ - sys.executable, "-m", "pytest", + sys.executable, + "-m", + "pytest", "tests/test_integration/", "-v", - "-m", "stress", - "--timeout=600" + "-m", + "stress", + "--timeout=600", ] return subprocess.run(cmd, env=env).returncode == 0 @@ -201,13 +208,19 @@ def create_test_video(): # 创建短视频文件 video_path = test_data_dir / "sample.mp4" cmd = [ - "ffmpeg", "-y", - "-f", "lavfi", - "-i", "testsrc=duration=5:size=640x480:rate=25", - "-c:v", "libx264", - "-preset", "ultrafast", - "-crf", "23", - str(video_path) + "ffmpeg", + "-y", + "-f", + "lavfi", + "-i", + "testsrc=duration=5:size=640x480:rate=25", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-crf", + "23", + str(video_path), ] if run_command(cmd): @@ -220,16 +233,30 @@ def create_test_video(): def main(): parser = argparse.ArgumentParser(description="RenderWorker Test Runner") - parser.add_argument("test_type", choices=[ - "unit", "integration", "all", "effects", "stress", "setup" - ], help="Type of tests to run") + parser.add_argument( + "test_type", + choices=["unit", "integration", "all", "effects", "stress", "setup"], + help="Type of tests to run", + ) - parser.add_argument("--effect", help="Specific effect to test (for effects command)") - parser.add_argument("--coverage", action="store_true", help="Generate coverage report") - parser.add_argument("--xml-report", action="store_true", help="Generate XML test report") - parser.add_argument("--html-report", action="store_true", help="Generate HTML test report") - parser.add_argument("--fail-under", type=int, default=70, help="Minimum coverage percentage") - parser.add_argument("--no-deps-check", action="store_true", help="Skip dependency check") + parser.add_argument( + "--effect", help="Specific effect to test (for effects command)" + ) + parser.add_argument( + "--coverage", action="store_true", help="Generate coverage report" + ) + parser.add_argument( + "--xml-report", action="store_true", help="Generate XML test report" + ) + parser.add_argument( + "--html-report", action="store_true", help="Generate HTML test report" + ) + parser.add_argument( + "--fail-under", type=int, default=70, help="Minimum coverage percentage" + ) + parser.add_argument( + "--no-deps-check", action="store_true", help="Skip dependency check" + ) args = parser.parse_args() @@ -264,4 +291,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/services/render_service.py b/services/render_service.py index 628532e..5b8a5fb 100644 --- a/services/render_service.py +++ b/services/render_service.py @@ -2,12 +2,13 @@ import subprocess import os import logging from abc import ABC, abstractmethod -from typing import Optional, Union +from typing import Union from opentelemetry.trace import Status, StatusCode from entity.render_task import RenderTask from entity.ffmpeg_command_builder import FFmpegCommandBuilder +from entity.ffmpeg import FfmpegTask from util.exceptions import RenderError, FFmpegError from util.ffmpeg import ( probe_video_info, @@ -26,7 +27,7 @@ class RenderService(ABC): """渲染服务抽象接口""" @abstractmethod - def render(self, task: Union[RenderTask, "FfmpegTask"]) -> bool: + def render(self, task: Union[RenderTask, FfmpegTask]) -> bool: """ 执行渲染任务 @@ -72,7 +73,7 @@ class RenderService(ABC): class DefaultRenderService(RenderService): """默认渲染服务实现""" - def render(self, task: Union[RenderTask, "FfmpegTask"]) -> bool: + def render(self, task: Union[RenderTask, FfmpegTask]) -> bool: """执行渲染任务""" # 兼容旧的FfmpegTask if hasattr(task, "get_ffmpeg_args"): # 这是FfmpegTask @@ -146,7 +147,7 @@ class DefaultRenderService(RenderService): error_msg, ) raise FFmpegError( - f"FFmpeg execution failed", + "FFmpeg execution failed", command=args, return_code=process.returncode, stderr=error_msg, diff --git a/services/service_container.py b/services/service_container.py index cc96dc6..38cace8 100644 --- a/services/service_container.py +++ b/services/service_container.py @@ -3,9 +3,14 @@ """ import threading -from typing import Dict, Type, TypeVar, Optional +from typing import Dict, Type, TypeVar, Optional, TYPE_CHECKING import logging +if TYPE_CHECKING: + from .render_service import RenderService + from .template_service import TemplateService + from .task_service import TaskService + logger = logging.getLogger(__name__) T = TypeVar("T") diff --git a/services/task_service.py b/services/task_service.py index da656e0..1ab7c2c 100644 --- a/services/task_service.py +++ b/services/task_service.py @@ -2,7 +2,7 @@ import logging import os from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Any, List, Optional +from typing import Dict, Any, Optional from opentelemetry.trace import Status, StatusCode @@ -125,7 +125,11 @@ class DefaultTaskService(TaskService): # 报告任务成功 api.report_task_success( task_info, - videoInfo={"width": width, "height": height, "duration": duration}, + videoInfo={ + "width": width, + "height": height, + "duration": duration, + }, ) span.set_status(Status(StatusCode.OK)) @@ -249,7 +253,10 @@ class DefaultTaskService(TaskService): ) def _parse_video_source( - self, source: str, task_params: Dict[str, Any], template_info: Dict[str, Any] + self, + source: str, + task_params: Dict[str, Any], + template_info: Dict[str, Any], ) -> tuple[Optional[str], Dict[str, Any]]: """解析视频源""" if source.startswith("PLACEHOLDER_"): @@ -274,7 +281,10 @@ class DefaultTaskService(TaskService): return os.path.join(template_info.get("local_path", ""), source), {} def _check_placeholder_exist_with_count( - self, placeholder_id: str, task_params: Dict[str, Any], required_count: int = 1 + self, + placeholder_id: str, + task_params: Dict[str, Any], + required_count: int = 1, ) -> bool: """检查占位符是否存在足够数量的片段""" if placeholder_id in task_params: diff --git a/tests/conftest.py b/tests/conftest.py index 8cacdca..8353567 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ """pytest配置文件""" + import pytest import os import tempfile @@ -7,7 +8,7 @@ from pathlib import Path from typing import Dict, Any from entity.render_task import RenderTask -from config.settings import Config +from config.settings import FFmpegConfig, APIConfig, StorageConfig @pytest.fixture @@ -19,14 +20,28 @@ def temp_dir(): @pytest.fixture -def test_config(): - """测试用配置""" - return Config( +def test_ffmpeg_config(): + """测试用FFmpeg配置""" + return FFmpegConfig( encoder_args="-c:v h264", video_args="", + ) + + +@pytest.fixture +def test_api_config(): + """测试用API配置""" + return APIConfig( + endpoint="http://test.local", + access_key="test_key", + ) + + +@pytest.fixture +def test_storage_config(): + """测试用存储配置""" + return StorageConfig( template_dir="tests/test_data/templates", - api_endpoint="http://test.local", - access_key="test_key" ) @@ -41,7 +56,7 @@ def sample_render_task(): effects=["zoom:0,2.0,3.0", "ospeed:1.5"], ext_data={ "posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}' - } + }, ) @@ -57,6 +72,7 @@ def sample_video_file(temp_dir): def ffmpeg_available(): """检查FFmpeg是否可用""" import subprocess + try: subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True) return True @@ -69,7 +85,7 @@ def mock_ext_data(): """模拟扩展数据""" return { "posJson": '{"ltX": 50, "ltY": 50, "rbX": 150, "rbY": 150, "imgWidth": 200, "imgHeight": 200}', - "templateData": {"width": 1920, "height": 1080} + "templateData": {"width": 1920, "height": 1080}, } @@ -78,6 +94,7 @@ def pytest_collection_modifyitems(config, items): """根据FFmpeg可用性修改测试收集""" try: import subprocess + subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True) ffmpeg_available = True except (subprocess.CalledProcessError, FileNotFoundError): @@ -87,4 +104,4 @@ def pytest_collection_modifyitems(config, items): skip_ffmpeg = pytest.mark.skip(reason="FFmpeg not available") for item in items: if "integration" in item.keywords: - item.add_marker(skip_ffmpeg) \ No newline at end of file + item.add_marker(skip_ffmpeg) diff --git a/tests/test_effects/test_base.py b/tests/test_effects/test_base.py index 5e2581f..91d6b47 100644 --- a/tests/test_effects/test_base.py +++ b/tests/test_effects/test_base.py @@ -1,4 +1,5 @@ """测试特效基础类和注册表""" + import pytest from unittest.mock import Mock @@ -50,9 +51,7 @@ class TestEffectProcessor: def test_get_pos_json_valid(self): """测试获取有效位置JSON""" - ext_data = { - "posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}' - } + ext_data = {"posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}'} processor = EffectProcessor("", ext_data) result = processor.get_pos_json() @@ -185,4 +184,4 @@ class TestEffectRegistry: name, params = registry.parse_effect_string("effect:param1:param2") assert name == "effect" - assert params == "param1:param2" \ No newline at end of file + assert params == "param1:param2" diff --git a/tests/test_effects/test_speed_effect.py b/tests/test_effects/test_speed_effect.py index 5cf70f7..a4d3b64 100644 --- a/tests/test_effects/test_speed_effect.py +++ b/tests/test_effects/test_speed_effect.py @@ -1,4 +1,5 @@ """测试变速特效""" + import pytest from entity.effects.speed import SpeedEffect from tests.utils.test_helpers import EffectTestHelper @@ -10,31 +11,11 @@ class TestSpeedEffect: def test_validate_params_valid_cases(self): """测试有效参数验证""" test_cases = [ - { - "params": "2.0", - "expected": True, - "description": "2倍速" - }, - { - "params": "0.5", - "expected": True, - "description": "0.5倍速(慢速)" - }, - { - "params": "1.0", - "expected": True, - "description": "正常速度" - }, - { - "params": "10.0", - "expected": True, - "description": "10倍速" - }, - { - "params": "", - "expected": True, - "description": "空参数(默认不变速)" - } + {"params": "2.0", "expected": True, "description": "2倍速"}, + {"params": "0.5", "expected": True, "description": "0.5倍速(慢速)"}, + {"params": "1.0", "expected": True, "description": "正常速度"}, + {"params": "10.0", "expected": True, "description": "10倍速"}, + {"params": "", "expected": True, "description": "空参数(默认不变速)"}, ] effect = SpeedEffect() @@ -46,26 +27,10 @@ class TestSpeedEffect: def test_validate_params_invalid_cases(self): """测试无效参数验证""" test_cases = [ - { - "params": "0", - "expected": False, - "description": "零速度" - }, - { - "params": "-1.0", - "expected": False, - "description": "负速度" - }, - { - "params": "abc", - "expected": False, - "description": "非数字参数" - }, - { - "params": "1.0,2.0", - "expected": False, - "description": "多余参数" - } + {"params": "0", "expected": False, "description": "零速度"}, + {"params": "-1.0", "expected": False, "description": "负速度"}, + {"params": "abc", "expected": False, "description": "非数字参数"}, + {"params": "1.0,2.0", "expected": False, "description": "多余参数"}, ] effect = SpeedEffect() @@ -110,7 +75,7 @@ class TestSpeedEffect: test_cases = [ {"params": "", "description": "空参数"}, {"params": "1", "description": "1倍速"}, - {"params": "1.0", "description": "1.0倍速"} + {"params": "1.0", "description": "1.0倍速"}, ] for case in test_cases: @@ -118,8 +83,12 @@ class TestSpeedEffect: result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) assert result["success"], f"Failed for {case['description']}" - assert result["filter_count"] == 0, f"Should not generate filter for {case['description']}" - assert result["output_stream"] == "[0:v]", f"Output should equal input for {case['description']}" + assert ( + result["filter_count"] == 0 + ), f"Should not generate filter for {case['description']}" + assert ( + result["output_stream"] == "[0:v]" + ), f"Output should equal input for {case['description']}" def test_generate_filter_args_invalid_params(self): """测试无效参数的滤镜生成""" @@ -166,4 +135,4 @@ class TestSpeedEffect: # 作为链中的第二个特效 result2 = EffectTestHelper.test_filter_generation(effect, "[v_eff1]", 2) assert result2["output_stream"] == "[v_eff2]" - assert "[v_eff1]" in result2["filters"][0] \ No newline at end of file + assert "[v_eff1]" in result2["filters"][0] diff --git a/tests/test_effects/test_zoom_effect.py b/tests/test_effects/test_zoom_effect.py index bd448ab..b5d0415 100644 --- a/tests/test_effects/test_zoom_effect.py +++ b/tests/test_effects/test_zoom_effect.py @@ -1,4 +1,5 @@ """测试缩放特效""" + import pytest from entity.effects.zoom import ZoomEffect from tests.utils.test_helpers import EffectTestHelper, FFmpegValidator @@ -10,26 +11,18 @@ class TestZoomEffect: def test_validate_params_valid_cases(self): """测试有效参数验证""" test_cases = [ - { - "params": "0,2.0,3.0", - "expected": True, - "description": "标准缩放参数" - }, + {"params": "0,2.0,3.0", "expected": True, "description": "标准缩放参数"}, { "params": "1.5,1.5,0", "expected": True, - "description": "静态缩放(duration=0)" + "description": "静态缩放(duration=0)", }, { "params": "0,1.0,5.0", "expected": True, - "description": "无缩放效果(factor=1.0)" + "description": "无缩放效果(factor=1.0)", }, - { - "params": "10,0.5,2.0", - "expected": True, - "description": "缩小效果" - } + {"params": "10,0.5,2.0", "expected": True, "description": "缩小效果"}, ] effect = ZoomEffect() @@ -41,41 +34,13 @@ class TestZoomEffect: def test_validate_params_invalid_cases(self): """测试无效参数验证""" test_cases = [ - { - "params": "", - "expected": False, - "description": "空参数" - }, - { - "params": "1,2", - "expected": False, - "description": "参数不足" - }, - { - "params": "-1,2.0,3.0", - "expected": False, - "description": "负开始时间" - }, - { - "params": "0,0,3.0", - "expected": False, - "description": "零缩放因子" - }, - { - "params": "0,-2.0,3.0", - "expected": False, - "description": "负缩放因子" - }, - { - "params": "0,2.0,-1.0", - "expected": False, - "description": "负持续时间" - }, - { - "params": "abc,2.0,3.0", - "expected": False, - "description": "非数字参数" - } + {"params": "", "expected": False, "description": "空参数"}, + {"params": "1,2", "expected": False, "description": "参数不足"}, + {"params": "-1,2.0,3.0", "expected": False, "description": "负开始时间"}, + {"params": "0,0,3.0", "expected": False, "description": "零缩放因子"}, + {"params": "0,-2.0,3.0", "expected": False, "description": "负缩放因子"}, + {"params": "0,2.0,-1.0", "expected": False, "description": "负持续时间"}, + {"params": "abc,2.0,3.0", "expected": False, "description": "非数字参数"}, ] effect = ZoomEffect() @@ -158,9 +123,7 @@ class TestZoomEffect: def test_get_zoom_center_invalid_pos_json(self): """测试无效posJson时的缩放中心点""" - ext_data = { - "posJson": '{"imgWidth": 0, "imgHeight": 0}' # 无效尺寸 - } + ext_data = {"posJson": '{"imgWidth": 0, "imgHeight": 0}'} # 无效尺寸 effect = ZoomEffect("0,2.0,3.0", ext_data) center_x, center_y = effect._get_zoom_center() @@ -191,4 +154,4 @@ class TestZoomEffect: assert "zoompan=z=" in filter_str assert "between(t\\\\,2.5\\\\,4.0)" in filter_str # 2.5 + 1.5 = 4.0 assert "[input]" in filter_str - assert "[v_eff5]" in filter_str \ No newline at end of file + assert "[v_eff5]" in filter_str diff --git a/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py b/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py index 820dd75..bf874a1 100644 --- a/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py +++ b/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py @@ -1,27 +1,17 @@ """测试FFmpeg命令构建器""" + import pytest from unittest.mock import Mock, patch from entity.ffmpeg_command_builder import FFmpegCommandBuilder from entity.render_task import RenderTask -from config.settings import Config +from config.settings import FFmpegConfig, APIConfig, StorageConfig from tests.utils.test_helpers import MockRenderTask, FFmpegValidator class TestFFmpegCommandBuilder: """测试FFmpeg命令构建器""" - @pytest.fixture - def mock_config(self): - """模拟配置""" - return Config( - encoder_args="-c:v h264", - video_args="-preset fast", - template_dir="tests/test_data/templates", - api_endpoint="http://test.local", - access_key="test_key" - ) - @pytest.fixture def simple_task(self): """简单的渲染任务""" @@ -43,15 +33,15 @@ class TestFFmpegCommandBuilder: } return task - def test_init(self, simple_task, mock_config): + def test_init(self, simple_task): """测试初始化""" - builder = FFmpegCommandBuilder(simple_task, mock_config) + builder = FFmpegCommandBuilder(simple_task) assert builder.task == simple_task - assert builder.config == mock_config + assert builder.config is not None - def test_build_copy_command(self, simple_task, mock_config): + def test_build_copy_command(self, simple_task): """测试构建复制命令""" - builder = FFmpegCommandBuilder(simple_task, mock_config) + builder = FFmpegCommandBuilder(simple_task) command = builder._build_copy_command() # 验证命令结构 @@ -67,13 +57,13 @@ class TestFFmpegCommandBuilder: assert "output.mp4" in command assert "-c" in command and "copy" in command - def test_build_concat_command_multiple_files(self, mock_config): + def test_build_concat_command_multiple_files(self): """测试构建多文件拼接命令""" task = MockRenderTask() task.input_files = ["file1.mp4", "file2.mp4", "file3.mp4"] task.output_path = "concat_output.mp4" - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) command = builder._build_concat_command() validation = FFmpegValidator.validate_ffmpeg_command(command) @@ -84,9 +74,9 @@ class TestFFmpegCommandBuilder: assert "concat=n=3:v=1:a=1" in " ".join(command) assert all(f"file{i}.mp4" in command for i in range(1, 4)) - def test_build_encode_command_with_effects(self, task_with_effects, mock_config): + def test_build_encode_command_with_effects(self, task_with_effects): """测试构建带特效的编码命令""" - builder = FFmpegCommandBuilder(task_with_effects, mock_config) + builder = FFmpegCommandBuilder(task_with_effects) command = builder._build_encode_command() validation = FFmpegValidator.validate_ffmpeg_command(command) @@ -99,13 +89,13 @@ class TestFFmpegCommandBuilder: # 应该包含编码参数 assert "-c:v" in command and "h264" in command - def test_add_effects_single_effect(self, mock_config): + def test_add_effects_single_effect(self): """测试添加单个特效""" task = MockRenderTask() task.effects = ["zoom:0,2.0,3.0"] task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) filter_args = [] result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) @@ -115,13 +105,13 @@ class TestFFmpegCommandBuilder: assert result_input == "[v_eff1]" # 输出流应该更新 assert result_index == 2 # 索引应该递增 - def test_add_effects_multiple_effects(self, mock_config): + def test_add_effects_multiple_effects(self): """测试添加多个特效""" task = MockRenderTask() task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"] task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) filter_args = [] result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) @@ -131,12 +121,12 @@ class TestFFmpegCommandBuilder: assert result_input == "[v_eff2]" # 最终输出流 assert result_index == 3 # 索引应该递增两次 - def test_add_effects_invalid_effect(self, mock_config): + def test_add_effects_invalid_effect(self): """测试添加无效特效""" task = MockRenderTask() task.effects = ["invalid_effect:params"] - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) filter_args = [] result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) @@ -145,12 +135,12 @@ class TestFFmpegCommandBuilder: assert result_input == "[0:v]" # 输入流不变 assert result_index == 1 # 索引不变 - def test_add_effects_no_effects(self, mock_config): + def test_add_effects_no_effects(self): """测试无特效情况""" task = MockRenderTask() task.effects = [] - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) filter_args = [] result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) @@ -159,11 +149,11 @@ class TestFFmpegCommandBuilder: assert result_index == 1 # 索引不变 assert len(filter_args) == 0 # 无滤镜添加 - def test_build_command_copy_mode(self, simple_task, mock_config): + def test_build_command_copy_mode(self, simple_task): """测试构建复制模式命令""" simple_task.input_files = ["single_file.mp4"] - builder = FFmpegCommandBuilder(simple_task, mock_config) + builder = FFmpegCommandBuilder(simple_task) command = builder.build_command() validation = FFmpegValidator.validate_ffmpeg_command(command) @@ -173,13 +163,13 @@ class TestFFmpegCommandBuilder: command_str = " ".join(command) assert "-c copy" in command_str - def test_build_command_concat_mode(self, mock_config): + def test_build_command_concat_mode(self): """测试构建拼接模式命令""" task = MockRenderTask() task.input_files = ["file1.mp4", "file2.mp4"] task.effects = [] - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() validation = FFmpegValidator.validate_ffmpeg_command(command) @@ -190,9 +180,9 @@ class TestFFmpegCommandBuilder: command_str = " ".join(command) assert "concat=" in command_str - def test_build_command_encode_mode(self, task_with_effects, mock_config): + def test_build_command_encode_mode(self, task_with_effects): """测试构建编码模式命令""" - builder = FFmpegCommandBuilder(task_with_effects, mock_config) + builder = FFmpegCommandBuilder(task_with_effects) command = builder.build_command() validation = FFmpegValidator.validate_ffmpeg_command(command) @@ -203,15 +193,15 @@ class TestFFmpegCommandBuilder: command_str = " ".join(command) assert "-c:v h264" in command_str - @patch('entity.effects.registry.get_processor') - def test_effect_processor_integration(self, mock_get_processor, mock_config): + @patch("entity.effects.registry.get_processor") + def test_effect_processor_integration(self, mock_get_processor): """测试与特效处理器的集成""" # 模拟特效处理器 mock_processor = Mock() mock_processor.frame_rate = 25 mock_processor.generate_filter_args.return_value = ( ["[0:v]zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1]"], - "[v_eff1]" + "[v_eff1]", ) mock_get_processor.return_value = mock_processor @@ -219,7 +209,7 @@ class TestFFmpegCommandBuilder: task.effects = ["zoom:0,2.0,3.0"] task.frame_rate = 30 - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) filter_args = [] builder._add_effects(filter_args, "[0:v]", 1) @@ -228,12 +218,12 @@ class TestFFmpegCommandBuilder: mock_processor.generate_filter_args.assert_called_once_with("[0:v]", 1) assert mock_processor.frame_rate == 30 # 帧率应该被设置 - def test_error_handling_missing_input(self, mock_config): + def test_error_handling_missing_input(self): """测试缺少输入文件的错误处理""" task = MockRenderTask() task.input_files = [] - builder = FFmpegCommandBuilder(task, mock_config) + builder = FFmpegCommandBuilder(task) # 构建命令时应该处理错误情况 # 具体的错误处理依赖于实现 @@ -241,4 +231,4 @@ class TestFFmpegCommandBuilder: # 验证至少返回了基本的ffmpeg命令结构 assert isinstance(command, list) - assert len(command) > 0 \ No newline at end of file + assert len(command) > 0 diff --git a/tests/test_integration/test_ffmpeg_execution.py b/tests/test_integration/test_ffmpeg_execution.py index b55ba0c..b400838 100644 --- a/tests/test_integration/test_ffmpeg_execution.py +++ b/tests/test_integration/test_ffmpeg_execution.py @@ -1,4 +1,5 @@ """FFmpeg执行集成测试""" + import pytest import subprocess import tempfile @@ -7,7 +8,7 @@ from pathlib import Path from entity.ffmpeg_command_builder import FFmpegCommandBuilder from entity.render_task import RenderTask -from config.settings import Config +from config.settings import FFmpegConfig from services.render_service import DefaultRenderService from tests.utils.test_helpers import MockRenderTask, create_test_video_file @@ -16,17 +17,6 @@ from tests.utils.test_helpers import MockRenderTask, create_test_video_file class TestFFmpegExecution: """FFmpeg执行集成测试""" - @pytest.fixture - def test_config(self, temp_dir): - """测试配置""" - return Config( - encoder_args="-c:v libx264", - video_args="-preset ultrafast -crf 23", - template_dir=temp_dir, - api_endpoint="http://test.local", - access_key="test_key" - ) - @pytest.fixture def sample_video(self, temp_dir): """创建测试视频文件""" @@ -37,11 +27,11 @@ class TestFFmpegExecution: return video_path @pytest.fixture - def render_service(self, test_config): + def render_service(self): """渲染服务实例""" return DefaultRenderService() - def test_simple_copy_execution(self, sample_video, temp_dir, test_config): + def test_simple_copy_execution(self, sample_video, temp_dir): """测试简单复制执行""" output_path = os.path.join(temp_dir, "copy_output.mp4") @@ -50,7 +40,7 @@ class TestFFmpegExecution: task.output_path = output_path task.effects = [] - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() # 执行命令 @@ -62,7 +52,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_zoom_effect_execution(self, sample_video, temp_dir, test_config): + def test_zoom_effect_execution(self, sample_video, temp_dir): """测试缩放特效执行""" output_path = os.path.join(temp_dir, "zoom_output.mp4") @@ -72,7 +62,7 @@ class TestFFmpegExecution: task.effects = ["zoom:0,2.0,2.0"] # 2倍缩放,持续2秒 task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() try: @@ -83,7 +73,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_speed_effect_execution(self, sample_video, temp_dir, test_config): + def test_speed_effect_execution(self, sample_video, temp_dir): """测试变速特效执行""" output_path = os.path.join(temp_dir, "speed_output.mp4") @@ -93,7 +83,7 @@ class TestFFmpegExecution: task.effects = ["ospeed:2.0"] # 2倍速 task.ext_data = {} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() try: @@ -104,7 +94,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_multiple_effects_execution(self, sample_video, temp_dir, test_config): + def test_multiple_effects_execution(self, sample_video, temp_dir): """测试多特效组合执行""" output_path = os.path.join(temp_dir, "multi_effects_output.mp4") @@ -114,7 +104,7 @@ class TestFFmpegExecution: task.effects = ["zoom:0,1.5,1.0", "ospeed:1.5"] # 缩放+变速 task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() try: @@ -125,7 +115,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_concat_execution(self, temp_dir, test_config): + def test_concat_execution(self, temp_dir): """测试视频拼接执行""" # 创建两个测试视频 video1_path = os.path.join(temp_dir, "video1.mp4") @@ -143,7 +133,7 @@ class TestFFmpegExecution: task.output_path = output_path task.effects = [] - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() try: @@ -154,7 +144,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_invalid_effect_execution(self, sample_video, temp_dir, test_config): + def test_invalid_effect_execution(self, sample_video, temp_dir): """测试无效特效的执行处理""" output_path = os.path.join(temp_dir, "invalid_effect_output.mp4") @@ -164,7 +154,7 @@ class TestFFmpegExecution: task.effects = ["invalid_effect:params", "zoom:0,2.0,1.0"] # 混合有效和无效特效 task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() # 应该忽略无效特效,继续处理有效特效 @@ -175,7 +165,7 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - def test_render_service_integration(self, sample_video, temp_dir, test_config, render_service): + def test_render_service_integration(self, sample_video, temp_dir, render_service): """测试渲染服务集成""" output_path = os.path.join(temp_dir, "service_output.mp4") @@ -187,7 +177,7 @@ class TestFFmpegExecution: "output_path": output_path, "effects": ["zoom:0,1.8,2.0"], "ext_data": {"posJson": "{}"}, - "frame_rate": 25 + "frame_rate": 25, } # 这里需要根据实际的RenderTask构造方法调整 @@ -196,11 +186,11 @@ class TestFFmpegExecution: # 使用渲染服务执行 try: # 这里的方法调用需要根据实际的渲染服务接口调整 - # success = render_service.render(task, test_config) + # success = render_service.render(task) # assert success, "Render service failed" # 临时直接使用FFmpegCommandBuilder测试 - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() result = subprocess.run(command, capture_output=True, text=True, timeout=60) @@ -210,7 +200,7 @@ class TestFFmpegExecution: except Exception as e: pytest.fail(f"Render service integration failed: {e}") - def test_error_handling_missing_input(self, temp_dir, test_config): + def test_error_handling_missing_input(self, temp_dir): """测试缺失输入文件的错误处理""" missing_file = os.path.join(temp_dir, "nonexistent.mp4") output_path = os.path.join(temp_dir, "error_output.mp4") @@ -220,14 +210,14 @@ class TestFFmpegExecution: task.output_path = output_path task.effects = [] - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() # 应该失败,因为输入文件不存在 result = subprocess.run(command, capture_output=True, text=True, timeout=30) assert result.returncode != 0, "FFmpeg should fail with missing input file" - def test_performance_multiple_effects(self, sample_video, temp_dir, test_config): + def test_performance_multiple_effects(self, sample_video, temp_dir): """测试多特效性能""" output_path = os.path.join(temp_dir, "performance_output.mp4") @@ -235,21 +225,20 @@ class TestFFmpegExecution: task.input_files = [sample_video] task.output_path = output_path # 多个特效组合 - task.effects = [ - "zoom:0,1.5,1.0", - "ospeed:1.2", - "zoom:1.5,2.0,1.0" - ] + task.effects = ["zoom:0,1.5,1.0", "ospeed:1.2", "zoom:1.5,2.0,1.0"] task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() import time + start_time = time.time() try: - result = subprocess.run(command, capture_output=True, text=True, timeout=120) + result = subprocess.run( + command, capture_output=True, text=True, timeout=120 + ) execution_time = time.time() - start_time assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" @@ -259,8 +248,10 @@ class TestFFmpegExecution: except subprocess.TimeoutExpired: pytest.fail("FFmpeg execution timed out") - @pytest.mark.skipif(not os.environ.get('RUN_STRESS_TESTS'), reason="Stress tests disabled") - def test_stress_test_large_effects_chain(self, sample_video, temp_dir, test_config): + @pytest.mark.skipif( + not os.environ.get("RUN_STRESS_TESTS"), reason="Stress tests disabled" + ) + def test_stress_test_large_effects_chain(self, sample_video, temp_dir): """压力测试:大量特效链""" output_path = os.path.join(temp_dir, "stress_output.mp4") @@ -271,12 +262,14 @@ class TestFFmpegExecution: task.effects = [f"zoom:{i*0.5},1.{i+5},{i*0.2+0.5}" for i in range(10)] task.ext_data = {"posJson": "{}"} - builder = FFmpegCommandBuilder(task, test_config) + builder = FFmpegCommandBuilder(task) command = builder.build_command() try: - result = subprocess.run(command, capture_output=True, text=True, timeout=300) + result = subprocess.run( + command, capture_output=True, text=True, timeout=300 + ) assert result.returncode == 0, f"Stress test failed: {result.stderr}" assert os.path.exists(output_path), "Output file was not created" except subprocess.TimeoutExpired: - pytest.fail("Stress test timed out") \ No newline at end of file + pytest.fail("Stress test timed out") diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index a6b4e3e..b6d6119 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -1,4 +1,5 @@ """测试辅助工具""" + import json import tempfile import subprocess @@ -7,7 +8,7 @@ from pathlib import Path from entity.render_task import RenderTask from entity.effects.base import EffectProcessor -from config.settings import Config +from config.settings import FFmpegConfig class MockRenderTask: @@ -20,7 +21,7 @@ class MockRenderTask: effects: List[str] = None, ext_data: Dict[str, Any] = None, frame_rate: int = 25, - output_path: str = "test_output.mp4" + output_path: str = "test_output.mp4", ): self.task_id = task_id self.template_id = template_id @@ -47,11 +48,11 @@ class FFmpegValidator: return False # 检查是否包含基本的滤镜结构 - if '[' in filter_str and ']' in filter_str: + if "[" in filter_str and "]" in filter_str: return True # 检查常见的滤镜格式 - common_filters = ['zoompan', 'setpts', 'trim', 'scale', 'crop'] + common_filters = ["zoompan", "setpts", "trim", "scale", "crop"] return any(f in filter_str for f in common_filters) except Exception: @@ -62,7 +63,7 @@ class FFmpegValidator: """验证流标识符格式""" if not stream_id: return False - return stream_id.startswith('[') and stream_id.endswith(']') + return stream_id.startswith("[") and stream_id.endswith("]") @staticmethod def validate_ffmpeg_command(command: List[str]) -> Dict[str, Any]: @@ -72,7 +73,7 @@ class FFmpegValidator: "has_input": False, "has_output": False, "has_filter": False, - "errors": [] + "errors": [], } if not command or command[0] != "ffmpeg": @@ -96,9 +97,7 @@ class FFmpegValidator: result["has_filter"] = True result["valid"] = ( - result["has_input"] and - result["has_output"] and - len(result["errors"]) == 0 + result["has_input"] and result["has_output"] and len(result["errors"]) == 0 ) return result @@ -108,12 +107,16 @@ class EffectTestHelper: """特效测试辅助类""" @staticmethod - def create_test_effect(effect_class, params: str = "", ext_data: Dict[str, Any] = None): + def create_test_effect( + effect_class, params: str = "", ext_data: Dict[str, Any] = None + ): """创建测试用特效实例""" return effect_class(params, ext_data) @staticmethod - def test_effect_params_validation(effect: EffectProcessor, test_cases: List[Dict[str, Any]]): + def test_effect_params_validation( + effect: EffectProcessor, test_cases: List[Dict[str, Any]] + ): """批量测试特效参数验证""" results = [] for case in test_cases: @@ -123,29 +126,41 @@ class EffectTestHelper: is_valid = effect.validate_params() expected = case.get("expected", True) - results.append({ - "params": effect.params, - "expected": expected, - "actual": is_valid, - "passed": is_valid == expected, - "description": case.get("description", "") - }) + results.append( + { + "params": effect.params, + "expected": expected, + "actual": is_valid, + "passed": is_valid == expected, + "description": case.get("description", ""), + } + ) return results @staticmethod - def test_filter_generation(effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1): + def test_filter_generation( + effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1 + ): """测试滤镜生成""" try: - filters, output_stream = effect.generate_filter_args(video_input, effect_index) + filters, output_stream = effect.generate_filter_args( + video_input, effect_index + ) result = { "success": True, "filters": filters, "output_stream": output_stream, "filter_count": len(filters), - "valid_syntax": all(FFmpegValidator.validate_filter_syntax(f) for f in filters), - "valid_output": FFmpegValidator.validate_stream_identifier(output_stream) if output_stream != video_input else True + "valid_syntax": all( + FFmpegValidator.validate_filter_syntax(f) for f in filters + ), + "valid_output": ( + FFmpegValidator.validate_stream_identifier(output_stream) + if output_stream != video_input + else True + ), } except Exception as e: result = { @@ -155,23 +170,31 @@ class EffectTestHelper: "output_stream": "", "filter_count": 0, "valid_syntax": False, - "valid_output": False + "valid_output": False, } return result -def create_test_video_file(output_path: str, duration: int = 5, resolution: str = "640x480") -> bool: +def create_test_video_file( + output_path: str, duration: int = 5, resolution: str = "640x480" +) -> bool: """创建测试用视频文件""" try: cmd = [ - "ffmpeg", "-y", # 覆盖输出文件 - "-f", "lavfi", # 使用libavfilter输入 - "-i", f"testsrc=duration={duration}:size={resolution}:rate=25", - "-c:v", "libx264", - "-preset", "ultrafast", - "-crf", "23", - output_path + "ffmpeg", + "-y", # 覆盖输出文件 + "-f", + "lavfi", # 使用libavfilter输入 + "-i", + f"testsrc=duration={duration}:size={resolution}:rate=25", + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-crf", + "23", + output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) @@ -191,12 +214,8 @@ def create_sample_template_data() -> Dict[str, Any]: "id": "part1", "type": "video", "duration": 10.0, - "effects": ["zoom:0,2.0,3.0", "ospeed:1.5"] + "effects": ["zoom:0,2.0,3.0", "ospeed:1.5"], } ], - "settings": { - "width": 1920, - "height": 1080, - "frameRate": 25 - } - } \ No newline at end of file + "settings": {"width": 1920, "height": 1080, "frameRate": 25}, + }