This commit is contained in:
2025-09-24 10:50:34 +08:00
parent c055a68592
commit ec1705769c
18 changed files with 348 additions and 330 deletions

View File

@@ -1,4 +1,5 @@
"""pytest配置文件"""
import pytest
import os
import tempfile
@@ -7,7 +8,7 @@ from pathlib import Path
from typing import Dict, Any
from entity.render_task import RenderTask
from config.settings import Config
from config.settings import FFmpegConfig, APIConfig, StorageConfig
@pytest.fixture
@@ -19,14 +20,28 @@ def temp_dir():
@pytest.fixture
def test_config():
"""测试用配置"""
return Config(
def test_ffmpeg_config():
"""测试用FFmpeg配置"""
return FFmpegConfig(
encoder_args="-c:v h264",
video_args="",
)
@pytest.fixture
def test_api_config():
"""测试用API配置"""
return APIConfig(
endpoint="http://test.local",
access_key="test_key",
)
@pytest.fixture
def test_storage_config():
"""测试用存储配置"""
return StorageConfig(
template_dir="tests/test_data/templates",
api_endpoint="http://test.local",
access_key="test_key"
)
@@ -41,7 +56,7 @@ def sample_render_task():
effects=["zoom:0,2.0,3.0", "ospeed:1.5"],
ext_data={
"posJson": '{"ltX": 100, "ltY": 100, "rbX": 200, "rbY": 200, "imgWidth": 300, "imgHeight": 300}'
}
},
)
@@ -57,6 +72,7 @@ def sample_video_file(temp_dir):
def ffmpeg_available():
"""检查FFmpeg是否可用"""
import subprocess
try:
subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
return True
@@ -69,7 +85,7 @@ def mock_ext_data():
"""模拟扩展数据"""
return {
"posJson": '{"ltX": 50, "ltY": 50, "rbX": 150, "rbY": 150, "imgWidth": 200, "imgHeight": 200}',
"templateData": {"width": 1920, "height": 1080}
"templateData": {"width": 1920, "height": 1080},
}
@@ -78,6 +94,7 @@ def pytest_collection_modifyitems(config, items):
"""根据FFmpeg可用性修改测试收集"""
try:
import subprocess
subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
ffmpeg_available = True
except (subprocess.CalledProcessError, FileNotFoundError):
@@ -87,4 +104,4 @@ def pytest_collection_modifyitems(config, items):
skip_ffmpeg = pytest.mark.skip(reason="FFmpeg not available")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip_ffmpeg)
item.add_marker(skip_ffmpeg)

View File

@@ -1,4 +1,5 @@
"""测试特效基础类和注册表"""
import pytest
from unittest.mock import Mock
@@ -50,9 +51,7 @@ class TestEffectProcessor:
def test_get_pos_json_valid(self):
"""测试获取有效位置JSON"""
ext_data = {
"posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}'
}
ext_data = {"posJson": '{"ltX": 100, "ltY": 200, "rbX": 300, "rbY": 400}'}
processor = EffectProcessor("", ext_data)
result = processor.get_pos_json()
@@ -185,4 +184,4 @@ class TestEffectRegistry:
name, params = registry.parse_effect_string("effect:param1:param2")
assert name == "effect"
assert params == "param1:param2"
assert params == "param1:param2"

View File

@@ -1,4 +1,5 @@
"""测试变速特效"""
import pytest
from entity.effects.speed import SpeedEffect
from tests.utils.test_helpers import EffectTestHelper
@@ -10,31 +11,11 @@ class TestSpeedEffect:
def test_validate_params_valid_cases(self):
"""测试有效参数验证"""
test_cases = [
{
"params": "2.0",
"expected": True,
"description": "2倍速"
},
{
"params": "0.5",
"expected": True,
"description": "0.5倍速(慢速)"
},
{
"params": "1.0",
"expected": True,
"description": "正常速度"
},
{
"params": "10.0",
"expected": True,
"description": "10倍速"
},
{
"params": "",
"expected": True,
"description": "空参数(默认不变速)"
}
{"params": "2.0", "expected": True, "description": "2倍速"},
{"params": "0.5", "expected": True, "description": "0.5倍速(慢速)"},
{"params": "1.0", "expected": True, "description": "正常速度"},
{"params": "10.0", "expected": True, "description": "10倍速"},
{"params": "", "expected": True, "description": "空参数(默认不变速)"},
]
effect = SpeedEffect()
@@ -46,26 +27,10 @@ class TestSpeedEffect:
def test_validate_params_invalid_cases(self):
"""测试无效参数验证"""
test_cases = [
{
"params": "0",
"expected": False,
"description": "零速度"
},
{
"params": "-1.0",
"expected": False,
"description": "负速度"
},
{
"params": "abc",
"expected": False,
"description": "非数字参数"
},
{
"params": "1.0,2.0",
"expected": False,
"description": "多余参数"
}
{"params": "0", "expected": False, "description": "零速度"},
{"params": "-1.0", "expected": False, "description": "负速度"},
{"params": "abc", "expected": False, "description": "非数字参数"},
{"params": "1.0,2.0", "expected": False, "description": "多余参数"},
]
effect = SpeedEffect()
@@ -110,7 +75,7 @@ class TestSpeedEffect:
test_cases = [
{"params": "", "description": "空参数"},
{"params": "1", "description": "1倍速"},
{"params": "1.0", "description": "1.0倍速"}
{"params": "1.0", "description": "1.0倍速"},
]
for case in test_cases:
@@ -118,8 +83,12 @@ class TestSpeedEffect:
result = EffectTestHelper.test_filter_generation(effect, "[0:v]", 1)
assert result["success"], f"Failed for {case['description']}"
assert result["filter_count"] == 0, f"Should not generate filter for {case['description']}"
assert result["output_stream"] == "[0:v]", f"Output should equal input for {case['description']}"
assert (
result["filter_count"] == 0
), f"Should not generate filter for {case['description']}"
assert (
result["output_stream"] == "[0:v]"
), f"Output should equal input for {case['description']}"
def test_generate_filter_args_invalid_params(self):
"""测试无效参数的滤镜生成"""
@@ -166,4 +135,4 @@ class TestSpeedEffect:
# 作为链中的第二个特效
result2 = EffectTestHelper.test_filter_generation(effect, "[v_eff1]", 2)
assert result2["output_stream"] == "[v_eff2]"
assert "[v_eff1]" in result2["filters"][0]
assert "[v_eff1]" in result2["filters"][0]

View File

@@ -1,4 +1,5 @@
"""测试缩放特效"""
import pytest
from entity.effects.zoom import ZoomEffect
from tests.utils.test_helpers import EffectTestHelper, FFmpegValidator
@@ -10,26 +11,18 @@ class TestZoomEffect:
def test_validate_params_valid_cases(self):
"""测试有效参数验证"""
test_cases = [
{
"params": "0,2.0,3.0",
"expected": True,
"description": "标准缩放参数"
},
{"params": "0,2.0,3.0", "expected": True, "description": "标准缩放参数"},
{
"params": "1.5,1.5,0",
"expected": True,
"description": "静态缩放(duration=0)"
"description": "静态缩放(duration=0)",
},
{
"params": "0,1.0,5.0",
"expected": True,
"description": "无缩放效果(factor=1.0)"
"description": "无缩放效果(factor=1.0)",
},
{
"params": "10,0.5,2.0",
"expected": True,
"description": "缩小效果"
}
{"params": "10,0.5,2.0", "expected": True, "description": "缩小效果"},
]
effect = ZoomEffect()
@@ -41,41 +34,13 @@ class TestZoomEffect:
def test_validate_params_invalid_cases(self):
"""测试无效参数验证"""
test_cases = [
{
"params": "",
"expected": False,
"description": "空参数"
},
{
"params": "1,2",
"expected": False,
"description": "参数不足"
},
{
"params": "-1,2.0,3.0",
"expected": False,
"description": "负开始时间"
},
{
"params": "0,0,3.0",
"expected": False,
"description": "零缩放因子"
},
{
"params": "0,-2.0,3.0",
"expected": False,
"description": "负缩放因子"
},
{
"params": "0,2.0,-1.0",
"expected": False,
"description": "负持续时间"
},
{
"params": "abc,2.0,3.0",
"expected": False,
"description": "非数字参数"
}
{"params": "", "expected": False, "description": "空参数"},
{"params": "1,2", "expected": False, "description": "参数不足"},
{"params": "-1,2.0,3.0", "expected": False, "description": "负开始时间"},
{"params": "0,0,3.0", "expected": False, "description": "零缩放因子"},
{"params": "0,-2.0,3.0", "expected": False, "description": "负缩放因子"},
{"params": "0,2.0,-1.0", "expected": False, "description": "负持续时间"},
{"params": "abc,2.0,3.0", "expected": False, "description": "非数字参数"},
]
effect = ZoomEffect()
@@ -158,9 +123,7 @@ class TestZoomEffect:
def test_get_zoom_center_invalid_pos_json(self):
"""测试无效posJson时的缩放中心点"""
ext_data = {
"posJson": '{"imgWidth": 0, "imgHeight": 0}' # 无效尺寸
}
ext_data = {"posJson": '{"imgWidth": 0, "imgHeight": 0}'} # 无效尺寸
effect = ZoomEffect("0,2.0,3.0", ext_data)
center_x, center_y = effect._get_zoom_center()
@@ -191,4 +154,4 @@ class TestZoomEffect:
assert "zoompan=z=" in filter_str
assert "between(t\\\\,2.5\\\\,4.0)" in filter_str # 2.5 + 1.5 = 4.0
assert "[input]" in filter_str
assert "[v_eff5]" in filter_str
assert "[v_eff5]" in filter_str

View File

@@ -1,27 +1,17 @@
"""测试FFmpeg命令构建器"""
import pytest
from unittest.mock import Mock, patch
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from entity.render_task import RenderTask
from config.settings import Config
from config.settings import FFmpegConfig, APIConfig, StorageConfig
from tests.utils.test_helpers import MockRenderTask, FFmpegValidator
class TestFFmpegCommandBuilder:
"""测试FFmpeg命令构建器"""
@pytest.fixture
def mock_config(self):
"""模拟配置"""
return Config(
encoder_args="-c:v h264",
video_args="-preset fast",
template_dir="tests/test_data/templates",
api_endpoint="http://test.local",
access_key="test_key"
)
@pytest.fixture
def simple_task(self):
"""简单的渲染任务"""
@@ -43,15 +33,15 @@ class TestFFmpegCommandBuilder:
}
return task
def test_init(self, simple_task, mock_config):
def test_init(self, simple_task):
"""测试初始化"""
builder = FFmpegCommandBuilder(simple_task, mock_config)
builder = FFmpegCommandBuilder(simple_task)
assert builder.task == simple_task
assert builder.config == mock_config
assert builder.config is not None
def test_build_copy_command(self, simple_task, mock_config):
def test_build_copy_command(self, simple_task):
"""测试构建复制命令"""
builder = FFmpegCommandBuilder(simple_task, mock_config)
builder = FFmpegCommandBuilder(simple_task)
command = builder._build_copy_command()
# 验证命令结构
@@ -67,13 +57,13 @@ class TestFFmpegCommandBuilder:
assert "output.mp4" in command
assert "-c" in command and "copy" in command
def test_build_concat_command_multiple_files(self, mock_config):
def test_build_concat_command_multiple_files(self):
"""测试构建多文件拼接命令"""
task = MockRenderTask()
task.input_files = ["file1.mp4", "file2.mp4", "file3.mp4"]
task.output_path = "concat_output.mp4"
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
command = builder._build_concat_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
@@ -84,9 +74,9 @@ class TestFFmpegCommandBuilder:
assert "concat=n=3:v=1:a=1" in " ".join(command)
assert all(f"file{i}.mp4" in command for i in range(1, 4))
def test_build_encode_command_with_effects(self, task_with_effects, mock_config):
def test_build_encode_command_with_effects(self, task_with_effects):
"""测试构建带特效的编码命令"""
builder = FFmpegCommandBuilder(task_with_effects, mock_config)
builder = FFmpegCommandBuilder(task_with_effects)
command = builder._build_encode_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
@@ -99,13 +89,13 @@ class TestFFmpegCommandBuilder:
# 应该包含编码参数
assert "-c:v" in command and "h264" in command
def test_add_effects_single_effect(self, mock_config):
def test_add_effects_single_effect(self):
"""测试添加单个特效"""
task = MockRenderTask()
task.effects = ["zoom:0,2.0,3.0"]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
@@ -115,13 +105,13 @@ class TestFFmpegCommandBuilder:
assert result_input == "[v_eff1]" # 输出流应该更新
assert result_index == 2 # 索引应该递增
def test_add_effects_multiple_effects(self, mock_config):
def test_add_effects_multiple_effects(self):
"""测试添加多个特效"""
task = MockRenderTask()
task.effects = ["zoom:0,2.0,3.0", "ospeed:1.5"]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
@@ -131,12 +121,12 @@ class TestFFmpegCommandBuilder:
assert result_input == "[v_eff2]" # 最终输出流
assert result_index == 3 # 索引应该递增两次
def test_add_effects_invalid_effect(self, mock_config):
def test_add_effects_invalid_effect(self):
"""测试添加无效特效"""
task = MockRenderTask()
task.effects = ["invalid_effect:params"]
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
@@ -145,12 +135,12 @@ class TestFFmpegCommandBuilder:
assert result_input == "[0:v]" # 输入流不变
assert result_index == 1 # 索引不变
def test_add_effects_no_effects(self, mock_config):
def test_add_effects_no_effects(self):
"""测试无特效情况"""
task = MockRenderTask()
task.effects = []
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
filter_args = []
result_input, result_index = builder._add_effects(filter_args, "[0:v]", 1)
@@ -159,11 +149,11 @@ class TestFFmpegCommandBuilder:
assert result_index == 1 # 索引不变
assert len(filter_args) == 0 # 无滤镜添加
def test_build_command_copy_mode(self, simple_task, mock_config):
def test_build_command_copy_mode(self, simple_task):
"""测试构建复制模式命令"""
simple_task.input_files = ["single_file.mp4"]
builder = FFmpegCommandBuilder(simple_task, mock_config)
builder = FFmpegCommandBuilder(simple_task)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
@@ -173,13 +163,13 @@ class TestFFmpegCommandBuilder:
command_str = " ".join(command)
assert "-c copy" in command_str
def test_build_command_concat_mode(self, mock_config):
def test_build_command_concat_mode(self):
"""测试构建拼接模式命令"""
task = MockRenderTask()
task.input_files = ["file1.mp4", "file2.mp4"]
task.effects = []
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
@@ -190,9 +180,9 @@ class TestFFmpegCommandBuilder:
command_str = " ".join(command)
assert "concat=" in command_str
def test_build_command_encode_mode(self, task_with_effects, mock_config):
def test_build_command_encode_mode(self, task_with_effects):
"""测试构建编码模式命令"""
builder = FFmpegCommandBuilder(task_with_effects, mock_config)
builder = FFmpegCommandBuilder(task_with_effects)
command = builder.build_command()
validation = FFmpegValidator.validate_ffmpeg_command(command)
@@ -203,15 +193,15 @@ class TestFFmpegCommandBuilder:
command_str = " ".join(command)
assert "-c:v h264" in command_str
@patch('entity.effects.registry.get_processor')
def test_effect_processor_integration(self, mock_get_processor, mock_config):
@patch("entity.effects.registry.get_processor")
def test_effect_processor_integration(self, mock_get_processor):
"""测试与特效处理器的集成"""
# 模拟特效处理器
mock_processor = Mock()
mock_processor.frame_rate = 25
mock_processor.generate_filter_args.return_value = (
["[0:v]zoompan=z=2.0:x=iw/2:y=ih/2:d=1[v_eff1]"],
"[v_eff1]"
"[v_eff1]",
)
mock_get_processor.return_value = mock_processor
@@ -219,7 +209,7 @@ class TestFFmpegCommandBuilder:
task.effects = ["zoom:0,2.0,3.0"]
task.frame_rate = 30
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
filter_args = []
builder._add_effects(filter_args, "[0:v]", 1)
@@ -228,12 +218,12 @@ class TestFFmpegCommandBuilder:
mock_processor.generate_filter_args.assert_called_once_with("[0:v]", 1)
assert mock_processor.frame_rate == 30 # 帧率应该被设置
def test_error_handling_missing_input(self, mock_config):
def test_error_handling_missing_input(self):
"""测试缺少输入文件的错误处理"""
task = MockRenderTask()
task.input_files = []
builder = FFmpegCommandBuilder(task, mock_config)
builder = FFmpegCommandBuilder(task)
# 构建命令时应该处理错误情况
# 具体的错误处理依赖于实现
@@ -241,4 +231,4 @@ class TestFFmpegCommandBuilder:
# 验证至少返回了基本的ffmpeg命令结构
assert isinstance(command, list)
assert len(command) > 0
assert len(command) > 0

View File

@@ -1,4 +1,5 @@
"""FFmpeg执行集成测试"""
import pytest
import subprocess
import tempfile
@@ -7,7 +8,7 @@ from pathlib import Path
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from entity.render_task import RenderTask
from config.settings import Config
from config.settings import FFmpegConfig
from services.render_service import DefaultRenderService
from tests.utils.test_helpers import MockRenderTask, create_test_video_file
@@ -16,17 +17,6 @@ from tests.utils.test_helpers import MockRenderTask, create_test_video_file
class TestFFmpegExecution:
"""FFmpeg执行集成测试"""
@pytest.fixture
def test_config(self, temp_dir):
"""测试配置"""
return Config(
encoder_args="-c:v libx264",
video_args="-preset ultrafast -crf 23",
template_dir=temp_dir,
api_endpoint="http://test.local",
access_key="test_key"
)
@pytest.fixture
def sample_video(self, temp_dir):
"""创建测试视频文件"""
@@ -37,11 +27,11 @@ class TestFFmpegExecution:
return video_path
@pytest.fixture
def render_service(self, test_config):
def render_service(self):
"""渲染服务实例"""
return DefaultRenderService()
def test_simple_copy_execution(self, sample_video, temp_dir, test_config):
def test_simple_copy_execution(self, sample_video, temp_dir):
"""测试简单复制执行"""
output_path = os.path.join(temp_dir, "copy_output.mp4")
@@ -50,7 +40,7 @@ class TestFFmpegExecution:
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
# 执行命令
@@ -62,7 +52,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_zoom_effect_execution(self, sample_video, temp_dir, test_config):
def test_zoom_effect_execution(self, sample_video, temp_dir):
"""测试缩放特效执行"""
output_path = os.path.join(temp_dir, "zoom_output.mp4")
@@ -72,7 +62,7 @@ class TestFFmpegExecution:
task.effects = ["zoom:0,2.0,2.0"] # 2倍缩放,持续2秒
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
try:
@@ -83,7 +73,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_speed_effect_execution(self, sample_video, temp_dir, test_config):
def test_speed_effect_execution(self, sample_video, temp_dir):
"""测试变速特效执行"""
output_path = os.path.join(temp_dir, "speed_output.mp4")
@@ -93,7 +83,7 @@ class TestFFmpegExecution:
task.effects = ["ospeed:2.0"] # 2倍速
task.ext_data = {}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
try:
@@ -104,7 +94,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_multiple_effects_execution(self, sample_video, temp_dir, test_config):
def test_multiple_effects_execution(self, sample_video, temp_dir):
"""测试多特效组合执行"""
output_path = os.path.join(temp_dir, "multi_effects_output.mp4")
@@ -114,7 +104,7 @@ class TestFFmpegExecution:
task.effects = ["zoom:0,1.5,1.0", "ospeed:1.5"] # 缩放+变速
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
try:
@@ -125,7 +115,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_concat_execution(self, temp_dir, test_config):
def test_concat_execution(self, temp_dir):
"""测试视频拼接执行"""
# 创建两个测试视频
video1_path = os.path.join(temp_dir, "video1.mp4")
@@ -143,7 +133,7 @@ class TestFFmpegExecution:
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
try:
@@ -154,7 +144,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_invalid_effect_execution(self, sample_video, temp_dir, test_config):
def test_invalid_effect_execution(self, sample_video, temp_dir):
"""测试无效特效的执行处理"""
output_path = os.path.join(temp_dir, "invalid_effect_output.mp4")
@@ -164,7 +154,7 @@ class TestFFmpegExecution:
task.effects = ["invalid_effect:params", "zoom:0,2.0,1.0"] # 混合有效和无效特效
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
# 应该忽略无效特效,继续处理有效特效
@@ -175,7 +165,7 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
def test_render_service_integration(self, sample_video, temp_dir, test_config, render_service):
def test_render_service_integration(self, sample_video, temp_dir, render_service):
"""测试渲染服务集成"""
output_path = os.path.join(temp_dir, "service_output.mp4")
@@ -187,7 +177,7 @@ class TestFFmpegExecution:
"output_path": output_path,
"effects": ["zoom:0,1.8,2.0"],
"ext_data": {"posJson": "{}"},
"frame_rate": 25
"frame_rate": 25,
}
# 这里需要根据实际的RenderTask构造方法调整
@@ -196,11 +186,11 @@ class TestFFmpegExecution:
# 使用渲染服务执行
try:
# 这里的方法调用需要根据实际的渲染服务接口调整
# success = render_service.render(task, test_config)
# success = render_service.render(task)
# assert success, "Render service failed"
# 临时直接使用FFmpegCommandBuilder测试
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
@@ -210,7 +200,7 @@ class TestFFmpegExecution:
except Exception as e:
pytest.fail(f"Render service integration failed: {e}")
def test_error_handling_missing_input(self, temp_dir, test_config):
def test_error_handling_missing_input(self, temp_dir):
"""测试缺失输入文件的错误处理"""
missing_file = os.path.join(temp_dir, "nonexistent.mp4")
output_path = os.path.join(temp_dir, "error_output.mp4")
@@ -220,14 +210,14 @@ class TestFFmpegExecution:
task.output_path = output_path
task.effects = []
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
# 应该失败,因为输入文件不存在
result = subprocess.run(command, capture_output=True, text=True, timeout=30)
assert result.returncode != 0, "FFmpeg should fail with missing input file"
def test_performance_multiple_effects(self, sample_video, temp_dir, test_config):
def test_performance_multiple_effects(self, sample_video, temp_dir):
"""测试多特效性能"""
output_path = os.path.join(temp_dir, "performance_output.mp4")
@@ -235,21 +225,20 @@ class TestFFmpegExecution:
task.input_files = [sample_video]
task.output_path = output_path
# 多个特效组合
task.effects = [
"zoom:0,1.5,1.0",
"ospeed:1.2",
"zoom:1.5,2.0,1.0"
]
task.effects = ["zoom:0,1.5,1.0", "ospeed:1.2", "zoom:1.5,2.0,1.0"]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
import time
start_time = time.time()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=120)
result = subprocess.run(
command, capture_output=True, text=True, timeout=120
)
execution_time = time.time() - start_time
assert result.returncode == 0, f"FFmpeg failed: {result.stderr}"
@@ -259,8 +248,10 @@ class TestFFmpegExecution:
except subprocess.TimeoutExpired:
pytest.fail("FFmpeg execution timed out")
@pytest.mark.skipif(not os.environ.get('RUN_STRESS_TESTS'), reason="Stress tests disabled")
def test_stress_test_large_effects_chain(self, sample_video, temp_dir, test_config):
@pytest.mark.skipif(
not os.environ.get("RUN_STRESS_TESTS"), reason="Stress tests disabled"
)
def test_stress_test_large_effects_chain(self, sample_video, temp_dir):
"""压力测试:大量特效链"""
output_path = os.path.join(temp_dir, "stress_output.mp4")
@@ -271,12 +262,14 @@ class TestFFmpegExecution:
task.effects = [f"zoom:{i*0.5},1.{i+5},{i*0.2+0.5}" for i in range(10)]
task.ext_data = {"posJson": "{}"}
builder = FFmpegCommandBuilder(task, test_config)
builder = FFmpegCommandBuilder(task)
command = builder.build_command()
try:
result = subprocess.run(command, capture_output=True, text=True, timeout=300)
result = subprocess.run(
command, capture_output=True, text=True, timeout=300
)
assert result.returncode == 0, f"Stress test failed: {result.stderr}"
assert os.path.exists(output_path), "Output file was not created"
except subprocess.TimeoutExpired:
pytest.fail("Stress test timed out")
pytest.fail("Stress test timed out")

View File

@@ -1,4 +1,5 @@
"""测试辅助工具"""
import json
import tempfile
import subprocess
@@ -7,7 +8,7 @@ from pathlib import Path
from entity.render_task import RenderTask
from entity.effects.base import EffectProcessor
from config.settings import Config
from config.settings import FFmpegConfig
class MockRenderTask:
@@ -20,7 +21,7 @@ class MockRenderTask:
effects: List[str] = None,
ext_data: Dict[str, Any] = None,
frame_rate: int = 25,
output_path: str = "test_output.mp4"
output_path: str = "test_output.mp4",
):
self.task_id = task_id
self.template_id = template_id
@@ -47,11 +48,11 @@ class FFmpegValidator:
return False
# 检查是否包含基本的滤镜结构
if '[' in filter_str and ']' in filter_str:
if "[" in filter_str and "]" in filter_str:
return True
# 检查常见的滤镜格式
common_filters = ['zoompan', 'setpts', 'trim', 'scale', 'crop']
common_filters = ["zoompan", "setpts", "trim", "scale", "crop"]
return any(f in filter_str for f in common_filters)
except Exception:
@@ -62,7 +63,7 @@ class FFmpegValidator:
"""验证流标识符格式"""
if not stream_id:
return False
return stream_id.startswith('[') and stream_id.endswith(']')
return stream_id.startswith("[") and stream_id.endswith("]")
@staticmethod
def validate_ffmpeg_command(command: List[str]) -> Dict[str, Any]:
@@ -72,7 +73,7 @@ class FFmpegValidator:
"has_input": False,
"has_output": False,
"has_filter": False,
"errors": []
"errors": [],
}
if not command or command[0] != "ffmpeg":
@@ -96,9 +97,7 @@ class FFmpegValidator:
result["has_filter"] = True
result["valid"] = (
result["has_input"] and
result["has_output"] and
len(result["errors"]) == 0
result["has_input"] and result["has_output"] and len(result["errors"]) == 0
)
return result
@@ -108,12 +107,16 @@ class EffectTestHelper:
"""特效测试辅助类"""
@staticmethod
def create_test_effect(effect_class, params: str = "", ext_data: Dict[str, Any] = None):
def create_test_effect(
effect_class, params: str = "", ext_data: Dict[str, Any] = None
):
"""创建测试用特效实例"""
return effect_class(params, ext_data)
@staticmethod
def test_effect_params_validation(effect: EffectProcessor, test_cases: List[Dict[str, Any]]):
def test_effect_params_validation(
effect: EffectProcessor, test_cases: List[Dict[str, Any]]
):
"""批量测试特效参数验证"""
results = []
for case in test_cases:
@@ -123,29 +126,41 @@ class EffectTestHelper:
is_valid = effect.validate_params()
expected = case.get("expected", True)
results.append({
"params": effect.params,
"expected": expected,
"actual": is_valid,
"passed": is_valid == expected,
"description": case.get("description", "")
})
results.append(
{
"params": effect.params,
"expected": expected,
"actual": is_valid,
"passed": is_valid == expected,
"description": case.get("description", ""),
}
)
return results
@staticmethod
def test_filter_generation(effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1):
def test_filter_generation(
effect: EffectProcessor, video_input: str = "[0:v]", effect_index: int = 1
):
"""测试滤镜生成"""
try:
filters, output_stream = effect.generate_filter_args(video_input, effect_index)
filters, output_stream = effect.generate_filter_args(
video_input, effect_index
)
result = {
"success": True,
"filters": filters,
"output_stream": output_stream,
"filter_count": len(filters),
"valid_syntax": all(FFmpegValidator.validate_filter_syntax(f) for f in filters),
"valid_output": FFmpegValidator.validate_stream_identifier(output_stream) if output_stream != video_input else True
"valid_syntax": all(
FFmpegValidator.validate_filter_syntax(f) for f in filters
),
"valid_output": (
FFmpegValidator.validate_stream_identifier(output_stream)
if output_stream != video_input
else True
),
}
except Exception as e:
result = {
@@ -155,23 +170,31 @@ class EffectTestHelper:
"output_stream": "",
"filter_count": 0,
"valid_syntax": False,
"valid_output": False
"valid_output": False,
}
return result
def create_test_video_file(output_path: str, duration: int = 5, resolution: str = "640x480") -> bool:
def create_test_video_file(
output_path: str, duration: int = 5, resolution: str = "640x480"
) -> bool:
"""创建测试用视频文件"""
try:
cmd = [
"ffmpeg", "-y", # 覆盖输出文件
"-f", "lavfi", # 使用libavfilter输入
"-i", f"testsrc=duration={duration}:size={resolution}:rate=25",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
output_path
"ffmpeg",
"-y", # 覆盖输出文件
"-f",
"lavfi", # 使用libavfilter输入
"-i",
f"testsrc=duration={duration}:size={resolution}:rate=25",
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-crf",
"23",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
@@ -191,12 +214,8 @@ def create_sample_template_data() -> Dict[str, Any]:
"id": "part1",
"type": "video",
"duration": 10.0,
"effects": ["zoom:0,2.0,3.0", "ospeed:1.5"]
"effects": ["zoom:0,2.0,3.0", "ospeed:1.5"],
}
],
"settings": {
"width": 1920,
"height": 1080,
"frameRate": 25
}
}
"settings": {"width": 1920, "height": 1080, "frameRate": 25},
}