This commit is contained in:
2025-09-24 09:21:03 +08:00
parent 6d37e7c23c
commit dfb07d679f
14 changed files with 2378 additions and 1 deletions

4
.gitignore vendored
View File

@@ -31,4 +31,6 @@ target/
.venv
venv/
cython_debug/
.env
.env
.serena
.claude

335
Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,335 @@
pipeline {
agent any
environment {
// 环境变量
PYTHON_VERSION = '3.9'
VENV_NAME = 'venv'
TEST_REPORTS_DIR = 'test-reports'
COVERAGE_DIR = 'coverage-reports'
}
stages {
stage('Preparation') {
steps {
script {
// 清理工作空间
echo 'Cleaning workspace...'
deleteDir()
// 检出代码
checkout scm
// 创建报告目录
sh """
mkdir -p ${TEST_REPORTS_DIR}
mkdir -p ${COVERAGE_DIR}
"""
}
}
}
stage('Environment Setup') {
steps {
script {
echo 'Setting up Python environment...'
sh """
# 创建虚拟环境
python${PYTHON_VERSION} -m venv ${VENV_NAME}
# 激活虚拟环境并安装依赖
. ${VENV_NAME}/bin/activate
# 升级pip
pip install --upgrade pip
# 安装项目依赖
pip install -r requirements.txt
# 安装测试依赖
pip install -r requirements-test.txt
# 验证FFmpeg可用性
which ffmpeg || echo "Warning: FFmpeg not found, integration tests may be skipped"
"""
}
}
}
stage('Code Quality Check') {
parallel {
stage('Linting') {
steps {
script {
echo 'Running code linting...'
sh """
. ${VENV_NAME}/bin/activate
# 运行flake8检查
flake8 entity/ services/ --output-file=${TEST_REPORTS_DIR}/flake8-report.txt --tee || true
# 运行black格式检查
black --check --diff entity/ services/ > ${TEST_REPORTS_DIR}/black-report.txt || true
"""
}
}
post {
always {
// 发布lint报告
publishHTML([
allowMissing: false,
alwaysLinkToLastBuild: true,
keepAll: true,
reportDir: TEST_REPORTS_DIR,
reportFiles: 'flake8-report.txt,black-report.txt',
reportName: 'Code Quality Report'
])
}
}
}
stage('Type Checking') {
steps {
script {
echo 'Running type checking...'
sh """
. ${VENV_NAME}/bin/activate
# 运行mypy类型检查
mypy entity/ services/ --html-report ${TEST_REPORTS_DIR}/mypy-html --txt-report ${TEST_REPORTS_DIR}/mypy-txt || true
"""
}
}
}
}
}
stage('Unit Tests') {
steps {
script {
echo 'Running unit tests...'
sh """
. ${VENV_NAME}/bin/activate
# 运行单元测试
pytest tests/test_effects/ tests/test_ffmpeg_builder/ \\
--junitxml=${TEST_REPORTS_DIR}/unit-tests.xml \\
--html=${TEST_REPORTS_DIR}/unit-tests.html \\
--self-contained-html \\
--cov=entity \\
--cov=services \\
--cov-report=xml:${COVERAGE_DIR}/unit-coverage.xml \\
--cov-branch \\
-v \\
-m "not integration"
"""
}
}
post {
always {
// 发布单元测试结果
junit "${TEST_REPORTS_DIR}/unit-tests.xml"
// 发布HTML测试报告
publishHTML([
allowMissing: false,
alwaysLinkToLastBuild: true,
keepAll: true,
reportDir: TEST_REPORTS_DIR,
reportFiles: 'unit-tests.html',
reportName: 'Unit Tests Report'
])
}
}
}
stage('Integration Tests') {
when {
// 只在有FFmpeg的环境中运行集成测试
expression {
return sh(
script: 'which ffmpeg',
returnStatus: true
) == 0
}
}
steps {
script {
echo 'Running integration tests...'
sh """
. ${VENV_NAME}/bin/activate
# 运行集成测试
pytest tests/test_integration/ \\
--junitxml=${TEST_REPORTS_DIR}/integration-tests.xml \\
--html=${TEST_REPORTS_DIR}/integration-tests.html \\
--self-contained-html \\
--cov=entity \\
--cov=services \\
--cov-report=xml:${COVERAGE_DIR}/integration-coverage.xml \\
--cov-branch \\
--timeout=300 \\
-v \\
-m "integration"
"""
}
}
post {
always {
// 发布集成测试结果
junit "${TEST_REPORTS_DIR}/integration-tests.xml"
// 发布HTML集成测试报告
publishHTML([
allowMissing: false,
alwaysLinkToLastBuild: true,
keepAll: true,
reportDir: TEST_REPORTS_DIR,
reportFiles: 'integration-tests.html',
reportName: 'Integration Tests Report'
])
}
}
}
stage('Complete Test Suite') {
steps {
script {
echo 'Running complete test suite with coverage...'
sh """
. ${VENV_NAME}/bin/activate
# 运行完整测试套件
pytest tests/ \\
--junitxml=${TEST_REPORTS_DIR}/all-tests.xml \\
--html=${TEST_REPORTS_DIR}/all-tests.html \\
--self-contained-html \\
--cov=entity \\
--cov=services \\
--cov-report=xml:${COVERAGE_DIR}/coverage.xml \\
--cov-report=term-missing \\
--cov-branch \\
--cov-fail-under=70 \\
-v
"""
}
}
post {
always {
// 发布完整测试结果
junit "${TEST_REPORTS_DIR}/all-tests.xml"
// 发布代码覆盖率报告
publishCoverage([
adapters: [
[
mergeToOneReport: true,
path: "${COVERAGE_DIR}/coverage.xml"
]
],
sourceFileResolver: [
[
level: 'NEVER_STORE'
]
]
])
// 发布完整测试HTML报告
publishHTML([
allowMissing: false,
alwaysLinkToLastBuild: true,
keepAll: true,
reportDir: TEST_REPORTS_DIR,
reportFiles: 'all-tests.html',
reportName: 'Complete Test Report'
])
}
}
}
stage('Performance Tests') {
when {
// 只在主分支或发布分支运行性能测试
anyOf {
branch 'master'
branch 'main'
branch 'release/*'
}
}
steps {
script {
echo 'Running performance tests...'
sh """
. ${VENV_NAME}/bin/activate
# 运行性能测试
RUN_STRESS_TESTS=1 pytest tests/test_integration/test_ffmpeg_execution.py::TestFFmpegExecution::test_stress_test_large_effects_chain \\
--benchmark-json=${TEST_REPORTS_DIR}/benchmark.json \\
--html=${TEST_REPORTS_DIR}/performance-tests.html \\
--self-contained-html \\
-v \\
-m "stress" || true
"""
}
}
post {
always {
// 发布性能测试报告
publishHTML([
allowMissing: true,
alwaysLinkToLastBuild: true,
keepAll: true,
reportDir: TEST_REPORTS_DIR,
reportFiles: 'performance-tests.html',
reportName: 'Performance Tests Report'
])
}
}
}
}
post {
always {
// 清理虚拟环境
sh """
rm -rf ${VENV_NAME}
"""
// 归档测试报告
archiveArtifacts artifacts: "${TEST_REPORTS_DIR}/**/*", allowEmptyArchive: true
archiveArtifacts artifacts: "${COVERAGE_DIR}/**/*", allowEmptyArchive: true
}
success {
echo 'All tests passed successfully!'
// 发送成功通知
script {
if (env.BRANCH_NAME == 'master' || env.BRANCH_NAME == 'main') {
// 这里可以添加Slack、邮件或其他通知
echo 'Sending success notification...'
}
}
}
failure {
echo 'Tests failed!'
// 发送失败通知
script {
// 这里可以添加Slack、邮件或其他通知
echo 'Sending failure notification...'
}
}
unstable {
echo 'Tests completed with warnings!'
}
cleanup {
// 清理临时文件
deleteDir()
}
}
}

53
pytest.ini Normal file
View File

@@ -0,0 +1,53 @@
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# 标记定义
markers =
integration: marks tests as integration tests (may be slow)
unit: marks tests as unit tests (fast)
slow: marks tests as slow running
stress: marks tests as stress tests
# 输出配置
addopts =
-v
--tb=short
--strict-markers
--strict-config
--cov=entity
--cov=services
--cov-report=xml:coverage.xml
--cov-report=html:htmlcov
--cov-report=term-missing
--cov-branch
# 过滤警告
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
# 最小覆盖率要求
[tool:coverage:run]
source = entity,services
omit =
*/tests/*
*/test_*
*/__pycache__/*
*/venv/*
*/env/*
[tool:coverage:report]
exclude_lines =
pragma: no cover
def __repr__
if self.debug:
if settings.DEBUG
raise AssertionError
raise NotImplementedError
if 0:
if __name__ == .__main__.:
class .*\bProtocol\):
@(abc\.)?abstractmethod

23
requirements-test.txt Normal file
View File

@@ -0,0 +1,23 @@
# 测试依赖
pytest>=7.0.0
pytest-cov>=4.0.0
pytest-mock>=3.10.0
pytest-xdist>=3.0.0 # 并行测试
pytest-timeout>=2.1.0
pytest-html>=3.1.0 # HTML报告
# 代码质量
flake8>=5.0.0
black>=22.0.0
mypy>=1.0.0
# 覆盖率报告
coverage[toml]>=6.0.0
# 性能测试
pytest-benchmark>=4.0.0
# 测试辅助
factory-boy>=3.2.0 # 测试数据工厂
freezegun>=1.2.0 # 时间模拟
responses>=0.22.0 # HTTP模拟

267
run_tests.py Normal file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python
"""
测试运行器脚本
支持不同类型的测试运行和报告生成
"""
import os
import sys
import subprocess
import argparse
import tempfile
from pathlib import Path
def run_command(cmd, cwd=None):
"""运行命令并返回结果"""
print(f"Running: {' '.join(cmd)}")
try:
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
check=False
)
if result.stdout:
print(result.stdout)
if result.stderr:
print(result.stderr, file=sys.stderr)
return result.returncode == 0
except Exception as e:
print(f"Error running command: {e}", file=sys.stderr)
return False
def check_dependencies():
"""检查依赖是否安装"""
print("Checking dependencies...")
# 检查pytest
if not run_command([sys.executable, "-c", "import pytest"]):
print("pytest not found. Installing test dependencies...")
if not run_command([sys.executable, "-m", "pip", "install", "-r", "requirements-test.txt"]):
print("Failed to install test dependencies", file=sys.stderr)
return False
# 检查FFmpeg
ffmpeg_available = run_command(["ffmpeg", "-version"])
if not ffmpeg_available:
print("Warning: FFmpeg not found. Integration tests may be skipped.")
return True
def run_unit_tests(args):
"""运行单元测试"""
print("\n=== Running Unit Tests ===")
cmd = [
sys.executable, "-m", "pytest",
"tests/test_effects/",
"tests/test_ffmpeg_builder/",
"-v",
"-m", "not integration"
]
if args.coverage:
cmd.extend([
"--cov=entity",
"--cov=services",
"--cov-report=xml:coverage.xml",
"--cov-report=html:htmlcov",
"--cov-report=term-missing",
"--cov-branch"
])
if args.xml_report:
cmd.extend(["--junitxml=unit-tests.xml"])
if args.html_report:
cmd.extend(["--html=unit-tests.html", "--self-contained-html"])
return run_command(cmd)
def run_integration_tests(args):
"""运行集成测试"""
print("\n=== Running Integration Tests ===")
# 检查FFmpeg
if not run_command(["ffmpeg", "-version"]):
print("FFmpeg not available, skipping integration tests")
return True
cmd = [
sys.executable, "-m", "pytest",
"tests/test_integration/",
"-v",
"-m", "integration",
"--timeout=300"
]
if args.coverage:
cmd.extend([
"--cov=entity",
"--cov=services",
"--cov-report=xml:integration-coverage.xml",
"--cov-report=html:integration-htmlcov",
"--cov-branch"
])
if args.xml_report:
cmd.extend(["--junitxml=integration-tests.xml"])
if args.html_report:
cmd.extend(["--html=integration-tests.html", "--self-contained-html"])
return run_command(cmd)
def run_all_tests(args):
"""运行所有测试"""
print("\n=== Running All Tests ===")
cmd = [
sys.executable, "-m", "pytest",
"tests/",
"-v"
]
if args.coverage:
cmd.extend([
"--cov=entity",
"--cov=services",
"--cov-report=xml:coverage.xml",
"--cov-report=html:htmlcov",
"--cov-report=term-missing",
"--cov-branch"
])
if args.fail_under:
cmd.extend([f"--cov-fail-under={args.fail_under}"])
if args.xml_report:
cmd.extend(["--junitxml=all-tests.xml"])
if args.html_report:
cmd.extend(["--html=all-tests.html", "--self-contained-html"])
return run_command(cmd)
def run_effect_tests(effect_name=None):
"""运行特定特效测试"""
if effect_name:
print(f"\n=== Running {effect_name} Effect Tests ===")
cmd = [
sys.executable, "-m", "pytest",
f"tests/test_effects/test_{effect_name}_effect.py",
"-v"
]
else:
print("\n=== Running All Effect Tests ===")
cmd = [
sys.executable, "-m", "pytest",
"tests/test_effects/",
"-v"
]
return run_command(cmd)
def run_stress_tests():
"""运行压力测试"""
print("\n=== Running Stress Tests ===")
env = os.environ.copy()
env['RUN_STRESS_TESTS'] = '1'
cmd = [
sys.executable, "-m", "pytest",
"tests/test_integration/",
"-v",
"-m", "stress",
"--timeout=600"
]
return subprocess.run(cmd, env=env).returncode == 0
def create_test_video():
"""创建测试视频文件"""
print("\n=== Creating Test Video Files ===")
if not run_command(["ffmpeg", "-version"]):
print("FFmpeg not available, cannot create test videos")
return False
test_data_dir = Path("tests/test_data/videos")
test_data_dir.mkdir(parents=True, exist_ok=True)
# 创建短视频文件
video_path = test_data_dir / "sample.mp4"
cmd = [
"ffmpeg", "-y",
"-f", "lavfi",
"-i", "testsrc=duration=5:size=640x480:rate=25",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
str(video_path)
]
if run_command(cmd):
print(f"Created test video: {video_path}")
return True
else:
print("Failed to create test video")
return False
def main():
parser = argparse.ArgumentParser(description="RenderWorker Test Runner")
parser.add_argument("test_type", choices=[
"unit", "integration", "all", "effects", "stress", "setup"
], help="Type of tests to run")
parser.add_argument("--effect", help="Specific effect to test (for effects command)")
parser.add_argument("--coverage", action="store_true", help="Generate coverage report")
parser.add_argument("--xml-report", action="store_true", help="Generate XML test report")
parser.add_argument("--html-report", action="store_true", help="Generate HTML test report")
parser.add_argument("--fail-under", type=int, default=70, help="Minimum coverage percentage")
parser.add_argument("--no-deps-check", action="store_true", help="Skip dependency check")
args = parser.parse_args()
# 检查依赖
if not args.no_deps_check and not check_dependencies():
sys.exit(1)
# 运行相应的测试
success = True
if args.test_type == "unit":
success = run_unit_tests(args)
elif args.test_type == "integration":
success = run_integration_tests(args)
elif args.test_type == "all":
success = run_all_tests(args)
elif args.test_type == "effects":
success = run_effect_tests(args.effect)
elif args.test_type == "stress":
success = run_stress_tests()
elif args.test_type == "setup":
success = create_test_video()
if success:
print("\n✅ Tests completed successfully!")
if args.coverage:
print("📊 Coverage report generated in htmlcov/")
sys.exit(0)
else:
print("\n❌ Tests failed!")
sys.exit(1)
if __name__ == "__main__":
main()

313
tests/README.md Normal file
View File

@@ -0,0 +1,313 @@
# RenderWorker 测试文档
本目录包含 RenderWorker 项目的完整测试套件,用于验证特效参数生成和 FFmpeg 渲染功能。
## 目录结构
```
tests/
├── conftest.py # pytest 配置和公共 fixtures
├── test_data/ # 测试数据
│ ├── videos/ # 测试视频文件
│ ├── templates/ # 测试模板数据
│ └── expected_outputs/ # 预期输出结果
├── test_effects/ # 特效单元测试
│ ├── test_base.py # 基础类测试
│ ├── test_zoom_effect.py # 缩放特效测试
│ ├── test_speed_effect.py # 变速特效测试
│ └── ... # 其他特效测试
├── test_ffmpeg_builder/ # FFmpeg 命令构建测试
│ └── test_ffmpeg_command_builder.py
├── test_integration/ # 集成测试
│ └── test_ffmpeg_execution.py # FFmpeg 执行测试
└── utils/ # 测试工具
└── test_helpers.py # 测试辅助函数
```
## 快速开始
### 1. 安装测试依赖
```bash
pip install -r requirements-test.txt
```
### 2. 运行测试
使用测试运行器脚本:
```bash
# 运行所有测试
python run_tests.py all --coverage
# 只运行单元测试
python run_tests.py unit --coverage --html-report
# 只运行集成测试
python run_tests.py integration
# 运行特定特效测试
python run_tests.py effects --effect zoom
# 运行压力测试
python run_tests.py stress
```
或直接使用 pytest:
```bash
# 运行所有测试
pytest tests/ -v --cov=entity --cov=services
# 只运行单元测试
pytest tests/test_effects/ tests/test_ffmpeg_builder/ -v
# 只运行集成测试
pytest tests/test_integration/ -v -m integration
# 生成覆盖率报告
pytest tests/ --cov=entity --cov=services --cov-report=html
```
## 测试类型说明
### 单元测试 (Unit Tests)
位置:`tests/test_effects/`, `tests/test_ffmpeg_builder/`
测试内容:
- 各个特效处理器的参数验证
- FFmpeg 滤镜参数生成
- 特效注册表功能
- FFmpeg 命令构建逻辑
特点:
- 运行速度快
- 不依赖外部工具
- 测试覆盖率高
### 集成测试 (Integration Tests)
位置:`tests/test_integration/`
测试内容:
- 实际 FFmpeg 命令执行
- 视频文件处理验证
- 特效组合效果测试
- 错误处理验证
依赖:
- 需要系统安装 FFmpeg
- 需要测试视频文件
### 压力测试 (Stress Tests)
测试内容:
- 大量特效链处理
- 长时间运行稳定性
- 资源使用情况
运行条件:
- 设置环境变量 `RUN_STRESS_TESTS=1`
- 较长的超时时间
## 测试配置
### pytest 配置
`pytest.ini` 中配置:
- 测试路径和文件模式
- 覆盖率设置
- 标记定义
- 输出格式
### 环境变量
- `RUN_STRESS_TESTS`: 启用压力测试
- `FFMPEG_PATH`: 自定义 FFmpeg 路径
## 特效测试详解
### 缩放特效 (ZoomEffect)
测试用例:
- 有效参数验证:`"0,2.0,3.0"`(开始时间,缩放因子,持续时间)
- 无效参数处理:负值、非数字、参数不足
- 静态缩放:持续时间为 0
- 动态缩放:指定时间段内的缩放
- 位置 JSON 解析:自定义缩放中心点
生成滤镜格式:
```
# 静态缩放
[0:v]trim=start=0,zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1]
# 动态缩放
[0:v]zoompan=z=if(between(t\,0\,3.0)\,2.0\,1):x=iw/2:y=ih/2:d=1[v_eff1]
```
### 变速特效 (SpeedEffect)
测试用例:
- 加速效果:`"2.0"`(2倍速)
- 减速效果:`"0.5"`(0.5倍速)
- 无效参数:零值、负值、非数字
- 默认处理:空参数或 1.0 倍速
生成滤镜格式:
```
[0:v]setpts=2.0*PTS[v_eff1]
```
## FFmpeg 命令构建测试
### 测试场景
1. **单文件复制**:直接复制无需编码
2. **多文件拼接**:使用 concat 滤镜
3. **特效处理**:复杂滤镜链构建
4. **错误处理**:缺失文件、无效参数
### 命令验证
使用 `FFmpegValidator` 验证:
- 命令结构完整性
- 输入输出文件存在
- 滤镜语法正确性
- 流标识符格式
## 集成测试详解
### 真实 FFmpeg 执行
测试流程:
1. 创建测试视频文件
2. 构建 FFmpeg 命令
3. 执行命令并检查返回码
4. 验证输出文件存在且有效
### 测试用例
- 简单复制操作
- 单特效处理
- 多特效组合
- 视频拼接
- 错误情况处理
### 性能测试
监控指标:
- 执行时间
- 内存使用
- 文件大小
- 处理速度
## 代码覆盖率
目标覆盖率:≥ 70%
覆盖范围:
- `entity/` 目录下的所有模块
- `services/` 目录下的所有模块
报告格式:
- XML 格式:用于 Jenkins CI/CD
- HTML 格式:用于本地查看
- 终端输出:实时查看缺失覆盖
## CI/CD 集成
### Jenkins Pipeline
配置文件:`Jenkinsfile`
阶段:
1. 环境准备
2. 代码质量检查
3. 单元测试
4. 集成测试
5. 完整测试套件
6. 性能测试
报告生成:
- JUnit XML 测试报告
- Cobertura 覆盖率报告
- HTML 测试和覆盖率报告
### 自动化触发
- 代码提交时运行单元测试
- PR 创建时运行完整测试
- 主分支更新时运行包含性能测试的完整套件
## 故障排除
### 常见问题
1. **FFmpeg 未找到**
```bash
# 安装 FFmpeg
sudo apt-get install ffmpeg # Ubuntu/Debian
brew install ffmpeg # macOS
```
2. **测试视频创建失败**
```bash
# 手动创建测试视频
python run_tests.py setup
```
3. **覆盖率过低**
- 检查是否有未测试的代码路径
- 添加边界条件测试
- 验证测试是否实际运行
4. **集成测试超时**
- 增加超时时间:`--timeout=600`
- 使用更小的测试文件
- 检查系统资源使用情况
### 调试技巧
1. **查看详细输出**:
```bash
pytest tests/ -v -s
```
2. **只运行失败的测试**:
```bash
pytest tests/ --lf
```
3. **停在第一个失败**:
```bash
pytest tests/ -x
```
4. **查看覆盖率详情**:
```bash
pytest tests/ --cov=entity --cov-report=term-missing
```
## 贡献指南
### 添加新测试
1. 在相应目录创建测试文件
2. 使用描述性的测试函数名
3. 添加适当的测试标记
4. 更新文档说明
### 测试最佳实践
1. **独立性**:每个测试应该独立运行
2. **可重复性**:测试结果应该一致
3. **清晰性**:测试意图应该明确
4. **完整性**:覆盖正常和异常情况
### 代码质量
- 遵循 PEP 8 代码风格
- 使用类型注解
- 添加适当的文档字符串
- 使用有意义的变量名

90
tests/conftest.py Normal file
View File

@@ -0,0 +1,90 @@
"""pytest配置文件"""
import pytest
import os
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any
from entity.render_task import RenderTask
from config.settings import Config
@pytest.fixture
def temp_dir():
"""创建临时目录"""
temp_path = tempfile.mkdtemp()
yield temp_path
shutil.rmtree(temp_path, ignore_errors=True)
@pytest.fixture
def test_config():
"""测试用配置"""
return Config(
encoder_args="-c:v h264",
video_args="",
template_dir="tests/test_data/templates",
api_endpoint="http://test.local",
access_key="test_key"
)
@pytest.fixture
def sample_render_task():
"""示例渲染任务"""
return RenderTask(
task_id="test_task_001",
template_id="test_template",
output_path="test_output.mp4",
frame_rate=25,
effects=["zoom:0,2.0,3.0", "ospeed:1.5"],
ext_data={
"posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}'
}
)
@pytest.fixture
def sample_video_file(temp_dir):
"""创建测试用的样本视频文件路径"""
video_path = os.path.join(temp_dir, "sample.mp4")
# 这里只返回路径,实际测试中可能需要真实视频文件
return video_path
@pytest.fixture(scope="session")
def ffmpeg_available():
"""检查FFmpeg是否可用"""
import subprocess
try:
subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
@pytest.fixture
def mock_ext_data():
"""模拟扩展数据"""
return {
"posJson": '{"ltX": 50, "ltY": 50, "rbX": 150, "rbY": 150, "imgWidth": 200, "imgHeight": 200}',
"templateData": {"width": 1920, "height": 1080}
}
# 标记:只在FFmpeg可用时运行集成测试
def pytest_collection_modifyitems(config, items):
"""根据FFmpeg可用性修改测试收集"""
try:
import subprocess
subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
ffmpeg_available = True
except (subprocess.CalledProcessError, FileNotFoundError):
ffmpeg_available = False
if not ffmpeg_available:
skip_ffmpeg = pytest.mark.skip(reason="FFmpeg not available")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip_ffmpeg)

15
tests/test_data/README.md Normal file
View File

@@ -0,0 +1,15 @@
# 测试数据说明
这个目录包含测试所需的样本数据:
## 目录结构
- `videos/` - 测试用视频文件
- `templates/` - 测试用模板数据
- `expected_outputs/` - 预期输出结果
## 使用说明
- 小尺寸视频文件用于快速测试
- 模板文件包含各种特效配置
- 预期输出用于验证测试结果
注意:实际的视频文件可能较大,建议使用小尺寸测试文件。

View File

@@ -0,0 +1,188 @@
"""测试特效基础类和注册表"""
import pytest
from unittest.mock import Mock
from entity.effects.base import EffectProcessor, EffectRegistry
class TestEffectProcessor:
"""测试特效处理器基类"""
def test_init(self):
"""测试初始化"""
processor = Mock(spec=EffectProcessor)
processor.__init__("test_params", {"key": "value"})
assert processor.params == "test_params"
assert processor.ext_data == {"key": "value"}
assert processor.frame_rate == 25
def test_init_default_ext_data(self):
"""测试默认扩展数据"""
processor = Mock(spec=EffectProcessor)
processor.__init__("test_params")
assert processor.ext_data == {}
def test_parse_params_empty(self):
"""测试解析空参数"""
processor = EffectProcessor("", {})
result = processor.parse_params()
assert result == []
def test_parse_params_single(self):
"""测试解析单个参数"""
processor = EffectProcessor("123", {})
result = processor.parse_params()
assert result == ["123"]
def test_parse_params_multiple(self):
"""测试解析多个参数"""
processor = EffectProcessor("1,2.5,test", {})
result = processor.parse_params()
assert result == ["1", "2.5", "test"]
def test_get_pos_json_empty(self):
"""测试获取空位置JSON"""
processor = EffectProcessor("", {})
result = processor.get_pos_json()
assert result == {}
def test_get_pos_json_valid(self):
"""测试获取有效位置JSON"""
ext_data = {
"posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}'
}
processor = EffectProcessor("", ext_data)
result = processor.get_pos_json()
expected = {"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}
assert result == expected
def test_get_pos_json_invalid(self):
"""测试获取无效位置JSON"""
ext_data = {"posJson": "invalid_json"}
processor = EffectProcessor("", ext_data)
result = processor.get_pos_json()
assert result == {}
def test_get_pos_json_default_value(self):
"""测试默认位置JSON值"""
ext_data = {"posJson": "{}"}
processor = EffectProcessor("", ext_data)
result = processor.get_pos_json()
assert result == {}
class TestEffectRegistry:
"""测试特效注册表"""
def test_init(self):
"""测试初始化"""
registry = EffectRegistry()
assert registry._processors == {}
def test_register_valid_processor(self):
"""测试注册有效处理器"""
registry = EffectRegistry()
class TestEffect(EffectProcessor):
def validate_params(self):
return True
def generate_filter_args(self, video_input, effect_index):
return [], video_input
def get_effect_name(self):
return "test"
registry.register("test_effect", TestEffect)
assert "test_effect" in registry._processors
assert registry._processors["test_effect"] == TestEffect
def test_register_invalid_processor(self):
"""测试注册无效处理器"""
registry = EffectRegistry()
class InvalidEffect:
pass
with pytest.raises(ValueError, match="must be a subclass of EffectProcessor"):
registry.register("invalid", InvalidEffect)
def test_get_processor_exists(self):
"""测试获取存在的处理器"""
registry = EffectRegistry()
class TestEffect(EffectProcessor):
def validate_params(self):
return True
def generate_filter_args(self, video_input, effect_index):
return [], video_input
def get_effect_name(self):
return "test"
registry.register("test_effect", TestEffect)
processor = registry.get_processor("test_effect", "params", {"data": "value"})
assert isinstance(processor, TestEffect)
assert processor.params == "params"
assert processor.ext_data == {"data": "value"}
def test_get_processor_not_exists(self):
"""测试获取不存在的处理器"""
registry = EffectRegistry()
processor = registry.get_processor("nonexistent")
assert processor is None
def test_list_effects_empty(self):
"""测试列出空特效"""
registry = EffectRegistry()
effects = registry.list_effects()
assert effects == []
def test_list_effects_with_processors(self):
"""测试列出已注册特效"""
registry = EffectRegistry()
class TestEffect(EffectProcessor):
def validate_params(self):
return True
def generate_filter_args(self, video_input, effect_index):
return [], video_input
def get_effect_name(self):
return "test"
registry.register("effect1", TestEffect)
registry.register("effect2", TestEffect)
effects = registry.list_effects()
assert set(effects) == {"effect1", "effect2"}
def test_parse_effect_string_with_params(self):
"""测试解析带参数的特效字符串"""
registry = EffectRegistry()
name, params = registry.parse_effect_string("zoom:0,2.0,3.0")
assert name == "zoom"
assert params == "0,2.0,3.0"
def test_parse_effect_string_without_params(self):
"""测试解析无参数的特效字符串"""
registry = EffectRegistry()
name, params = registry.parse_effect_string("zoom")
assert name == "zoom"
assert params == ""
def test_parse_effect_string_multiple_colons(self):
"""测试解析多个冒号的特效字符串"""
registry = EffectRegistry()
name, params = registry.parse_effect_string("effect:param1:param2")
assert name == "effect"
assert params == "param1:param2"

View File

@@ -0,0 +1,169 @@
"""测试变速特效"""
import pytest
from entity.effects.speed import SpeedEffect
from tests.utils.test_helpers import EffectTestHelper
class TestSpeedEffect:
"""测试变速特效处理器"""
def test_validate_params_valid_cases(self):
"""测试有效参数验证"""
test_cases = [
{
"params": "2.0",
"expected": True,
"description": "2倍速"
},
{
"params": "0.5",
"expected": True,
"description": "0.5倍速(慢速)"
},
{
"params": "1.0",
"expected": True,
"description": "正常速度"
},
{
"params": "10.0",
"expected": True,
"description": "10倍速"
},
{
"params": "",
"expected": True,
"description": "空参数(默认不变速)"
}
]
effect = SpeedEffect()
results = EffectTestHelper.test_effect_params_validation(effect, test_cases)
for result in results:
assert result["passed"], f"Failed case: {result['description']} - {result}"
def test_validate_params_invalid_cases(self):
"""测试无效参数验证"""
test_cases = [
{
"params": "0",
"expected": False,
"description": "零速度"
},
{
"params": "-1.0",
"expected": False,
"description": "负速度"
},
{
"params": "abc",
"expected": False,
"description": "非数字参数"
},
{
"params": "1.0,2.0",
"expected": False,
"description": "多余参数"
}
]
effect = SpeedEffect()
results = EffectTestHelper.test_effect_params_validation(effect, test_cases)
for result in results:
assert result["passed"], f"Failed case: {result['description']} - {result}"
def test_generate_filter_args_speed_change(self):
"""测试变速滤镜生成"""
effect = SpeedEffect("2.0") # 2倍速
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"], f"Filter generation failed: {result.get('error')}"
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert result["output_stream"] == "[v_eff1]"
# 检查滤镜内容
filter_str = result["filters"][0]
assert "setpts=2.0*PTS" in filter_str
assert "[0:v]" in filter_str
assert "[v_eff1]" in filter_str
def test_generate_filter_args_slow_motion(self):
"""测试慢动作滤镜生成"""
effect = SpeedEffect("0.5") # 0.5倍速(慢动作)
result = EffectTestHelper.test_filter_generation(effect, "[input]", 3)
assert result["success"]
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert result["output_stream"] == "[v_eff3]"
filter_str = result["filters"][0]
assert "setpts=0.5*PTS" in filter_str
assert "[input]" in filter_str
assert "[v_eff3]" in filter_str
def test_generate_filter_args_no_change(self):
"""测试无变速效果"""
test_cases = [
{"params": "", "description": "空参数"},
{"params": "1", "description": "1倍速"},
{"params": "1.0", "description": "1.0倍速"}
]
for case in test_cases:
effect = SpeedEffect(case["params"])
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"], f"Failed for {case['description']}"
assert result["filter_count"] == 0, f"Should not generate filter for {case['description']}"
assert result["output_stream"] == "[0:v]", f"Output should equal input for {case['description']}"
def test_generate_filter_args_invalid_params(self):
"""测试无效参数的滤镜生成"""
effect = SpeedEffect("invalid")
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"]
assert result["filter_count"] == 0
assert result["output_stream"] == "[0:v]"
def test_get_effect_name(self):
"""测试获取特效名称"""
effect = SpeedEffect()
assert effect.get_effect_name() == "ospeed"
def test_various_speed_factors(self):
"""测试各种速度因子"""
speed_factors = ["0.1", "0.25", "0.75", "1.5", "3.0", "5.0"]
for speed in speed_factors:
effect = SpeedEffect(speed)
result = EffectTestHelper.test_filter_generation(effect, "[test]", 10)
if speed == "1.0":
# 1倍速不应该生成滤镜
assert result["filter_count"] == 0
assert result["output_stream"] == "[test]"
else:
# 其他速度应该生成滤镜
assert result["success"], f"Failed for speed {speed}"
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert f"setpts={speed}*PTS" in result["filters"][0]
def test_effect_chaining(self):
"""测试特效链式处理"""
# 模拟在特效链中的使用
effect = SpeedEffect("2.0")
# 第一个特效
result1 = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result1["output_stream"] == "[v_eff1]"
# 作为链中的第二个特效
result2 = EffectTestHelper.test_filter_generation(effect, "[v_eff1]", 2)
assert result2["output_stream"] == "[v_eff2]"
assert "[v_eff1]" in result2["filters"][0]

View File

@@ -0,0 +1,194 @@
"""测试缩放特效"""
import pytest
from entity.effects.zoom import ZoomEffect
from tests.utils.test_helpers import EffectTestHelper, FFmpegValidator
class TestZoomEffect:
"""测试缩放特效处理器"""
def test_validate_params_valid_cases(self):
"""测试有效参数验证"""
test_cases = [
{
"params": "0,2.0,3.0",
"expected": True,
"description": "标准缩放参数"
},
{
"params": "1.5,1.5,0",
"expected": True,
"description": "静态缩放(duration=0)"
},
{
"params": "0,1.0,5.0",
"expected": True,
"description": "无缩放效果(factor=1.0)"
},
{
"params": "10,0.5,2.0",
"expected": True,
"description": "缩小效果"
}
]
effect = ZoomEffect()
results = EffectTestHelper.test_effect_params_validation(effect, test_cases)
for result in results:
assert result["passed"], f"Failed case: {result['description']} - {result}"
def test_validate_params_invalid_cases(self):
"""测试无效参数验证"""
test_cases = [
{
"params": "",
"expected": False,
"description": "空参数"
},
{
"params": "1,2",
"expected": False,
"description": "参数不足"
},
{
"params": "-1,2.0,3.0",
"expected": False,
"description": "负开始时间"
},
{
"params": "0,0,3.0",
"expected": False,
"description": "零缩放因子"
},
{
"params": "0,-2.0,3.0",
"expected": False,
"description": "负缩放因子"
},
{
"params": "0,2.0,-1.0",
"expected": False,
"description": "负持续时间"
},
{
"params": "abc,2.0,3.0",
"expected": False,
"description": "非数字参数"
}
]
effect = ZoomEffect()
results = EffectTestHelper.test_effect_params_validation(effect, test_cases)
for result in results:
assert result["passed"], f"Failed case: {result['description']} - {result}"
def test_generate_filter_args_static_zoom(self):
"""测试静态缩放滤镜生成"""
effect = ZoomEffect("0,2.0,0") # duration=0表示静态缩放
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"], f"Filter generation failed: {result.get('error')}"
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert result["output_stream"] == "[v_eff1]"
# 检查滤镜内容
filter_str = result["filters"][0]
assert "trim=start=0" in filter_str
assert "zoompan=z=2.0" in filter_str
assert "[v_eff1]" in filter_str
def test_generate_filter_args_dynamic_zoom(self):
"""测试动态缩放滤镜生成"""
effect = ZoomEffect("1.0,1.5,2.0") # 从1秒开始,持续2秒的1.5倍缩放
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 2)
assert result["success"], f"Filter generation failed: {result.get('error')}"
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert result["output_stream"] == "[v_eff2]"
# 检查滤镜内容
filter_str = result["filters"][0]
assert "zoompan=z=" in filter_str
assert "between(t\\\\,1.0\\\\,3.0)" in filter_str # 检查时间范围
assert "[v_eff2]" in filter_str
def test_generate_filter_args_no_zoom(self):
"""测试无缩放效果(factor=1.0)"""
effect = ZoomEffect("0,1.0,3.0") # 缩放因子为1.0,应该不生成滤镜
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"]
assert result["filter_count"] == 0
assert result["output_stream"] == "[0:v]" # 输出应该等于输入
def test_generate_filter_args_invalid_params(self):
"""测试无效参数的滤镜生成"""
effect = ZoomEffect("invalid,params")
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"]
assert result["filter_count"] == 0
assert result["output_stream"] == "[0:v]" # 无效参数时应该返回原输入
def test_get_zoom_center_default(self):
"""测试默认缩放中心点"""
effect = ZoomEffect("0,2.0,3.0")
center_x, center_y = effect._get_zoom_center()
assert center_x == "iw/2"
assert center_y == "ih/2"
def test_get_zoom_center_with_pos_json(self):
"""测试基于posJson的缩放中心点"""
ext_data = {
"posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 400, "imgHeight": 300}'
}
effect = ZoomEffect("0,2.0,3.0", ext_data)
center_x, center_y = effect._get_zoom_center()
# 中心点应该是矩形的中心
# center_x_ratio = (100 + 200) / (2 * 400) = 0.375
# center_y_ratio = (100 + 200) / (2 * 300) = 0.5
assert "iw*0.375" in center_x
assert "ih*0.5" in center_y
def test_get_zoom_center_invalid_pos_json(self):
"""测试无效posJson时的缩放中心点"""
ext_data = {
"posJson": '{"imgWidth": 0, "imgHeight": 0}' # 无效尺寸
}
effect = ZoomEffect("0,2.0,3.0", ext_data)
center_x, center_y = effect._get_zoom_center()
# 应该回退到默认中心点
assert center_x == "iw/2"
assert center_y == "ih/2"
def test_get_effect_name(self):
"""测试获取特效名称"""
effect = ZoomEffect()
assert effect.get_effect_name() == "zoom"
def test_complex_zoom_scenario(self):
"""测试复杂缩放场景"""
# 带有复杂posJson数据的动态缩放
ext_data = {
"posJson": '{"ltX": 50, "ltY": 75, "rbX": 350, "rbY": 225, "imgWidth": 400, "imgHeight": 300}'
}
effect = ZoomEffect("2.5,3.0,1.5", ext_data)
result = EffectTestHelper.test_filter_generation(effect, "[input]", 5)
assert result["success"]
assert result["filter_count"] == 1
assert result["valid_syntax"]
assert result["output_stream"] == "[v_eff5]"
filter_str = result["filters"][0]
assert "zoompan=z=" in filter_str
assert "between(t\\\\,2.5\\\\,4.0)" in filter_str # 2.5 + 1.5 = 4.0
assert "[input]" in filter_str
assert "[v_eff5]" in filter_str

View File

@@ -0,0 +1,244 @@
"""测试FFmpeg命令构建器"""
import pytest
from unittest.mock import Mock, patch
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from entity.render_task import RenderTask
from config.settings import Config
from tests.utils.test_helpers import MockRenderTask, FFmpegValidator
class TestFFmpegCommandBuilder:
"""测试FFmpeg命令构建器"""
@pytest.fixture
def mock_config(self):
"""模拟配置"""
return Config(
encoder_args="-c:v h264",
video_args="-preset fast",
template_dir="tests/test_data/templates",
api_endpoint="http://test.local",
access_key="test_key"
)
@pytest.fixture
def simple_task(self):
"""简单的渲染任务"""
task = MockRenderTask()
task.input_files = ["input1.mp4", "input2.mp4"]
task.output_path = "output.mp4"
task.effects = []
return task
@pytest.fixture
def task_with_effects(self):
"""带特效的渲染任务"""
task = MockRenderTask()
task.input_files = ["input.mp4"]
task.output_path = "output.mp4"
task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"]
task.ext_data = {
"posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}'
}
return task
def test_init(self, simple_task, mock_config):
"""测试初始化"""
builder = FFmpegCommandBuilder(simple_task, mock_config)
assert builder.task == simple_task
assert builder.config == mock_config
def test_build_copy_command(self, simple_task, mock_config):
"""测试构建复制命令"""
builder = FFmpegCommandBuilder(simple_task, mock_config)
command = builder._build_copy_command()
# 验证命令结构
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"], f"Invalid command: {validation['errors']}"
assert validation["has_input"]
assert validation["has_output"]
# 验证具体内容
assert command[0] == "ffmpeg"
assert "-i" in command
assert "input1.mp4" in command
assert "output.mp4" in command
assert "-c" in command and "copy" in command
def test_build_concat_command_multiple_files(self, mock_config):
"""测试构建多文件拼接命令"""
task = MockRenderTask()
task.input_files = ["file1.mp4", "file2.mp4", "file3.mp4"]
task.output_path = "concat_output.mp4"
builder = FFmpegCommandBuilder(task, mock_config)
command = builder._build_concat_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"]
assert validation["has_filter"]
# 验证拼接相关参数
assert "concat=n=3:v=1:a=1" in " ".join(command)
assert all(f"file{i}.mp4" in command for i in range(1, 4))
def test_build_encode_command_with_effects(self, task_with_effects, mock_config):
"""测试构建带特效的编码命令"""
builder = FFmpegCommandBuilder(task_with_effects, mock_config)
command = builder._build_encode_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"]
assert validation["has_filter"]
command_str = " ".join(command)
# 应该包含特效相关的滤镜
assert "-filter_complex" in command
# 应该包含编码参数
assert "-c:v" in command and "h264" in command
def test_add_effects_single_effect(self, mock_config):
"""测试添加单个特效"""
task = MockRenderTask()
task.effects = ["zoom:0,2.0,3.0"]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, mock_config)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
# 验证结果
assert len(filter_args) > 0 # 应该有滤镜被添加
assert result_input == "[v_eff1]" # 输出流应该更新
assert result_index == 2 # 索引应该递增
def test_add_effects_multiple_effects(self, mock_config):
"""测试添加多个特效"""
task = MockRenderTask()
task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, mock_config)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
# 验证特效链
assert len(filter_args) >= 2 # 应该有两个特效的滤镜
assert result_input == "[v_eff2]" # 最终输出流
assert result_index == 3 # 索引应该递增两次
def test_add_effects_invalid_effect(self, mock_config):
"""测试添加无效特效"""
task = MockRenderTask()
task.effects = ["invalid_effect:params"]
builder = FFmpegCommandBuilder(task, mock_config)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
# 无效特效应该被忽略
assert result_input == "[0:v]" # 输入流不变
assert result_index == 1 # 索引不变
def test_add_effects_no_effects(self, mock_config):
"""测试无特效情况"""
task = MockRenderTask()
task.effects = []
builder = FFmpegCommandBuilder(task, mock_config)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
assert result_input == "[0:v]" # 输入流不变
assert result_index == 1 # 索引不变
assert len(filter_args) == 0 # 无滤镜添加
def test_build_command_copy_mode(self, simple_task, mock_config):
"""测试构建复制模式命令"""
simple_task.input_files = ["single_file.mp4"]
builder = FFmpegCommandBuilder(simple_task, mock_config)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"]
# 应该是复制模式
command_str = " ".join(command)
assert "-c copy" in command_str
def test_build_command_concat_mode(self, mock_config):
"""测试构建拼接模式命令"""
task = MockRenderTask()
task.input_files = ["file1.mp4", "file2.mp4"]
task.effects = []
builder = FFmpegCommandBuilder(task, mock_config)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"]
assert validation["has_filter"]
# 应该包含拼接滤镜
command_str = " ".join(command)
assert "concat=" in command_str
def test_build_command_encode_mode(self, task_with_effects, mock_config):
"""测试构建编码模式命令"""
builder = FFmpegCommandBuilder(task_with_effects, mock_config)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
assert validation["valid"]
assert validation["has_filter"]
# 应该包含编码参数和特效滤镜
command_str = " ".join(command)
assert "-c:v h264" in command_str
@patch('entity.effects.registry.get_processor')
def test_effect_processor_integration(self, mock_get_processor, mock_config):
"""测试与特效处理器的集成"""
# 模拟特效处理器
mock_processor = Mock()
mock_processor.frame_rate = 25
mock_processor.generate_filter_args.return_value = (
["[0:v]zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1]"],
"[v_eff1]"
)
mock_get_processor.return_value = mock_processor
task = MockRenderTask()
task.effects = ["zoom:0,2.0,3.0"]
task.frame_rate = 30
builder = FFmpegCommandBuilder(task, mock_config)
filter_args = []
builder._add_effects(filter_args, "[0:v]", 1)
# 验证处理器被正确调用
mock_processor.generate_filter_args.assert_called_once_with("[0:v]", 1)
assert mock_processor.frame_rate == 30 # 帧率应该被设置
def test_error_handling_missing_input(self, mock_config):
"""测试缺少输入文件的错误处理"""
task = MockRenderTask()
task.input_files = []
builder = FFmpegCommandBuilder(task, mock_config)
# 构建命令时应该处理错误情况
# 具体的错误处理依赖于实现
command = builder.build_command()
# 验证至少返回了基本的ffmpeg命令结构
assert isinstance(command, list)
assert len(command) > 0

View File

@@ -0,0 +1,282 @@
"""FFmpeg执行集成测试"""
import pytest
import subprocess
import tempfile
import os
from pathlib import Path
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from entity.render_task import RenderTask
from config.settings import Config
from services.render_service import DefaultRenderService
from tests.utils.test_helpers import MockRenderTask, create_test_video_file
@pytest.mark.integration
class TestFFmpegExecution:
"""FFmpeg执行集成测试"""
@pytest.fixture
def test_config(self, temp_dir):
"""测试配置"""
return Config(
encoder_args="-c:v libx264",
video_args="-preset ultrafast -crf 23",
template_dir=temp_dir,
api_endpoint="http://test.local",
access_key="test_key"
)
@pytest.fixture
def sample_video(self, temp_dir):
"""创建测试视频文件"""
video_path = os.path.join(temp_dir, "test_input.mp4")
success = create_test_video_file(video_path, duration=3, resolution="320x240")
if not success:
pytest.skip("Cannot create test video file")
return video_path
@pytest.fixture
def render_service(self, test_config):
"""渲染服务实例"""
return DefaultRenderService()
def test_simple_copy_execution(self, sample_video, temp_dir, test_config):
"""测试简单复制执行"""
output_path = os.path.join(temp_dir, "copy_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
# 执行命令
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=30)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert os.path.getsize(output_path) > 0, "Output file is empty"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_zoom_effect_execution(self, sample_video, temp_dir, test_config):
"""测试缩放特效执行"""
output_path = os.path.join(temp_dir, "zoom_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
task.effects = ["zoom:0,2.0,2.0"] # 2倍缩放,持续2秒
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert os.path.getsize(output_path) > 0, "Output file is empty"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_speed_effect_execution(self, sample_video, temp_dir, test_config):
"""测试变速特效执行"""
output_path = os.path.join(temp_dir, "speed_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
task.effects = ["ospeed:2.0"] # 2倍速
task.ext_data = {}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert os.path.getsize(output_path) > 0, "Output file is empty"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_multiple_effects_execution(self, sample_video, temp_dir, test_config):
"""测试多特效组合执行"""
output_path = os.path.join(temp_dir, "multi_effects_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
task.effects = ["zoom:0,1.5,1.0", "ospeed:1.5"] # 缩放+变速
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert os.path.getsize(output_path) > 0, "Output file is empty"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_concat_execution(self, temp_dir, test_config):
"""测试视频拼接执行"""
# 创建两个测试视频
video1_path = os.path.join(temp_dir, "video1.mp4")
video2_path = os.path.join(temp_dir, "video2.mp4")
output_path = os.path.join(temp_dir, "concat_output.mp4")
success1 = create_test_video_file(video1_path, duration=2, resolution="320x240")
success2 = create_test_video_file(video2_path, duration=2, resolution="320x240")
if not (success1 and success2):
pytest.skip("Cannot create test video files")
task = MockRenderTask()
task.input_files = [video1_path, video2_path]
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert os.path.getsize(output_path) > 0, "Output file is empty"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_invalid_effect_execution(self, sample_video, temp_dir, test_config):
"""测试无效特效的执行处理"""
output_path = os.path.join(temp_dir, "invalid_effect_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
task.effects = ["invalid_effect:params", "zoom:0,2.0,1.0"] # 混合有效和无效特效
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
# 应该忽略无效特效,继续处理有效特效
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_render_service_integration(self, sample_video, temp_dir, test_config, render_service):
"""测试渲染服务集成"""
output_path = os.path.join(temp_dir, "service_output.mp4")
# 创建真实的RenderTask(不是Mock)
task_data = {
"task_id": "integration_test",
"template_id": "test_template",
"input_files": [sample_video],
"output_path": output_path,
"effects": ["zoom:0,1.8,2.0"],
"ext_data": {"posJson": "{}"},
"frame_rate": 25
}
# 这里需要根据实际的RenderTask构造方法调整
task = MockRenderTask(**task_data)
# 使用渲染服务执行
try:
# 这里的方法调用需要根据实际的渲染服务接口调整
# success = render_service.render(task, test_config)
# assert success, "Render service failed"
# 临时直接使用FFmpegCommandBuilder测试
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
assert result.returncode == 0, f"Render failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
except Exception as e:
pytest.fail(f"Render service integration failed: {e}")
def test_error_handling_missing_input(self, temp_dir, test_config):
"""测试缺失输入文件的错误处理"""
missing_file = os.path.join(temp_dir, "nonexistent.mp4")
output_path = os.path.join(temp_dir, "error_output.mp4")
task = MockRenderTask()
task.input_files = [missing_file]
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
# 应该失败,因为输入文件不存在
result = subprocess.run(command, capture_output=True, text=True, timeout=30)
assert result.returncode != 0, "FFmpeg should fail with missing input file"
def test_performance_multiple_effects(self, sample_video, temp_dir, test_config):
"""测试多特效性能"""
output_path = os.path.join(temp_dir, "performance_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
# 多个特效组合
task.effects = [
"zoom:0,1.5,1.0",
"ospeed:1.2",
"zoom:1.5,2.0,1.0"
]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
import time
start_time = time.time()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=120)
execution_time = time.time() - start_time
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
assert execution_time < 60, f"Execution took too long: {execution_time}s"
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
@pytest.mark.skipif(not os.environ.get('RUN_STRESS_TESTS'), reason="Stress tests disabled")
def test_stress_test_large_effects_chain(self, sample_video, temp_dir, test_config):
"""压力测试:大量特效链"""
output_path = os.path.join(temp_dir, "stress_output.mp4")
task = MockRenderTask()
task.input_files = [sample_video]
task.output_path = output_path
# 创建大量特效
task.effects = [f"zoom:{i*0.5},1.{i+5},{i*0.2+0.5}" for i in range(10)]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=300)
assert result.returncode == 0, f"Stress test failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
except subprocess.TimeoutExpired:
pytest.fail("Stress test timed out")

202
tests/utils/test_helpers.py Normal file
View File

@@ -0,0 +1,202 @@
"""测试辅助工具"""
import json
import tempfile
import subprocess
from typing import Dict, Any, List, Optional
from pathlib import Path
from entity.render_task import RenderTask
from entity.effects.base import EffectProcessor
from config.settings import Config
class MockRenderTask:
"""模拟渲染任务,用于测试"""
def __init__(
self,
task_id: str = "test_task",
template_id: str = "test_template",
effects: List[str] = None,
ext_data: Dict[str, Any] = None,
frame_rate: int = 25,
output_path: str = "test_output.mp4"
):
self.task_id = task_id
self.template_id = template_id
self.effects = effects or []
self.ext_data = ext_data or {}
self.frame_rate = frame_rate
self.output_path = output_path
self.input_files = []
self.overlays = []
self.use_center_cut = False
self.use_zoom_cut = False
self.audio_file = None
class FFmpegValidator:
"""FFmpeg命令验证器"""
@staticmethod
def validate_filter_syntax(filter_str: str) -> bool:
"""验证滤镜语法是否正确"""
try:
# 基本语法检查
if not filter_str:
return False
# 检查是否包含基本的滤镜结构
if '[' in filter_str and ']' in filter_str:
return True
# 检查常见的滤镜格式
common_filters = ['zoompan', 'setpts', 'trim', 'scale', 'crop']
return any(f in filter_str for f in common_filters)
except Exception:
return False
@staticmethod
def validate_stream_identifier(stream_id: str) -> bool:
"""验证流标识符格式"""
if not stream_id:
return False
return stream_id.startswith('[') and stream_id.endswith(']')
@staticmethod
def validate_ffmpeg_command(command: List[str]) -> Dict[str, Any]:
"""验证完整的FFmpeg命令"""
result = {
"valid": False,
"has_input": False,
"has_output": False,
"has_filter": False,
"errors": []
}
if not command or command[0] != "ffmpeg":
result["errors"].append("Command must start with 'ffmpeg'")
return result
# 检查输入文件
if "-i" in command:
result["has_input"] = True
else:
result["errors"].append("No input file specified")
# 检查输出文件
if len(command) > 1 and not command[-1].startswith("-"):
result["has_output"] = True
else:
result["errors"].append("No output file specified")
# 检查滤镜
if "-filter_complex" in command or "-vf" in command:
result["has_filter"] = True
result["valid"] = (
result["has_input"] and
result["has_output"] and
len(result["errors"]) == 0
)
return result
class EffectTestHelper:
"""特效测试辅助类"""
@staticmethod
def create_test_effect(effect_class, params: str = "", ext_data: Dict[str, Any] = None):
"""创建测试用特效实例"""
return effect_class(params, ext_data)
@staticmethod
def test_effect_params_validation(effect: EffectProcessor, test_cases: List[Dict[str, Any]]):
"""批量测试特效参数验证"""
results = []
for case in test_cases:
effect.params = case.get("params", "")
effect.ext_data = case.get("ext_data", {})
is_valid = effect.validate_params()
expected = case.get("expected", True)
results.append({
"params": effect.params,
"expected": expected,
"actual": is_valid,
"passed": is_valid == expected,
"description": case.get("description", "")
})
return results
@staticmethod
def test_filter_generation(effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1):
"""测试滤镜生成"""
try:
filters, output_stream = effect.generate_filter_args(video_input, effect_index)
result = {
"success": True,
"filters": filters,
"output_stream": output_stream,
"filter_count": len(filters),
"valid_syntax": all(FFmpegValidator.validate_filter_syntax(f) for f in filters),
"valid_output": FFmpegValidator.validate_stream_identifier(output_stream) if output_stream != video_input else True
}
except Exception as e:
result = {
"success": False,
"error": str(e),
"filters": [],
"output_stream": "",
"filter_count": 0,
"valid_syntax": False,
"valid_output": False
}
return result
def create_test_video_file(output_path: str, duration: int = 5, resolution: str = "640x480") -> bool:
"""创建测试用视频文件"""
try:
cmd = [
"ffmpeg", "-y", # 覆盖输出文件
"-f", "lavfi", # 使用libavfilter输入
"-i", f"testsrc=duration={duration}:size={resolution}:rate=25",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
output_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0
except Exception:
return False
def create_sample_template_data() -> Dict[str, Any]:
"""创建示例模板数据"""
return {
"templateId": "test_template_001",
"name": "测试模板",
"parts": [
{
"id": "part1",
"type": "video",
"duration": 10.0,
"effects": ["zoom:0,2.0,3.0", "ospeed:1.5"]
}
],
"settings": {
"width": 1920,
"height": 1080,
"frameRate": 25
}
}