From 3cb2f8d02ad1ad3ee0cc86e87914f35cfd52223b Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sat, 7 Feb 2026 18:29:54 +0800 Subject: [PATCH] =?UTF-8?q?test(handlers):=20=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E7=A1=80=E5=A4=84=E7=90=86=E5=99=A8=E5=B9=B6=E8=A1=8C=E4=BC=A0?= =?UTF-8?q?=E8=BE=93=E7=9B=B8=E5=85=B3=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现 download_files_parallel 方法的并发下载功能测试 - 验证上传文件并行处理收集URL功能的正确性 - 测试下载文件时设置锁等待时间跨度属性 - 验证无缓存下载时锁等待时间为零的场景 - 测试上传文件时设置详细的跨度属性功能 - 添加渲染视频效果相关的参数解析测试 - 实现存储服务上传指标 --- .../test_base_handler_parallel_transfer.py | 238 ++++++++++++++++++ tests/unit/test_render_video_effects.py | 84 +++++++ tests/unit/test_storage_upload_metrics.py | 81 ++++++ tests/unit/test_tracing.py | 51 ++++ 4 files changed, 454 insertions(+) create mode 100644 tests/unit/test_base_handler_parallel_transfer.py create mode 100644 tests/unit/test_render_video_effects.py create mode 100644 tests/unit/test_storage_upload_metrics.py create mode 100644 tests/unit/test_tracing.py diff --git a/tests/unit/test_base_handler_parallel_transfer.py b/tests/unit/test_base_handler_parallel_transfer.py new file mode 100644 index 0000000..3031bf0 --- /dev/null +++ b/tests/unit/test_base_handler_parallel_transfer.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- + +import os +from contextlib import contextmanager +from types import SimpleNamespace + +import pytest + +from domain.config import WorkerConfig +from domain.result import TaskResult +from domain.task import TaskType +from handlers.base import BaseHandler + + +class _DummyApiClient: + pass + + +class _DummyHandler(BaseHandler): + def handle(self, task): + return TaskResult.ok({}) + + def get_supported_type(self): + return TaskType.RENDER_SEGMENT_VIDEO + + +def _create_handler(tmp_path): + config = WorkerConfig( + api_endpoint='http://127.0.0.1:18084/api', + access_key='TEST_ACCESS_KEY', + worker_id='test-worker', + temp_dir=str(tmp_path), + cache_enabled=False, + cache_dir=str(tmp_path / 'cache') + ) + return _DummyHandler(config, _DummyApiClient()) + + +def test_download_files_parallel_collects_success_and_failure(tmp_path, monkeypatch): + handler = _create_handler(tmp_path) + handler.task_download_concurrency = 3 + captured_calls = [] + + def _fake_download(url, dest, timeout=None, use_cache=True): + captured_calls.append((url, dest, timeout, use_cache)) + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, 'wb') as file_obj: + file_obj.write(b'1') + return not url.endswith('/fail') + + monkeypatch.setattr(handler, 'download_file', _fake_download) + results = handler.download_files_parallel( + [ + {'key': 'first', 'url': 'https://example.com/first', 'dest': str(tmp_path / 'first.bin')}, + {'key': 'second', 'url': 'https://example.com/fail', 'dest': str(tmp_path / 'second.bin')}, + {'key': 'third', 'url': 'https://example.com/third', 'dest': str(tmp_path / 'third.bin'), 'use_cache': False}, + ], + timeout=15, + ) + + assert len(captured_calls) == 3 + assert results['first']['success'] is True + assert results['second']['success'] is False + assert results['third']['success'] is True + assert any(call_item[3] is False for call_item in captured_calls) + + +def test_download_files_parallel_rejects_duplicate_key(tmp_path): + handler = _create_handler(tmp_path) + with pytest.raises(ValueError, match='Duplicate download job key'): + handler.download_files_parallel( + [ + {'key': 'dup', 'url': 'https://example.com/1', 'dest': str(tmp_path / '1.bin')}, + {'key': 'dup', 'url': 'https://example.com/2', 'dest': str(tmp_path / '2.bin')}, + ] + ) + + +def test_upload_files_parallel_collects_urls(tmp_path, monkeypatch): + handler = _create_handler(tmp_path) + handler.task_upload_concurrency = 2 + + def _fake_upload(task_id, file_type, file_path, file_name=None): + if file_type == 'video': + return f'https://cdn.example.com/{task_id}/{file_name or "video.mp4"}' + return None + + monkeypatch.setattr(handler, 'upload_file', _fake_upload) + results = handler.upload_files_parallel( + [ + { + 'key': 'video_output', + 'task_id': 'task-1', + 'file_type': 'video', + 'file_path': str(tmp_path / 'video.mp4'), + 'file_name': 'output.mp4', + }, + { + 'key': 'audio_output', + 'task_id': 'task-1', + 'file_type': 'audio', + 'file_path': str(tmp_path / 'audio.aac'), + }, + ] + ) + + assert results['video_output']['success'] is True + assert results['video_output']['url'] == 'https://cdn.example.com/task-1/output.mp4' + assert results['audio_output']['success'] is False + assert results['audio_output']['url'] is None + + +def test_download_file_sets_lock_wait_ms_span_attribute(tmp_path, monkeypatch): + handler = _create_handler(tmp_path) + destination = tmp_path / "download.bin" + + class _FakeSpan: + def __init__(self): + self.attributes = {} + + def set_attribute(self, key, value): + self.attributes[key] = value + + fake_span = _FakeSpan() + + @contextmanager + def _fake_start_span(name, kind=None, attributes=None): + if attributes: + fake_span.attributes.update(attributes) + yield fake_span + + def _fake_get_or_download_with_metrics(url, dest, timeout=300, max_retries=5): + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, 'wb') as file_obj: + file_obj.write(b'abc') + return True, {"lock_wait_ms": 1234, "lock_acquired": True, "cache_path_used": "cache"} + + monkeypatch.setattr("handlers.base.start_span", _fake_start_span) + monkeypatch.setattr( + handler.material_cache, + "get_or_download_with_metrics", + _fake_get_or_download_with_metrics + ) + + assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=True) + assert fake_span.attributes["render.file.lock_wait_ms"] == 1234 + assert fake_span.attributes["render.file.lock_acquired"] is True + assert fake_span.attributes["render.file.cache_path_used"] == "cache" + + +def test_download_file_without_cache_sets_lock_wait_ms_zero(tmp_path, monkeypatch): + handler = _create_handler(tmp_path) + destination = tmp_path / "download-no-cache.bin" + + class _FakeSpan: + def __init__(self): + self.attributes = {} + + def set_attribute(self, key, value): + self.attributes[key] = value + + fake_span = _FakeSpan() + + @contextmanager + def _fake_start_span(name, kind=None, attributes=None): + if attributes: + fake_span.attributes.update(attributes) + yield fake_span + + def _fake_storage_download(url, dest, timeout=30): + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, 'wb') as file_obj: + file_obj.write(b'def') + return True + + monkeypatch.setattr("handlers.base.start_span", _fake_start_span) + monkeypatch.setattr("handlers.base.storage.download_file", _fake_storage_download) + + assert handler.download_file("https://example.com/file.bin", str(destination), timeout=1, use_cache=False) + assert fake_span.attributes["render.file.lock_wait_ms"] == 0 + assert fake_span.attributes["render.file.lock_acquired"] is False + assert fake_span.attributes["render.file.cache_path_used"] == "direct" + + +def test_upload_file_sets_detailed_span_attributes(tmp_path, monkeypatch): + handler = _create_handler(tmp_path) + source_path = tmp_path / "upload.mp4" + source_path.write_bytes(b"abc123") + fake_span_attributes = {} + + class _FakeSpan: + def set_attribute(self, key, value): + fake_span_attributes[key] = value + + @contextmanager + def _fake_start_span(name, kind=None, attributes=None): + if attributes: + fake_span_attributes.update(attributes) + yield _FakeSpan() + + handler.api_client = SimpleNamespace( + get_upload_url=lambda *args, **kwargs: { + "uploadUrl": "https://example.com/upload", + "accessUrl": "https://cdn.example.com/output.mp4", + } + ) + + monkeypatch.setattr("handlers.base.start_span", _fake_start_span) + monkeypatch.setattr( + "handlers.base.storage.upload_file_with_metrics", + lambda *args, **kwargs: ( + True, + { + "upload_method": "http", + "http_attempts": 2, + "http_retry_count": 1, + "http_status_code": 200, + "http_replace_applied": True, + "content_type": "video/mp4", + "error_type": "", + "rclone_attempted": False, + "rclone_succeeded": False, + "rclone_fallback_http": False, + }, + ) + ) + monkeypatch.setattr(handler.material_cache, "add_to_cache", lambda *args, **kwargs: True) + + access_url = handler.upload_file("task-1", "video", str(source_path), "output.mp4") + assert access_url == "https://cdn.example.com/output.mp4" + assert fake_span_attributes["render.file.upload_success"] is True + assert fake_span_attributes["render.file.upload_method"] == "http" + assert fake_span_attributes["render.file.http_attempts"] == 2 + assert fake_span_attributes["render.file.http_retry_count"] == 1 + assert fake_span_attributes["render.file.http_status_code"] == 200 + assert fake_span_attributes["render.file.http_replace_applied"] is True + assert fake_span_attributes["render.file.content_type"] == "video/mp4" + assert fake_span_attributes["render.file.cache_write_back"] == "success" diff --git a/tests/unit/test_render_video_effects.py b/tests/unit/test_render_video_effects.py new file mode 100644 index 0000000..59417ad --- /dev/null +++ b/tests/unit/test_render_video_effects.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- + +import pytest + +from domain.config import WorkerConfig +from domain.task import Effect, OutputSpec, RenderSpec +from handlers.render_video import RenderSegmentVideoHandler + + +class _DummyApiClient: + pass + + +def _create_handler(tmp_path): + config = WorkerConfig( + api_endpoint='http://127.0.0.1:18084/api', + access_key='TEST_ACCESS_KEY', + worker_id='test-worker', + temp_dir=str(tmp_path), + cache_enabled=False, + cache_dir=str(tmp_path / 'cache') + ) + return RenderSegmentVideoHandler(config, _DummyApiClient()) + + +def test_get_zoom_params_with_valid_values(): + effect = Effect.from_string('zoom:1.5,1.35,2') + assert effect is not None + + start_sec, scale_factor, duration_sec = effect.get_zoom_params() + assert start_sec == pytest.approx(1.5) + assert scale_factor == pytest.approx(1.35) + assert duration_sec == pytest.approx(2.0) + + +@pytest.mark.parametrize( + 'effect_str', + [ + 'zoom:-1,0.9,-2', + 'zoom:nan,inf,0', + 'zoom:bad,value,data', + 'zoom:,,', + ], +) +def test_get_zoom_params_invalid_values_fallback_to_default(effect_str): + effect = Effect.from_string(effect_str) + assert effect is not None + assert effect.get_zoom_params() == (0.0, 1.2, 1.0) + + +def test_build_command_with_zoom_uses_filter_complex(tmp_path): + handler = _create_handler(tmp_path) + render_spec = RenderSpec(effects='zoom:1.5,1.4,2') + output_spec = OutputSpec(width=1080, height=1920, fps=30) + + command = handler._build_command( + input_file='input.mp4', + output_file='output.mp4', + render_spec=render_spec, + output_spec=output_spec, + duration_ms=6000, + ) + + assert '-filter_complex' in command + assert '-vf' not in command + + +def test_build_video_filters_zoom_and_camera_shot_stack_in_order(tmp_path): + handler = _create_handler(tmp_path) + render_spec = RenderSpec(effects='cameraShot:3,1|zoom:1,1.2,2') + output_spec = OutputSpec(width=1080, height=1920, fps=30) + + filters = handler._build_video_filters( + render_spec=render_spec, + output_spec=output_spec, + duration_ms=8000, + source_duration_sec=10.0, + ) + + camera_shot_marker = 'concat=n=2:v=1:a=0' + zoom_marker = "overlay=0:0:enable='between(t,1.0,3.0)'" + assert camera_shot_marker in filters + assert zoom_marker in filters + assert filters.index(camera_shot_marker) < filters.index(zoom_marker) diff --git a/tests/unit/test_storage_upload_metrics.py b/tests/unit/test_storage_upload_metrics.py new file mode 100644 index 0000000..d975af2 --- /dev/null +++ b/tests/unit/test_storage_upload_metrics.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +import requests + +from services import storage + + +class _FakeResponse: + def __init__(self, status_code=200): + self.status_code = status_code + + def raise_for_status(self): + return None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + +def test_upload_file_with_metrics_file_not_found(tmp_path): + missing_path = tmp_path / "missing.mp4" + success, metrics = storage.upload_file_with_metrics( + "https://example.com/upload", + str(missing_path), + max_retries=1, + timeout=1, + ) + + assert success is False + assert metrics["error_type"] == "file_not_found" + assert metrics["upload_method"] == "none" + + +def test_upload_file_with_metrics_http_success(tmp_path, monkeypatch): + source_path = tmp_path / "video.mp4" + source_path.write_bytes(b"content") + + monkeypatch.setattr("services.storage.requests.put", lambda *args, **kwargs: _FakeResponse(200)) + + success, metrics = storage.upload_file_with_metrics( + "https://example.com/upload", + str(source_path), + max_retries=3, + timeout=1, + ) + + assert success is True + assert metrics["upload_method"] == "http" + assert metrics["http_attempts"] == 1 + assert metrics["http_retry_count"] == 0 + assert metrics["http_status_code"] == 200 + assert metrics["content_type"] == "video/mp4" + + +def test_upload_file_with_metrics_retry_then_success(tmp_path, monkeypatch): + source_path = tmp_path / "audio.aac" + source_path.write_bytes(b"audio-bytes") + call_counter = {"count": 0} + + def _fake_put(*args, **kwargs): + call_counter["count"] += 1 + if call_counter["count"] == 1: + raise requests.exceptions.Timeout() + return _FakeResponse(200) + + monkeypatch.setattr("services.storage.requests.put", _fake_put) + + success, metrics = storage.upload_file_with_metrics( + "https://example.com/upload", + str(source_path), + max_retries=3, + timeout=1, + ) + + assert success is True + assert metrics["http_attempts"] == 2 + assert metrics["http_retry_count"] == 1 + assert metrics["http_status_code"] == 200 + assert metrics["error_type"] == "" diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py new file mode 100644 index 0000000..b981447 --- /dev/null +++ b/tests/unit/test_tracing.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- + +import importlib +from types import SimpleNamespace + +import util.tracing as tracing_module + + +def _create_task_stub(): + task_type = SimpleNamespace(value="RENDER_SEGMENT_VIDEO") + return SimpleNamespace( + task_id="task-1001", + task_type=task_type, + get_job_id=lambda: "job-2002", + get_segment_id=lambda: "seg-3003", + ) + + +def test_task_trace_scope_sets_and_resets_context(monkeypatch): + monkeypatch.setenv("OTEL_ENABLED", "false") + tracing = importlib.reload(tracing_module) + + assert tracing.initialize_tracing("worker-1", "2.0.0") is False + assert tracing.get_current_task_context() is None + + with tracing.task_trace_scope(_create_task_stub()) as span: + assert span is None + context = tracing.get_current_task_context() + assert context is not None + assert context.task_id == "task-1001" + assert context.task_type == "RENDER_SEGMENT_VIDEO" + assert context.job_id == "job-2002" + assert context.segment_id == "seg-3003" + + with tracing.start_span("render.task.sample.step") as child_span: + assert child_span is None + + assert tracing.get_current_task_context() is None + + +def test_bind_trace_context_restores_previous_context(monkeypatch): + monkeypatch.setenv("OTEL_ENABLED", "false") + tracing = importlib.reload(tracing_module) + + tracing.initialize_tracing("worker-1", "2.0.0") + context = tracing.TaskTraceContext(task_id="task-1", task_type="FINALIZE_MP4") + + assert tracing.get_current_task_context() is None + with tracing.bind_trace_context(None, context): + assert tracing.get_current_task_context() == context + assert tracing.get_current_task_context() is None