diff --git a/.gitignore b/.gitignore index d65ddac..a870de9 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,6 @@ target/ .venv venv/ cython_debug/ -.env \ No newline at end of file +.env +.serena +.claude \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..154bf22 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,335 @@ +pipeline { + agent any + + environment { + // 环境变量 + PYTHON_VERSION = '3.9' + VENV_NAME = 'venv' + TEST_REPORTS_DIR = 'test-reports' + COVERAGE_DIR = 'coverage-reports' + } + + stages { + stage('Preparation') { + steps { + script { + // 清理工作空间 + echo 'Cleaning workspace...' + deleteDir() + + // 检出代码 + checkout scm + + // 创建报告目录 + sh """ + mkdir -p ${TEST_REPORTS_DIR} + mkdir -p ${COVERAGE_DIR} + """ + } + } + } + + stage('Environment Setup') { + steps { + script { + echo 'Setting up Python environment...' + sh """ + # 创建虚拟环境 + python${PYTHON_VERSION} -m venv ${VENV_NAME} + + # 激活虚拟环境并安装依赖 + . ${VENV_NAME}/bin/activate + + # 升级pip + pip install --upgrade pip + + # 安装项目依赖 + pip install -r requirements.txt + + # 安装测试依赖 + pip install -r requirements-test.txt + + # 验证FFmpeg可用性 + which ffmpeg || echo "Warning: FFmpeg not found, integration tests may be skipped" + """ + } + } + } + + stage('Code Quality Check') { + parallel { + stage('Linting') { + steps { + script { + echo 'Running code linting...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行flake8检查 + flake8 entity/ services/ --output-file=${TEST_REPORTS_DIR}/flake8-report.txt --tee || true + + # 运行black格式检查 + black --check --diff entity/ services/ > ${TEST_REPORTS_DIR}/black-report.txt || true + """ + } + } + post { + always { + // 发布lint报告 + publishHTML([ + allowMissing: false, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: TEST_REPORTS_DIR, + reportFiles: 'flake8-report.txt,black-report.txt', + reportName: 'Code Quality Report' + ]) + } + } + } + + stage('Type Checking') { + steps { + script { + echo 'Running type checking...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行mypy类型检查 + mypy entity/ services/ --html-report ${TEST_REPORTS_DIR}/mypy-html --txt-report ${TEST_REPORTS_DIR}/mypy-txt || true + """ + } + } + } + } + } + + stage('Unit Tests') { + steps { + script { + echo 'Running unit tests...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行单元测试 + pytest tests/test_effects/ tests/test_ffmpeg_builder/ \\ + --junitxml=${TEST_REPORTS_DIR}/unit-tests.xml \\ + --html=${TEST_REPORTS_DIR}/unit-tests.html \\ + --self-contained-html \\ + --cov=entity \\ + --cov=services \\ + --cov-report=xml:${COVERAGE_DIR}/unit-coverage.xml \\ + --cov-branch \\ + -v \\ + -m "not integration" + """ + } + } + post { + always { + // 发布单元测试结果 + junit "${TEST_REPORTS_DIR}/unit-tests.xml" + + // 发布HTML测试报告 + publishHTML([ + allowMissing: false, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: TEST_REPORTS_DIR, + reportFiles: 'unit-tests.html', + reportName: 'Unit Tests Report' + ]) + } + } + } + + stage('Integration Tests') { + when { + // 只在有FFmpeg的环境中运行集成测试 + expression { + return sh( + script: 'which ffmpeg', + returnStatus: true + ) == 0 + } + } + steps { + script { + echo 'Running integration tests...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行集成测试 + pytest tests/test_integration/ \\ + --junitxml=${TEST_REPORTS_DIR}/integration-tests.xml \\ + --html=${TEST_REPORTS_DIR}/integration-tests.html \\ + --self-contained-html \\ + --cov=entity \\ + --cov=services \\ + --cov-report=xml:${COVERAGE_DIR}/integration-coverage.xml \\ + --cov-branch \\ + --timeout=300 \\ + -v \\ + -m "integration" + """ + } + } + post { + always { + // 发布集成测试结果 + junit "${TEST_REPORTS_DIR}/integration-tests.xml" + + // 发布HTML集成测试报告 + publishHTML([ + allowMissing: false, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: TEST_REPORTS_DIR, + reportFiles: 'integration-tests.html', + reportName: 'Integration Tests Report' + ]) + } + } + } + + stage('Complete Test Suite') { + steps { + script { + echo 'Running complete test suite with coverage...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行完整测试套件 + pytest tests/ \\ + --junitxml=${TEST_REPORTS_DIR}/all-tests.xml \\ + --html=${TEST_REPORTS_DIR}/all-tests.html \\ + --self-contained-html \\ + --cov=entity \\ + --cov=services \\ + --cov-report=xml:${COVERAGE_DIR}/coverage.xml \\ + --cov-report=term-missing \\ + --cov-branch \\ + --cov-fail-under=70 \\ + -v + """ + } + } + post { + always { + // 发布完整测试结果 + junit "${TEST_REPORTS_DIR}/all-tests.xml" + + // 发布代码覆盖率报告 + publishCoverage([ + adapters: [ + [ + mergeToOneReport: true, + path: "${COVERAGE_DIR}/coverage.xml" + ] + ], + sourceFileResolver: [ + [ + level: 'NEVER_STORE' + ] + ] + ]) + + + // 发布完整测试HTML报告 + publishHTML([ + allowMissing: false, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: TEST_REPORTS_DIR, + reportFiles: 'all-tests.html', + reportName: 'Complete Test Report' + ]) + } + } + } + + stage('Performance Tests') { + when { + // 只在主分支或发布分支运行性能测试 + anyOf { + branch 'master' + branch 'main' + branch 'release/*' + } + } + steps { + script { + echo 'Running performance tests...' + sh """ + . ${VENV_NAME}/bin/activate + + # 运行性能测试 + RUN_STRESS_TESTS=1 pytest tests/test_integration/test_ffmpeg_execution.py::TestFFmpegExecution::test_stress_test_large_effects_chain \\ + --benchmark-json=${TEST_REPORTS_DIR}/benchmark.json \\ + --html=${TEST_REPORTS_DIR}/performance-tests.html \\ + --self-contained-html \\ + -v \\ + -m "stress" || true + """ + } + } + post { + always { + // 发布性能测试报告 + publishHTML([ + allowMissing: true, + alwaysLinkToLastBuild: true, + keepAll: true, + reportDir: TEST_REPORTS_DIR, + reportFiles: 'performance-tests.html', + reportName: 'Performance Tests Report' + ]) + } + } + } + } + + post { + always { + // 清理虚拟环境 + sh """ + rm -rf ${VENV_NAME} + """ + + // 归档测试报告 + archiveArtifacts artifacts: "${TEST_REPORTS_DIR}/**/*", allowEmptyArchive: true + archiveArtifacts artifacts: "${COVERAGE_DIR}/**/*", allowEmptyArchive: true + } + + success { + echo 'All tests passed successfully!' + + // 发送成功通知 + script { + if (env.BRANCH_NAME == 'master' || env.BRANCH_NAME == 'main') { + // 这里可以添加Slack、邮件或其他通知 + echo 'Sending success notification...' + } + } + } + + failure { + echo 'Tests failed!' + + // 发送失败通知 + script { + // 这里可以添加Slack、邮件或其他通知 + echo 'Sending failure notification...' + } + } + + unstable { + echo 'Tests completed with warnings!' + } + + cleanup { + // 清理临时文件 + deleteDir() + } + } +} \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..d8d1c06 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,53 @@ +[tool:pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# 标记定义 +markers = + integration: marks tests as integration tests (may be slow) + unit: marks tests as unit tests (fast) + slow: marks tests as slow running + stress: marks tests as stress tests + +# 输出配置 +addopts = + -v + --tb=short + --strict-markers + --strict-config + --cov=entity + --cov=services + --cov-report=xml:coverage.xml + --cov-report=html:htmlcov + --cov-report=term-missing + --cov-branch + +# 过滤警告 +filterwarnings = + ignore::DeprecationWarning + ignore::PendingDeprecationWarning + +# 最小覆盖率要求 +[tool:coverage:run] +source = entity,services +omit = + */tests/* + */test_* + */__pycache__/* + */venv/* + */env/* + +[tool:coverage:report] +exclude_lines = + pragma: no cover + def __repr__ + if self.debug: + if settings.DEBUG + raise AssertionError + raise NotImplementedError + if 0: + if __name__ == .__main__.: + class .*\bProtocol\): + @(abc\.)?abstractmethod \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..a320acb --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,23 @@ +# 测试依赖 +pytest>=7.0.0 +pytest-cov>=4.0.0 +pytest-mock>=3.10.0 +pytest-xdist>=3.0.0 # 并行测试 +pytest-timeout>=2.1.0 +pytest-html>=3.1.0 # HTML报告 + +# 代码质量 +flake8>=5.0.0 +black>=22.0.0 +mypy>=1.0.0 + +# 覆盖率报告 +coverage[toml]>=6.0.0 + +# 性能测试 +pytest-benchmark>=4.0.0 + +# 测试辅助 +factory-boy>=3.2.0 # 测试数据工厂 +freezegun>=1.2.0 # 时间模拟 +responses>=0.22.0 # HTTP模拟 \ No newline at end of file diff --git a/run_tests.py b/run_tests.py new file mode 100644 index 0000000..4eb96f7 --- /dev/null +++ b/run_tests.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python +""" +测试运行器脚本 +支持不同类型的测试运行和报告生成 +""" +import os +import sys +import subprocess +import argparse +import tempfile +from pathlib import Path + + +def run_command(cmd, cwd=None): + """运行命令并返回结果""" + print(f"Running: {' '.join(cmd)}") + try: + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=True, + text=True, + check=False + ) + if result.stdout: + print(result.stdout) + if result.stderr: + print(result.stderr, file=sys.stderr) + return result.returncode == 0 + except Exception as e: + print(f"Error running command: {e}", file=sys.stderr) + return False + + +def check_dependencies(): + """检查依赖是否安装""" + print("Checking dependencies...") + + # 检查pytest + if not run_command([sys.executable, "-c", "import pytest"]): + print("pytest not found. Installing test dependencies...") + if not run_command([sys.executable, "-m", "pip", "install", "-r", "requirements-test.txt"]): + print("Failed to install test dependencies", file=sys.stderr) + return False + + # 检查FFmpeg + ffmpeg_available = run_command(["ffmpeg", "-version"]) + if not ffmpeg_available: + print("Warning: FFmpeg not found. Integration tests may be skipped.") + + return True + + +def run_unit_tests(args): + """运行单元测试""" + print("\n=== Running Unit Tests ===") + + cmd = [ + sys.executable, "-m", "pytest", + "tests/test_effects/", + "tests/test_ffmpeg_builder/", + "-v", + "-m", "not integration" + ] + + if args.coverage: + cmd.extend([ + "--cov=entity", + "--cov=services", + "--cov-report=xml:coverage.xml", + "--cov-report=html:htmlcov", + "--cov-report=term-missing", + "--cov-branch" + ]) + + if args.xml_report: + cmd.extend(["--junitxml=unit-tests.xml"]) + + if args.html_report: + cmd.extend(["--html=unit-tests.html", "--self-contained-html"]) + + return run_command(cmd) + + +def run_integration_tests(args): + """运行集成测试""" + print("\n=== Running Integration Tests ===") + + # 检查FFmpeg + if not run_command(["ffmpeg", "-version"]): + print("FFmpeg not available, skipping integration tests") + return True + + cmd = [ + sys.executable, "-m", "pytest", + "tests/test_integration/", + "-v", + "-m", "integration", + "--timeout=300" + ] + + if args.coverage: + cmd.extend([ + "--cov=entity", + "--cov=services", + "--cov-report=xml:integration-coverage.xml", + "--cov-report=html:integration-htmlcov", + "--cov-branch" + ]) + + if args.xml_report: + cmd.extend(["--junitxml=integration-tests.xml"]) + + if args.html_report: + cmd.extend(["--html=integration-tests.html", "--self-contained-html"]) + + return run_command(cmd) + + +def run_all_tests(args): + """运行所有测试""" + print("\n=== Running All Tests ===") + + cmd = [ + sys.executable, "-m", "pytest", + "tests/", + "-v" + ] + + if args.coverage: + cmd.extend([ + "--cov=entity", + "--cov=services", + "--cov-report=xml:coverage.xml", + "--cov-report=html:htmlcov", + "--cov-report=term-missing", + "--cov-branch" + ]) + + if args.fail_under: + cmd.extend([f"--cov-fail-under={args.fail_under}"]) + + if args.xml_report: + cmd.extend(["--junitxml=all-tests.xml"]) + + if args.html_report: + cmd.extend(["--html=all-tests.html", "--self-contained-html"]) + + return run_command(cmd) + + +def run_effect_tests(effect_name=None): + """运行特定特效测试""" + if effect_name: + print(f"\n=== Running {effect_name} Effect Tests ===") + cmd = [ + sys.executable, "-m", "pytest", + f"tests/test_effects/test_{effect_name}_effect.py", + "-v" + ] + else: + print("\n=== Running All Effect Tests ===") + cmd = [ + sys.executable, "-m", "pytest", + "tests/test_effects/", + "-v" + ] + + return run_command(cmd) + + +def run_stress_tests(): + """运行压力测试""" + print("\n=== Running Stress Tests ===") + + env = os.environ.copy() + env['RUN_STRESS_TESTS'] = '1' + + cmd = [ + sys.executable, "-m", "pytest", + "tests/test_integration/", + "-v", + "-m", "stress", + "--timeout=600" + ] + + return subprocess.run(cmd, env=env).returncode == 0 + + +def create_test_video(): + """创建测试视频文件""" + print("\n=== Creating Test Video Files ===") + + if not run_command(["ffmpeg", "-version"]): + print("FFmpeg not available, cannot create test videos") + return False + + test_data_dir = Path("tests/test_data/videos") + test_data_dir.mkdir(parents=True, exist_ok=True) + + # 创建短视频文件 + video_path = test_data_dir / "sample.mp4" + cmd = [ + "ffmpeg", "-y", + "-f", "lavfi", + "-i", "testsrc=duration=5:size=640x480:rate=25", + "-c:v", "libx264", + "-preset", "ultrafast", + "-crf", "23", + str(video_path) + ] + + if run_command(cmd): + print(f"Created test video: {video_path}") + return True + else: + print("Failed to create test video") + return False + + +def main(): + parser = argparse.ArgumentParser(description="RenderWorker Test Runner") + parser.add_argument("test_type", choices=[ + "unit", "integration", "all", "effects", "stress", "setup" + ], help="Type of tests to run") + + parser.add_argument("--effect", help="Specific effect to test (for effects command)") + parser.add_argument("--coverage", action="store_true", help="Generate coverage report") + parser.add_argument("--xml-report", action="store_true", help="Generate XML test report") + parser.add_argument("--html-report", action="store_true", help="Generate HTML test report") + parser.add_argument("--fail-under", type=int, default=70, help="Minimum coverage percentage") + parser.add_argument("--no-deps-check", action="store_true", help="Skip dependency check") + + args = parser.parse_args() + + # 检查依赖 + if not args.no_deps_check and not check_dependencies(): + sys.exit(1) + + # 运行相应的测试 + success = True + + if args.test_type == "unit": + success = run_unit_tests(args) + elif args.test_type == "integration": + success = run_integration_tests(args) + elif args.test_type == "all": + success = run_all_tests(args) + elif args.test_type == "effects": + success = run_effect_tests(args.effect) + elif args.test_type == "stress": + success = run_stress_tests() + elif args.test_type == "setup": + success = create_test_video() + + if success: + print("\n✅ Tests completed successfully!") + if args.coverage: + print("📊 Coverage report generated in htmlcov/") + sys.exit(0) + else: + print("\n❌ Tests failed!") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..1badfc3 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,313 @@ +# RenderWorker 测试文档 + +本目录包含 RenderWorker 项目的完整测试套件,用于验证特效参数生成和 FFmpeg 渲染功能。 + +## 目录结构 + +``` +tests/ +├── conftest.py # pytest 配置和公共 fixtures +├── test_data/ # 测试数据 +│ ├── videos/ # 测试视频文件 +│ ├── templates/ # 测试模板数据 +│ └── expected_outputs/ # 预期输出结果 +├── test_effects/ # 特效单元测试 +│ ├── test_base.py # 基础类测试 +│ ├── test_zoom_effect.py # 缩放特效测试 +│ ├── test_speed_effect.py # 变速特效测试 +│ └── ... # 其他特效测试 +├── test_ffmpeg_builder/ # FFmpeg 命令构建测试 +│ └── test_ffmpeg_command_builder.py +├── test_integration/ # 集成测试 +│ └── test_ffmpeg_execution.py # FFmpeg 执行测试 +└── utils/ # 测试工具 + └── test_helpers.py # 测试辅助函数 +``` + +## 快速开始 + +### 1. 安装测试依赖 + +```bash +pip install -r requirements-test.txt +``` + +### 2. 运行测试 + +使用测试运行器脚本: + +```bash +# 运行所有测试 +python run_tests.py all --coverage + +# 只运行单元测试 +python run_tests.py unit --coverage --html-report + +# 只运行集成测试 +python run_tests.py integration + +# 运行特定特效测试 +python run_tests.py effects --effect zoom + +# 运行压力测试 +python run_tests.py stress +``` + +或直接使用 pytest: + +```bash +# 运行所有测试 +pytest tests/ -v --cov=entity --cov=services + +# 只运行单元测试 +pytest tests/test_effects/ tests/test_ffmpeg_builder/ -v + +# 只运行集成测试 +pytest tests/test_integration/ -v -m integration + +# 生成覆盖率报告 +pytest tests/ --cov=entity --cov=services --cov-report=html +``` + +## 测试类型说明 + +### 单元测试 (Unit Tests) + +位置:`tests/test_effects/`, `tests/test_ffmpeg_builder/` + +测试内容: +- 各个特效处理器的参数验证 +- FFmpeg 滤镜参数生成 +- 特效注册表功能 +- FFmpeg 命令构建逻辑 + +特点: +- 运行速度快 +- 不依赖外部工具 +- 测试覆盖率高 + +### 集成测试 (Integration Tests) + +位置:`tests/test_integration/` + +测试内容: +- 实际 FFmpeg 命令执行 +- 视频文件处理验证 +- 特效组合效果测试 +- 错误处理验证 + +依赖: +- 需要系统安装 FFmpeg +- 需要测试视频文件 + +### 压力测试 (Stress Tests) + +测试内容: +- 大量特效链处理 +- 长时间运行稳定性 +- 资源使用情况 + +运行条件: +- 设置环境变量 `RUN_STRESS_TESTS=1` +- 较长的超时时间 + +## 测试配置 + +### pytest 配置 + +在 `pytest.ini` 中配置: +- 测试路径和文件模式 +- 覆盖率设置 +- 标记定义 +- 输出格式 + +### 环境变量 + +- `RUN_STRESS_TESTS`: 启用压力测试 +- `FFMPEG_PATH`: 自定义 FFmpeg 路径 + +## 特效测试详解 + +### 缩放特效 (ZoomEffect) + +测试用例: +- 有效参数验证:`"0,2.0,3.0"`(开始时间,缩放因子,持续时间) +- 无效参数处理:负值、非数字、参数不足 +- 静态缩放:持续时间为 0 +- 动态缩放:指定时间段内的缩放 +- 位置 JSON 解析:自定义缩放中心点 + +生成滤镜格式: +``` +# 静态缩放 +[0:v]trim=start=0,zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1] + +# 动态缩放 +[0:v]zoompan=z=if(between(t\,0\,3.0)\,2.0\,1):x=iw/2:y=ih/2:d=1[v_eff1] +``` + +### 变速特效 (SpeedEffect) + +测试用例: +- 加速效果:`"2.0"`(2倍速) +- 减速效果:`"0.5"`(0.5倍速) +- 无效参数:零值、负值、非数字 +- 默认处理:空参数或 1.0 倍速 + +生成滤镜格式: +``` +[0:v]setpts=2.0*PTS[v_eff1] +``` + +## FFmpeg 命令构建测试 + +### 测试场景 + +1. **单文件复制**:直接复制无需编码 +2. **多文件拼接**:使用 concat 滤镜 +3. **特效处理**:复杂滤镜链构建 +4. **错误处理**:缺失文件、无效参数 + +### 命令验证 + +使用 `FFmpegValidator` 验证: +- 命令结构完整性 +- 输入输出文件存在 +- 滤镜语法正确性 +- 流标识符格式 + +## 集成测试详解 + +### 真实 FFmpeg 执行 + +测试流程: +1. 创建测试视频文件 +2. 构建 FFmpeg 命令 +3. 执行命令并检查返回码 +4. 验证输出文件存在且有效 + +### 测试用例 + +- 简单复制操作 +- 单特效处理 +- 多特效组合 +- 视频拼接 +- 错误情况处理 + +### 性能测试 + +监控指标: +- 执行时间 +- 内存使用 +- 文件大小 +- 处理速度 + +## 代码覆盖率 + +目标覆盖率:≥ 70% + +覆盖范围: +- `entity/` 目录下的所有模块 +- `services/` 目录下的所有模块 + +报告格式: +- XML 格式:用于 Jenkins CI/CD +- HTML 格式:用于本地查看 +- 终端输出:实时查看缺失覆盖 + +## CI/CD 集成 + +### Jenkins Pipeline + +配置文件:`Jenkinsfile` + +阶段: +1. 环境准备 +2. 代码质量检查 +3. 单元测试 +4. 集成测试 +5. 完整测试套件 +6. 性能测试 + +报告生成: +- JUnit XML 测试报告 +- Cobertura 覆盖率报告 +- HTML 测试和覆盖率报告 + +### 自动化触发 + +- 代码提交时运行单元测试 +- PR 创建时运行完整测试 +- 主分支更新时运行包含性能测试的完整套件 + +## 故障排除 + +### 常见问题 + +1. **FFmpeg 未找到** + ```bash + # 安装 FFmpeg + sudo apt-get install ffmpeg # Ubuntu/Debian + brew install ffmpeg # macOS + ``` + +2. **测试视频创建失败** + ```bash + # 手动创建测试视频 + python run_tests.py setup + ``` + +3. **覆盖率过低** + - 检查是否有未测试的代码路径 + - 添加边界条件测试 + - 验证测试是否实际运行 + +4. **集成测试超时** + - 增加超时时间:`--timeout=600` + - 使用更小的测试文件 + - 检查系统资源使用情况 + +### 调试技巧 + +1. **查看详细输出**: + ```bash + pytest tests/ -v -s + ``` + +2. **只运行失败的测试**: + ```bash + pytest tests/ --lf + ``` + +3. **停在第一个失败**: + ```bash + pytest tests/ -x + ``` + +4. **查看覆盖率详情**: + ```bash + pytest tests/ --cov=entity --cov-report=term-missing + ``` + +## 贡献指南 + +### 添加新测试 + +1. 在相应目录创建测试文件 +2. 使用描述性的测试函数名 +3. 添加适当的测试标记 +4. 更新文档说明 + +### 测试最佳实践 + +1. **独立性**:每个测试应该独立运行 +2. **可重复性**:测试结果应该一致 +3. **清晰性**:测试意图应该明确 +4. **完整性**:覆盖正常和异常情况 + +### 代码质量 + +- 遵循 PEP 8 代码风格 +- 使用类型注解 +- 添加适当的文档字符串 +- 使用有意义的变量名 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8cacdca --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,90 @@ +"""pytest配置文件""" +import pytest +import os +import tempfile +import shutil +from pathlib import Path +from typing import Dict, Any + +from entity.render_task import RenderTask +from config.settings import Config + + +@pytest.fixture +def temp_dir(): + """创建临时目录""" + temp_path = tempfile.mkdtemp() + yield temp_path + shutil.rmtree(temp_path, ignore_errors=True) + + +@pytest.fixture +def test_config(): + """测试用配置""" + return Config( + encoder_args="-c:v h264", + video_args="", + template_dir="tests/test_data/templates", + api_endpoint="http://test.local", + access_key="test_key" + ) + + +@pytest.fixture +def sample_render_task(): + """示例渲染任务""" + return RenderTask( + task_id="test_task_001", + template_id="test_template", + output_path="test_output.mp4", + frame_rate=25, + effects=["zoom:0,2.0,3.0", "ospeed:1.5"], + ext_data={ + "posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}' + } + ) + + +@pytest.fixture +def sample_video_file(temp_dir): + """创建测试用的样本视频文件路径""" + video_path = os.path.join(temp_dir, "sample.mp4") + # 这里只返回路径,实际测试中可能需要真实视频文件 + return video_path + + +@pytest.fixture(scope="session") +def ffmpeg_available(): + """检查FFmpeg是否可用""" + import subprocess + try: + subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + return False + + +@pytest.fixture +def mock_ext_data(): + """模拟扩展数据""" + return { + "posJson": '{"ltX": 50, "ltY": 50, "rbX": 150, "rbY": 150, "imgWidth": 200, "imgHeight": 200}', + "templateData": {"width": 1920, "height": 1080} + } + + +# 标记:只在FFmpeg可用时运行集成测试 +def pytest_collection_modifyitems(config, items): + """根据FFmpeg可用性修改测试收集""" + try: + import subprocess + subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True) + ffmpeg_available = True + except (subprocess.CalledProcessError, FileNotFoundError): + ffmpeg_available = False + + if not ffmpeg_available: + skip_ffmpeg = pytest.mark.skip(reason="FFmpeg not available") + for item in items: + if "integration" in item.keywords: + item.add_marker(skip_ffmpeg) \ No newline at end of file diff --git a/tests/test_data/README.md b/tests/test_data/README.md new file mode 100644 index 0000000..417d934 --- /dev/null +++ b/tests/test_data/README.md @@ -0,0 +1,15 @@ +# 测试数据说明 + +这个目录包含测试所需的样本数据: + +## 目录结构 +- `videos/` - 测试用视频文件 +- `templates/` - 测试用模板数据 +- `expected_outputs/` - 预期输出结果 + +## 使用说明 +- 小尺寸视频文件用于快速测试 +- 模板文件包含各种特效配置 +- 预期输出用于验证测试结果 + +注意:实际的视频文件可能较大,建议使用小尺寸测试文件。 \ No newline at end of file diff --git a/tests/test_effects/test_base.py b/tests/test_effects/test_base.py new file mode 100644 index 0000000..5e2581f --- /dev/null +++ b/tests/test_effects/test_base.py @@ -0,0 +1,188 @@ +"""测试特效基础类和注册表""" +import pytest +from unittest.mock import Mock + +from entity.effects.base import EffectProcessor, EffectRegistry + + +class TestEffectProcessor: + """测试特效处理器基类""" + + def test_init(self): + """测试初始化""" + processor = Mock(spec=EffectProcessor) + processor.__init__("test_params", {"key": "value"}) + + assert processor.params == "test_params" + assert processor.ext_data == {"key": "value"} + assert processor.frame_rate == 25 + + def test_init_default_ext_data(self): + """测试默认扩展数据""" + processor = Mock(spec=EffectProcessor) + processor.__init__("test_params") + + assert processor.ext_data == {} + + def test_parse_params_empty(self): + """测试解析空参数""" + processor = EffectProcessor("", {}) + result = processor.parse_params() + assert result == [] + + def test_parse_params_single(self): + """测试解析单个参数""" + processor = EffectProcessor("123", {}) + result = processor.parse_params() + assert result == ["123"] + + def test_parse_params_multiple(self): + """测试解析多个参数""" + processor = EffectProcessor("1,2.5,test", {}) + result = processor.parse_params() + assert result == ["1", "2.5", "test"] + + def test_get_pos_json_empty(self): + """测试获取空位置JSON""" + processor = EffectProcessor("", {}) + result = processor.get_pos_json() + assert result == {} + + def test_get_pos_json_valid(self): + """测试获取有效位置JSON""" + ext_data = { + "posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}' + } + processor = EffectProcessor("", ext_data) + result = processor.get_pos_json() + + expected = {"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400} + assert result == expected + + def test_get_pos_json_invalid(self): + """测试获取无效位置JSON""" + ext_data = {"posJson": "invalid_json"} + processor = EffectProcessor("", ext_data) + result = processor.get_pos_json() + assert result == {} + + def test_get_pos_json_default_value(self): + """测试默认位置JSON值""" + ext_data = {"posJson": "{}"} + processor = EffectProcessor("", ext_data) + result = processor.get_pos_json() + assert result == {} + + +class TestEffectRegistry: + """测试特效注册表""" + + def test_init(self): + """测试初始化""" + registry = EffectRegistry() + assert registry._processors == {} + + def test_register_valid_processor(self): + """测试注册有效处理器""" + registry = EffectRegistry() + + class TestEffect(EffectProcessor): + def validate_params(self): + return True + + def generate_filter_args(self, video_input, effect_index): + return [], video_input + + def get_effect_name(self): + return "test" + + registry.register("test_effect", TestEffect) + assert "test_effect" in registry._processors + assert registry._processors["test_effect"] == TestEffect + + def test_register_invalid_processor(self): + """测试注册无效处理器""" + registry = EffectRegistry() + + class InvalidEffect: + pass + + with pytest.raises(ValueError, match="must be a subclass of EffectProcessor"): + registry.register("invalid", InvalidEffect) + + def test_get_processor_exists(self): + """测试获取存在的处理器""" + registry = EffectRegistry() + + class TestEffect(EffectProcessor): + def validate_params(self): + return True + + def generate_filter_args(self, video_input, effect_index): + return [], video_input + + def get_effect_name(self): + return "test" + + registry.register("test_effect", TestEffect) + processor = registry.get_processor("test_effect", "params", {"data": "value"}) + + assert isinstance(processor, TestEffect) + assert processor.params == "params" + assert processor.ext_data == {"data": "value"} + + def test_get_processor_not_exists(self): + """测试获取不存在的处理器""" + registry = EffectRegistry() + processor = registry.get_processor("nonexistent") + assert processor is None + + def test_list_effects_empty(self): + """测试列出空特效""" + registry = EffectRegistry() + effects = registry.list_effects() + assert effects == [] + + def test_list_effects_with_processors(self): + """测试列出已注册特效""" + registry = EffectRegistry() + + class TestEffect(EffectProcessor): + def validate_params(self): + return True + + def generate_filter_args(self, video_input, effect_index): + return [], video_input + + def get_effect_name(self): + return "test" + + registry.register("effect1", TestEffect) + registry.register("effect2", TestEffect) + + effects = registry.list_effects() + assert set(effects) == {"effect1", "effect2"} + + def test_parse_effect_string_with_params(self): + """测试解析带参数的特效字符串""" + registry = EffectRegistry() + name, params = registry.parse_effect_string("zoom:0,2.0,3.0") + + assert name == "zoom" + assert params == "0,2.0,3.0" + + def test_parse_effect_string_without_params(self): + """测试解析无参数的特效字符串""" + registry = EffectRegistry() + name, params = registry.parse_effect_string("zoom") + + assert name == "zoom" + assert params == "" + + def test_parse_effect_string_multiple_colons(self): + """测试解析多个冒号的特效字符串""" + registry = EffectRegistry() + name, params = registry.parse_effect_string("effect:param1:param2") + + assert name == "effect" + assert params == "param1:param2" \ No newline at end of file diff --git a/tests/test_effects/test_speed_effect.py b/tests/test_effects/test_speed_effect.py new file mode 100644 index 0000000..5cf70f7 --- /dev/null +++ b/tests/test_effects/test_speed_effect.py @@ -0,0 +1,169 @@ +"""测试变速特效""" +import pytest +from entity.effects.speed import SpeedEffect +from tests.utils.test_helpers import EffectTestHelper + + +class TestSpeedEffect: + """测试变速特效处理器""" + + def test_validate_params_valid_cases(self): + """测试有效参数验证""" + test_cases = [ + { + "params": "2.0", + "expected": True, + "description": "2倍速" + }, + { + "params": "0.5", + "expected": True, + "description": "0.5倍速(慢速)" + }, + { + "params": "1.0", + "expected": True, + "description": "正常速度" + }, + { + "params": "10.0", + "expected": True, + "description": "10倍速" + }, + { + "params": "", + "expected": True, + "description": "空参数(默认不变速)" + } + ] + + effect = SpeedEffect() + results = EffectTestHelper.test_effect_params_validation(effect, test_cases) + + for result in results: + assert result["passed"], f"Failed case: {result['description']} - {result}" + + def test_validate_params_invalid_cases(self): + """测试无效参数验证""" + test_cases = [ + { + "params": "0", + "expected": False, + "description": "零速度" + }, + { + "params": "-1.0", + "expected": False, + "description": "负速度" + }, + { + "params": "abc", + "expected": False, + "description": "非数字参数" + }, + { + "params": "1.0,2.0", + "expected": False, + "description": "多余参数" + } + ] + + effect = SpeedEffect() + results = EffectTestHelper.test_effect_params_validation(effect, test_cases) + + for result in results: + assert result["passed"], f"Failed case: {result['description']} - {result}" + + def test_generate_filter_args_speed_change(self): + """测试变速滤镜生成""" + effect = SpeedEffect("2.0") # 2倍速 + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"], f"Filter generation failed: {result.get('error')}" + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert result["output_stream"] == "[v_eff1]" + + # 检查滤镜内容 + filter_str = result["filters"][0] + assert "setpts=2.0*PTS" in filter_str + assert "[0:v]" in filter_str + assert "[v_eff1]" in filter_str + + def test_generate_filter_args_slow_motion(self): + """测试慢动作滤镜生成""" + effect = SpeedEffect("0.5") # 0.5倍速(慢动作) + result = EffectTestHelper.test_filter_generation(effect, "[input]", 3) + + assert result["success"] + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert result["output_stream"] == "[v_eff3]" + + filter_str = result["filters"][0] + assert "setpts=0.5*PTS" in filter_str + assert "[input]" in filter_str + assert "[v_eff3]" in filter_str + + def test_generate_filter_args_no_change(self): + """测试无变速效果""" + test_cases = [ + {"params": "", "description": "空参数"}, + {"params": "1", "description": "1倍速"}, + {"params": "1.0", "description": "1.0倍速"} + ] + + for case in test_cases: + effect = SpeedEffect(case["params"]) + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"], f"Failed for {case['description']}" + assert result["filter_count"] == 0, f"Should not generate filter for {case['description']}" + assert result["output_stream"] == "[0:v]", f"Output should equal input for {case['description']}" + + def test_generate_filter_args_invalid_params(self): + """测试无效参数的滤镜生成""" + effect = SpeedEffect("invalid") + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"] + assert result["filter_count"] == 0 + assert result["output_stream"] == "[0:v]" + + def test_get_effect_name(self): + """测试获取特效名称""" + effect = SpeedEffect() + assert effect.get_effect_name() == "ospeed" + + def test_various_speed_factors(self): + """测试各种速度因子""" + speed_factors = ["0.1", "0.25", "0.75", "1.5", "3.0", "5.0"] + + for speed in speed_factors: + effect = SpeedEffect(speed) + result = EffectTestHelper.test_filter_generation(effect, "[test]", 10) + + if speed == "1.0": + # 1倍速不应该生成滤镜 + assert result["filter_count"] == 0 + assert result["output_stream"] == "[test]" + else: + # 其他速度应该生成滤镜 + assert result["success"], f"Failed for speed {speed}" + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert f"setpts={speed}*PTS" in result["filters"][0] + + def test_effect_chaining(self): + """测试特效链式处理""" + # 模拟在特效链中的使用 + effect = SpeedEffect("2.0") + + # 第一个特效 + result1 = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + assert result1["output_stream"] == "[v_eff1]" + + # 作为链中的第二个特效 + result2 = EffectTestHelper.test_filter_generation(effect, "[v_eff1]", 2) + assert result2["output_stream"] == "[v_eff2]" + assert "[v_eff1]" in result2["filters"][0] \ No newline at end of file diff --git a/tests/test_effects/test_zoom_effect.py b/tests/test_effects/test_zoom_effect.py new file mode 100644 index 0000000..bd448ab --- /dev/null +++ b/tests/test_effects/test_zoom_effect.py @@ -0,0 +1,194 @@ +"""测试缩放特效""" +import pytest +from entity.effects.zoom import ZoomEffect +from tests.utils.test_helpers import EffectTestHelper, FFmpegValidator + + +class TestZoomEffect: + """测试缩放特效处理器""" + + def test_validate_params_valid_cases(self): + """测试有效参数验证""" + test_cases = [ + { + "params": "0,2.0,3.0", + "expected": True, + "description": "标准缩放参数" + }, + { + "params": "1.5,1.5,0", + "expected": True, + "description": "静态缩放(duration=0)" + }, + { + "params": "0,1.0,5.0", + "expected": True, + "description": "无缩放效果(factor=1.0)" + }, + { + "params": "10,0.5,2.0", + "expected": True, + "description": "缩小效果" + } + ] + + effect = ZoomEffect() + results = EffectTestHelper.test_effect_params_validation(effect, test_cases) + + for result in results: + assert result["passed"], f"Failed case: {result['description']} - {result}" + + def test_validate_params_invalid_cases(self): + """测试无效参数验证""" + test_cases = [ + { + "params": "", + "expected": False, + "description": "空参数" + }, + { + "params": "1,2", + "expected": False, + "description": "参数不足" + }, + { + "params": "-1,2.0,3.0", + "expected": False, + "description": "负开始时间" + }, + { + "params": "0,0,3.0", + "expected": False, + "description": "零缩放因子" + }, + { + "params": "0,-2.0,3.0", + "expected": False, + "description": "负缩放因子" + }, + { + "params": "0,2.0,-1.0", + "expected": False, + "description": "负持续时间" + }, + { + "params": "abc,2.0,3.0", + "expected": False, + "description": "非数字参数" + } + ] + + effect = ZoomEffect() + results = EffectTestHelper.test_effect_params_validation(effect, test_cases) + + for result in results: + assert result["passed"], f"Failed case: {result['description']} - {result}" + + def test_generate_filter_args_static_zoom(self): + """测试静态缩放滤镜生成""" + effect = ZoomEffect("0,2.0,0") # duration=0表示静态缩放 + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"], f"Filter generation failed: {result.get('error')}" + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert result["output_stream"] == "[v_eff1]" + + # 检查滤镜内容 + filter_str = result["filters"][0] + assert "trim=start=0" in filter_str + assert "zoompan=z=2.0" in filter_str + assert "[v_eff1]" in filter_str + + def test_generate_filter_args_dynamic_zoom(self): + """测试动态缩放滤镜生成""" + effect = ZoomEffect("1.0,1.5,2.0") # 从1秒开始,持续2秒的1.5倍缩放 + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 2) + + assert result["success"], f"Filter generation failed: {result.get('error')}" + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert result["output_stream"] == "[v_eff2]" + + # 检查滤镜内容 + filter_str = result["filters"][0] + assert "zoompan=z=" in filter_str + assert "between(t\\\\,1.0\\\\,3.0)" in filter_str # 检查时间范围 + assert "[v_eff2]" in filter_str + + def test_generate_filter_args_no_zoom(self): + """测试无缩放效果(factor=1.0)""" + effect = ZoomEffect("0,1.0,3.0") # 缩放因子为1.0,应该不生成滤镜 + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"] + assert result["filter_count"] == 0 + assert result["output_stream"] == "[0:v]" # 输出应该等于输入 + + def test_generate_filter_args_invalid_params(self): + """测试无效参数的滤镜生成""" + effect = ZoomEffect("invalid,params") + result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1) + + assert result["success"] + assert result["filter_count"] == 0 + assert result["output_stream"] == "[0:v]" # 无效参数时应该返回原输入 + + def test_get_zoom_center_default(self): + """测试默认缩放中心点""" + effect = ZoomEffect("0,2.0,3.0") + center_x, center_y = effect._get_zoom_center() + + assert center_x == "iw/2" + assert center_y == "ih/2" + + def test_get_zoom_center_with_pos_json(self): + """测试基于posJson的缩放中心点""" + ext_data = { + "posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 400, "imgHeight": 300}' + } + effect = ZoomEffect("0,2.0,3.0", ext_data) + center_x, center_y = effect._get_zoom_center() + + # 中心点应该是矩形的中心 + # center_x_ratio = (100 + 200) / (2 * 400) = 0.375 + # center_y_ratio = (100 + 200) / (2 * 300) = 0.5 + assert "iw*0.375" in center_x + assert "ih*0.5" in center_y + + def test_get_zoom_center_invalid_pos_json(self): + """测试无效posJson时的缩放中心点""" + ext_data = { + "posJson": '{"imgWidth": 0, "imgHeight": 0}' # 无效尺寸 + } + effect = ZoomEffect("0,2.0,3.0", ext_data) + center_x, center_y = effect._get_zoom_center() + + # 应该回退到默认中心点 + assert center_x == "iw/2" + assert center_y == "ih/2" + + def test_get_effect_name(self): + """测试获取特效名称""" + effect = ZoomEffect() + assert effect.get_effect_name() == "zoom" + + def test_complex_zoom_scenario(self): + """测试复杂缩放场景""" + # 带有复杂posJson数据的动态缩放 + ext_data = { + "posJson": '{"ltX": 50, "ltY": 75, "rbX": 350, "rbY": 225, "imgWidth": 400, "imgHeight": 300}' + } + effect = ZoomEffect("2.5,3.0,1.5", ext_data) + result = EffectTestHelper.test_filter_generation(effect, "[input]", 5) + + assert result["success"] + assert result["filter_count"] == 1 + assert result["valid_syntax"] + assert result["output_stream"] == "[v_eff5]" + + filter_str = result["filters"][0] + assert "zoompan=z=" in filter_str + assert "between(t\\\\,2.5\\\\,4.0)" in filter_str # 2.5 + 1.5 = 4.0 + assert "[input]" in filter_str + assert "[v_eff5]" in filter_str \ No newline at end of file diff --git a/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py b/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py new file mode 100644 index 0000000..820dd75 --- /dev/null +++ b/tests/test_ffmpeg_builder/test_ffmpeg_command_builder.py @@ -0,0 +1,244 @@ +"""测试FFmpeg命令构建器""" +import pytest +from unittest.mock import Mock, patch + +from entity.ffmpeg_command_builder import FFmpegCommandBuilder +from entity.render_task import RenderTask +from config.settings import Config +from tests.utils.test_helpers import MockRenderTask, FFmpegValidator + + +class TestFFmpegCommandBuilder: + """测试FFmpeg命令构建器""" + + @pytest.fixture + def mock_config(self): + """模拟配置""" + return Config( + encoder_args="-c:v h264", + video_args="-preset fast", + template_dir="tests/test_data/templates", + api_endpoint="http://test.local", + access_key="test_key" + ) + + @pytest.fixture + def simple_task(self): + """简单的渲染任务""" + task = MockRenderTask() + task.input_files = ["input1.mp4", "input2.mp4"] + task.output_path = "output.mp4" + task.effects = [] + return task + + @pytest.fixture + def task_with_effects(self): + """带特效的渲染任务""" + task = MockRenderTask() + task.input_files = ["input.mp4"] + task.output_path = "output.mp4" + task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"] + task.ext_data = { + "posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}' + } + return task + + def test_init(self, simple_task, mock_config): + """测试初始化""" + builder = FFmpegCommandBuilder(simple_task, mock_config) + assert builder.task == simple_task + assert builder.config == mock_config + + def test_build_copy_command(self, simple_task, mock_config): + """测试构建复制命令""" + builder = FFmpegCommandBuilder(simple_task, mock_config) + command = builder._build_copy_command() + + # 验证命令结构 + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"], f"Invalid command: {validation['errors']}" + assert validation["has_input"] + assert validation["has_output"] + + # 验证具体内容 + assert command[0] == "ffmpeg" + assert "-i" in command + assert "input1.mp4" in command + assert "output.mp4" in command + assert "-c" in command and "copy" in command + + def test_build_concat_command_multiple_files(self, mock_config): + """测试构建多文件拼接命令""" + task = MockRenderTask() + task.input_files = ["file1.mp4", "file2.mp4", "file3.mp4"] + task.output_path = "concat_output.mp4" + + builder = FFmpegCommandBuilder(task, mock_config) + command = builder._build_concat_command() + + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"] + assert validation["has_filter"] + + # 验证拼接相关参数 + assert "concat=n=3:v=1:a=1" in " ".join(command) + assert all(f"file{i}.mp4" in command for i in range(1, 4)) + + def test_build_encode_command_with_effects(self, task_with_effects, mock_config): + """测试构建带特效的编码命令""" + builder = FFmpegCommandBuilder(task_with_effects, mock_config) + command = builder._build_encode_command() + + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"] + assert validation["has_filter"] + + command_str = " ".join(command) + # 应该包含特效相关的滤镜 + assert "-filter_complex" in command + # 应该包含编码参数 + assert "-c:v" in command and "h264" in command + + def test_add_effects_single_effect(self, mock_config): + """测试添加单个特效""" + task = MockRenderTask() + task.effects = ["zoom:0,2.0,3.0"] + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, mock_config) + filter_args = [] + + result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) + + # 验证结果 + assert len(filter_args) > 0 # 应该有滤镜被添加 + assert result_input == "[v_eff1]" # 输出流应该更新 + assert result_index == 2 # 索引应该递增 + + def test_add_effects_multiple_effects(self, mock_config): + """测试添加多个特效""" + task = MockRenderTask() + task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"] + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, mock_config) + filter_args = [] + + result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) + + # 验证特效链 + assert len(filter_args) >= 2 # 应该有两个特效的滤镜 + assert result_input == "[v_eff2]" # 最终输出流 + assert result_index == 3 # 索引应该递增两次 + + def test_add_effects_invalid_effect(self, mock_config): + """测试添加无效特效""" + task = MockRenderTask() + task.effects = ["invalid_effect:params"] + + builder = FFmpegCommandBuilder(task, mock_config) + filter_args = [] + + result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) + + # 无效特效应该被忽略 + assert result_input == "[0:v]" # 输入流不变 + assert result_index == 1 # 索引不变 + + def test_add_effects_no_effects(self, mock_config): + """测试无特效情况""" + task = MockRenderTask() + task.effects = [] + + builder = FFmpegCommandBuilder(task, mock_config) + filter_args = [] + + result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1) + + assert result_input == "[0:v]" # 输入流不变 + assert result_index == 1 # 索引不变 + assert len(filter_args) == 0 # 无滤镜添加 + + def test_build_command_copy_mode(self, simple_task, mock_config): + """测试构建复制模式命令""" + simple_task.input_files = ["single_file.mp4"] + + builder = FFmpegCommandBuilder(simple_task, mock_config) + command = builder.build_command() + + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"] + + # 应该是复制模式 + command_str = " ".join(command) + assert "-c copy" in command_str + + def test_build_command_concat_mode(self, mock_config): + """测试构建拼接模式命令""" + task = MockRenderTask() + task.input_files = ["file1.mp4", "file2.mp4"] + task.effects = [] + + builder = FFmpegCommandBuilder(task, mock_config) + command = builder.build_command() + + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"] + assert validation["has_filter"] + + # 应该包含拼接滤镜 + command_str = " ".join(command) + assert "concat=" in command_str + + def test_build_command_encode_mode(self, task_with_effects, mock_config): + """测试构建编码模式命令""" + builder = FFmpegCommandBuilder(task_with_effects, mock_config) + command = builder.build_command() + + validation = FFmpegValidator.validate_ffmpeg_command(command) + assert validation["valid"] + assert validation["has_filter"] + + # 应该包含编码参数和特效滤镜 + command_str = " ".join(command) + assert "-c:v h264" in command_str + + @patch('entity.effects.registry.get_processor') + def test_effect_processor_integration(self, mock_get_processor, mock_config): + """测试与特效处理器的集成""" + # 模拟特效处理器 + mock_processor = Mock() + mock_processor.frame_rate = 25 + mock_processor.generate_filter_args.return_value = ( + ["[0:v]zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1]"], + "[v_eff1]" + ) + mock_get_processor.return_value = mock_processor + + task = MockRenderTask() + task.effects = ["zoom:0,2.0,3.0"] + task.frame_rate = 30 + + builder = FFmpegCommandBuilder(task, mock_config) + filter_args = [] + + builder._add_effects(filter_args, "[0:v]", 1) + + # 验证处理器被正确调用 + mock_processor.generate_filter_args.assert_called_once_with("[0:v]", 1) + assert mock_processor.frame_rate == 30 # 帧率应该被设置 + + def test_error_handling_missing_input(self, mock_config): + """测试缺少输入文件的错误处理""" + task = MockRenderTask() + task.input_files = [] + + builder = FFmpegCommandBuilder(task, mock_config) + + # 构建命令时应该处理错误情况 + # 具体的错误处理依赖于实现 + command = builder.build_command() + + # 验证至少返回了基本的ffmpeg命令结构 + assert isinstance(command, list) + assert len(command) > 0 \ No newline at end of file diff --git a/tests/test_integration/test_ffmpeg_execution.py b/tests/test_integration/test_ffmpeg_execution.py new file mode 100644 index 0000000..b55ba0c --- /dev/null +++ b/tests/test_integration/test_ffmpeg_execution.py @@ -0,0 +1,282 @@ +"""FFmpeg执行集成测试""" +import pytest +import subprocess +import tempfile +import os +from pathlib import Path + +from entity.ffmpeg_command_builder import FFmpegCommandBuilder +from entity.render_task import RenderTask +from config.settings import Config +from services.render_service import DefaultRenderService +from tests.utils.test_helpers import MockRenderTask, create_test_video_file + + +@pytest.mark.integration +class TestFFmpegExecution: + """FFmpeg执行集成测试""" + + @pytest.fixture + def test_config(self, temp_dir): + """测试配置""" + return Config( + encoder_args="-c:v libx264", + video_args="-preset ultrafast -crf 23", + template_dir=temp_dir, + api_endpoint="http://test.local", + access_key="test_key" + ) + + @pytest.fixture + def sample_video(self, temp_dir): + """创建测试视频文件""" + video_path = os.path.join(temp_dir, "test_input.mp4") + success = create_test_video_file(video_path, duration=3, resolution="320x240") + if not success: + pytest.skip("Cannot create test video file") + return video_path + + @pytest.fixture + def render_service(self, test_config): + """渲染服务实例""" + return DefaultRenderService() + + def test_simple_copy_execution(self, sample_video, temp_dir, test_config): + """测试简单复制执行""" + output_path = os.path.join(temp_dir, "copy_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + task.effects = [] + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + # 执行命令 + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=30) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert os.path.getsize(output_path) > 0, "Output file is empty" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_zoom_effect_execution(self, sample_video, temp_dir, test_config): + """测试缩放特效执行""" + output_path = os.path.join(temp_dir, "zoom_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + task.effects = ["zoom:0,2.0,2.0"] # 2倍缩放,持续2秒 + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert os.path.getsize(output_path) > 0, "Output file is empty" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_speed_effect_execution(self, sample_video, temp_dir, test_config): + """测试变速特效执行""" + output_path = os.path.join(temp_dir, "speed_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + task.effects = ["ospeed:2.0"] # 2倍速 + task.ext_data = {} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert os.path.getsize(output_path) > 0, "Output file is empty" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_multiple_effects_execution(self, sample_video, temp_dir, test_config): + """测试多特效组合执行""" + output_path = os.path.join(temp_dir, "multi_effects_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + task.effects = ["zoom:0,1.5,1.0", "ospeed:1.5"] # 缩放+变速 + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert os.path.getsize(output_path) > 0, "Output file is empty" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_concat_execution(self, temp_dir, test_config): + """测试视频拼接执行""" + # 创建两个测试视频 + video1_path = os.path.join(temp_dir, "video1.mp4") + video2_path = os.path.join(temp_dir, "video2.mp4") + output_path = os.path.join(temp_dir, "concat_output.mp4") + + success1 = create_test_video_file(video1_path, duration=2, resolution="320x240") + success2 = create_test_video_file(video2_path, duration=2, resolution="320x240") + + if not (success1 and success2): + pytest.skip("Cannot create test video files") + + task = MockRenderTask() + task.input_files = [video1_path, video2_path] + task.output_path = output_path + task.effects = [] + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert os.path.getsize(output_path) > 0, "Output file is empty" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_invalid_effect_execution(self, sample_video, temp_dir, test_config): + """测试无效特效的执行处理""" + output_path = os.path.join(temp_dir, "invalid_effect_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + task.effects = ["invalid_effect:params", "zoom:0,2.0,1.0"] # 混合有效和无效特效 + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + # 应该忽略无效特效,继续处理有效特效 + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + def test_render_service_integration(self, sample_video, temp_dir, test_config, render_service): + """测试渲染服务集成""" + output_path = os.path.join(temp_dir, "service_output.mp4") + + # 创建真实的RenderTask(不是Mock) + task_data = { + "task_id": "integration_test", + "template_id": "test_template", + "input_files": [sample_video], + "output_path": output_path, + "effects": ["zoom:0,1.8,2.0"], + "ext_data": {"posJson": "{}"}, + "frame_rate": 25 + } + + # 这里需要根据实际的RenderTask构造方法调整 + task = MockRenderTask(**task_data) + + # 使用渲染服务执行 + try: + # 这里的方法调用需要根据实际的渲染服务接口调整 + # success = render_service.render(task, test_config) + # assert success, "Render service failed" + + # 临时直接使用FFmpegCommandBuilder测试 + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + result = subprocess.run(command, capture_output=True, text=True, timeout=60) + + assert result.returncode == 0, f"Render failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + + except Exception as e: + pytest.fail(f"Render service integration failed: {e}") + + def test_error_handling_missing_input(self, temp_dir, test_config): + """测试缺失输入文件的错误处理""" + missing_file = os.path.join(temp_dir, "nonexistent.mp4") + output_path = os.path.join(temp_dir, "error_output.mp4") + + task = MockRenderTask() + task.input_files = [missing_file] + task.output_path = output_path + task.effects = [] + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + # 应该失败,因为输入文件不存在 + result = subprocess.run(command, capture_output=True, text=True, timeout=30) + assert result.returncode != 0, "FFmpeg should fail with missing input file" + + def test_performance_multiple_effects(self, sample_video, temp_dir, test_config): + """测试多特效性能""" + output_path = os.path.join(temp_dir, "performance_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + # 多个特效组合 + task.effects = [ + "zoom:0,1.5,1.0", + "ospeed:1.2", + "zoom:1.5,2.0,1.0" + ] + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + import time + start_time = time.time() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=120) + execution_time = time.time() - start_time + + assert result.returncode == 0, f"FFmpeg failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + assert execution_time < 60, f"Execution took too long: {execution_time}s" + + except subprocess.TimeoutExpired: + pytest.fail("FFmpeg execution timed out") + + @pytest.mark.skipif(not os.environ.get('RUN_STRESS_TESTS'), reason="Stress tests disabled") + def test_stress_test_large_effects_chain(self, sample_video, temp_dir, test_config): + """压力测试:大量特效链""" + output_path = os.path.join(temp_dir, "stress_output.mp4") + + task = MockRenderTask() + task.input_files = [sample_video] + task.output_path = output_path + # 创建大量特效 + task.effects = [f"zoom:{i*0.5},1.{i+5},{i*0.2+0.5}" for i in range(10)] + task.ext_data = {"posJson": "{}"} + + builder = FFmpegCommandBuilder(task, test_config) + command = builder.build_command() + + try: + result = subprocess.run(command, capture_output=True, text=True, timeout=300) + assert result.returncode == 0, f"Stress test failed: {result.stderr}" + assert os.path.exists(output_path), "Output file was not created" + except subprocess.TimeoutExpired: + pytest.fail("Stress test timed out") \ No newline at end of file diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py new file mode 100644 index 0000000..a6b4e3e --- /dev/null +++ b/tests/utils/test_helpers.py @@ -0,0 +1,202 @@ +"""测试辅助工具""" +import json +import tempfile +import subprocess +from typing import Dict, Any, List, Optional +from pathlib import Path + +from entity.render_task import RenderTask +from entity.effects.base import EffectProcessor +from config.settings import Config + + +class MockRenderTask: + """模拟渲染任务,用于测试""" + + def __init__( + self, + task_id: str = "test_task", + template_id: str = "test_template", + effects: List[str] = None, + ext_data: Dict[str, Any] = None, + frame_rate: int = 25, + output_path: str = "test_output.mp4" + ): + self.task_id = task_id + self.template_id = template_id + self.effects = effects or [] + self.ext_data = ext_data or {} + self.frame_rate = frame_rate + self.output_path = output_path + self.input_files = [] + self.overlays = [] + self.use_center_cut = False + self.use_zoom_cut = False + self.audio_file = None + + +class FFmpegValidator: + """FFmpeg命令验证器""" + + @staticmethod + def validate_filter_syntax(filter_str: str) -> bool: + """验证滤镜语法是否正确""" + try: + # 基本语法检查 + if not filter_str: + return False + + # 检查是否包含基本的滤镜结构 + if '[' in filter_str and ']' in filter_str: + return True + + # 检查常见的滤镜格式 + common_filters = ['zoompan', 'setpts', 'trim', 'scale', 'crop'] + return any(f in filter_str for f in common_filters) + + except Exception: + return False + + @staticmethod + def validate_stream_identifier(stream_id: str) -> bool: + """验证流标识符格式""" + if not stream_id: + return False + return stream_id.startswith('[') and stream_id.endswith(']') + + @staticmethod + def validate_ffmpeg_command(command: List[str]) -> Dict[str, Any]: + """验证完整的FFmpeg命令""" + result = { + "valid": False, + "has_input": False, + "has_output": False, + "has_filter": False, + "errors": [] + } + + if not command or command[0] != "ffmpeg": + result["errors"].append("Command must start with 'ffmpeg'") + return result + + # 检查输入文件 + if "-i" in command: + result["has_input"] = True + else: + result["errors"].append("No input file specified") + + # 检查输出文件 + if len(command) > 1 and not command[-1].startswith("-"): + result["has_output"] = True + else: + result["errors"].append("No output file specified") + + # 检查滤镜 + if "-filter_complex" in command or "-vf" in command: + result["has_filter"] = True + + result["valid"] = ( + result["has_input"] and + result["has_output"] and + len(result["errors"]) == 0 + ) + + return result + + +class EffectTestHelper: + """特效测试辅助类""" + + @staticmethod + def create_test_effect(effect_class, params: str = "", ext_data: Dict[str, Any] = None): + """创建测试用特效实例""" + return effect_class(params, ext_data) + + @staticmethod + def test_effect_params_validation(effect: EffectProcessor, test_cases: List[Dict[str, Any]]): + """批量测试特效参数验证""" + results = [] + for case in test_cases: + effect.params = case.get("params", "") + effect.ext_data = case.get("ext_data", {}) + + is_valid = effect.validate_params() + expected = case.get("expected", True) + + results.append({ + "params": effect.params, + "expected": expected, + "actual": is_valid, + "passed": is_valid == expected, + "description": case.get("description", "") + }) + + return results + + @staticmethod + def test_filter_generation(effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1): + """测试滤镜生成""" + try: + filters, output_stream = effect.generate_filter_args(video_input, effect_index) + + result = { + "success": True, + "filters": filters, + "output_stream": output_stream, + "filter_count": len(filters), + "valid_syntax": all(FFmpegValidator.validate_filter_syntax(f) for f in filters), + "valid_output": FFmpegValidator.validate_stream_identifier(output_stream) if output_stream != video_input else True + } + except Exception as e: + result = { + "success": False, + "error": str(e), + "filters": [], + "output_stream": "", + "filter_count": 0, + "valid_syntax": False, + "valid_output": False + } + + return result + + +def create_test_video_file(output_path: str, duration: int = 5, resolution: str = "640x480") -> bool: + """创建测试用视频文件""" + try: + cmd = [ + "ffmpeg", "-y", # 覆盖输出文件 + "-f", "lavfi", # 使用libavfilter输入 + "-i", f"testsrc=duration={duration}:size={resolution}:rate=25", + "-c:v", "libx264", + "-preset", "ultrafast", + "-crf", "23", + output_path + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + return result.returncode == 0 + + except Exception: + return False + + +def create_sample_template_data() -> Dict[str, Any]: + """创建示例模板数据""" + return { + "templateId": "test_template_001", + "name": "测试模板", + "parts": [ + { + "id": "part1", + "type": "video", + "duration": 10.0, + "effects": ["zoom:0,2.0,3.0", "ospeed:1.5"] + } + ], + "settings": { + "width": 1920, + "height": 1080, + "frameRate": 25 + } + } \ No newline at end of file