Files
FrameTour-RenderWorker/tests/unit/test_storage_upload_metrics.py
Jerry Yan 3cb2f8d02a test(handlers): 添加基础处理器并行传输相关单元测试
- 实现 download_files_parallel 方法的并发下载功能测试
- 验证上传文件并行处理收集URL功能的正确性
- 测试下载文件时设置锁等待时间跨度属性
- 验证无缓存下载时锁等待时间为零的场景
- 测试上传文件时设置详细的跨度属性功能
- 添加渲染视频效果相关的参数解析测试
- 实现存储服务上传指标
2026-02-07 18:29:54 +08:00

82 lines
2.2 KiB
Python

# -*- coding: utf-8 -*-
import requests
from services import storage
class _FakeResponse:
    """Minimal stand-in for an HTTP response object used by these tests.

    Carries a ``status_code``, never raises from ``raise_for_status``, and
    works as a context manager that does not suppress exceptions.
    """

    def __init__(self, status_code=200):
        self.status_code = status_code

    def raise_for_status(self):
        """Do nothing and return ``None``, whatever ``status_code`` is."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Falsy return: an exception raised inside the ``with`` body propagates.
        return False
def test_upload_file_with_metrics_file_not_found(tmp_path):
    """A source path that does not exist yields failure with matching metrics."""
    absent_file = tmp_path / "missing.mp4"  # never created on disk

    outcome = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(absent_file),
        max_retries=1,
        timeout=1,
    )
    ok, report = outcome

    assert ok is False
    # Keys are read in the same order as two separate assertions would.
    observed = (report["error_type"], report["upload_method"])
    assert observed == ("file_not_found", "none")
def test_upload_file_with_metrics_http_success(tmp_path, monkeypatch):
    """A PUT that succeeds on the first call is reported as one HTTP attempt."""
    video_file = tmp_path / "video.mp4"
    video_file.write_bytes(b"content")

    def _always_ok(*args, **kwargs):
        # Replacement for requests.put: every call yields a 200 response.
        return _FakeResponse(200)

    monkeypatch.setattr("services.storage.requests.put", _always_ok)

    ok, report = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(video_file),
        max_retries=3,
        timeout=1,
    )

    assert ok is True
    expected_metrics = {
        "upload_method": "http",
        "http_attempts": 1,
        "http_retry_count": 0,
        "http_status_code": 200,
        "content_type": "video/mp4",
    }
    for key, expected in expected_metrics.items():
        assert report[key] == expected
def test_upload_file_with_metrics_retry_then_success(tmp_path, monkeypatch):
    """A timeout on the first PUT followed by a 200 is reported as one retry."""
    audio_file = tmp_path / "audio.aac"
    audio_file.write_bytes(b"audio-bytes")
    put_calls = []  # one entry per invocation of the patched requests.put

    def _timeout_once_then_ok(*args, **kwargs):
        put_calls.append(None)
        if len(put_calls) > 1:
            return _FakeResponse(200)
        # Only the very first call fails.
        raise requests.exceptions.Timeout()

    monkeypatch.setattr("services.storage.requests.put", _timeout_once_then_ok)

    ok, report = storage.upload_file_with_metrics(
        "https://example.com/upload",
        str(audio_file),
        max_retries=3,
        timeout=1,
    )

    assert ok is True
    expected_metrics = {
        "http_attempts": 2,
        "http_retry_count": 1,
        "http_status_code": 200,
        "error_type": "",
    }
    for key, expected in expected_metrics.items():
        assert report[key] == expected