Files
FrameTour-RenderWorker/services/storage.py
Jerry Yan ef4cf549c4 feat(storage): 增强文件上传功能并添加详细的指标追踪
- 在存储服务中新增 upload_file_with_metrics 方法,返回上传结果和详细指标
- 为上传操作添加完整的指标收集,包括 HTTP 尝试次数、重试次数、状态码等
- 集成 OpenTelemetry 追踪,记录文件上传的关键属性和错误标记
- 改进缓存写回逻辑,添加缓存写入失败的日志记录
- 支持 Rclone 上传方式的指标追踪和回退到 HTTP 的情况记录
- 优化本地文件大小检查,避免重复的文件系统调用
- 添加更详细的错误日志,包含上传方法、状态码和错误类型信息
2026-02-07 18:29:20 +08:00

297 lines
8.8 KiB
Python

# -*- coding: utf-8 -*-
"""
存储服务
提供文件上传/下载功能,支持 OSS 签名 URL 和 HTTP_REPLACE_MAP 环境变量。
"""
import os
import logging
import subprocess
from typing import Any, Dict, Optional, Tuple
from urllib.parse import unquote
import requests
logger = logging.getLogger(__name__)
# Known file extensions and their Content-Type values; anything not
# listed here falls back to the generic binary type.
_CONTENT_TYPE_MAP = {
    '.mp4': 'video/mp4',
    '.aac': 'audio/aac',
    '.ts': 'video/mp2t',
    '.m4a': 'audio/mp4',
}


def _get_content_type(file_path: str) -> str:
    """
    Resolve the Content-Type for a file from its extension.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The mapped Content-Type, or 'application/octet-stream' when the
        extension is unknown.
    """
    _, extension = os.path.splitext(file_path)
    return _CONTENT_TYPE_MAP.get(extension.lower(), 'application/octet-stream')
def _apply_http_replace_map(url: str) -> str:
    """
    Rewrite a URL according to the HTTP_REPLACE_MAP environment variable.

    The variable holds comma-separated "src|dst" pairs; each pair is
    applied in order as a plain substring replacement.

    Args:
        url: Original URL.

    Returns:
        The rewritten URL (unchanged when the variable is unset or no
        pair matches).
    """
    mapping = os.getenv("HTTP_REPLACE_MAP", "")
    if not mapping:
        return url
    rewritten = url
    for entry in mapping.split(","):
        # Entries without the "src|dst" separator are silently ignored.
        if "|" not in entry:
            continue
        src, dst = entry.split("|", 1)
        rewritten = rewritten.replace(src, dst)
    if rewritten != url:
        logger.debug(f"HTTP_REPLACE_MAP: {url} -> {rewritten}")
    return rewritten
def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool:
    """
    Backward-compatible upload interface: report only success/failure.

    Delegates to upload_file_with_metrics and discards the metrics dict.
    """
    return upload_file_with_metrics(url, file_path, max_retries, timeout)[0]
def upload_file_with_metrics(
    url: str,
    file_path: str,
    max_retries: int = 5,
    timeout: int = 60
) -> Tuple[bool, Dict[str, Any]]:
    """
    Upload a file to OSS through a pre-signed URL.

    Optionally tries rclone first (when UPLOAD_METHOD=rclone) and falls
    back to a plain HTTP PUT, retrying on timeouts and request errors.

    Args:
        url: Pre-signed upload URL.
        file_path: Local path of the file to upload.
        max_retries: Maximum number of HTTP attempts.
        timeout: Per-request timeout in seconds.

    Returns:
        (success, metrics) tuple; metrics records how the upload went
        (method used, attempts, status codes, error classification).
    """
    metrics: Dict[str, Any] = {
        "upload_method": "none",
        "file_size_bytes": 0,
        "content_type": "",
        "http_attempts": 0,
        "http_retry_count": 0,
        "http_status_code": 0,
        "http_replace_applied": False,
        "rclone_attempted": False,
        "rclone_succeeded": False,
        "rclone_fallback_http": False,
        "error_type": "",
    }

    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        metrics["error_type"] = "file_not_found"
        return False, metrics

    size = os.path.getsize(file_path)
    metrics["file_size_bytes"] = size
    logger.info(f"Uploading: {file_path} ({size} bytes)")

    # Optional rclone path: try it first, fall back to HTTP on failure.
    if os.getenv("UPLOAD_METHOD") == "rclone":
        metrics["rclone_attempted"] = True
        logger.debug(f"Uploading to: {url}")
        if _upload_with_rclone(url, file_path):
            metrics["rclone_succeeded"] = True
            metrics["upload_method"] = "rclone"
            return True, metrics
        metrics["rclone_succeeded"] = False
        metrics["rclone_fallback_http"] = True

    # Rewrite the URL per HTTP_REPLACE_MAP and record whether it changed.
    http_url = _apply_http_replace_map(url)
    metrics["http_replace_applied"] = http_url != url

    content_type = _get_content_type(file_path)
    metrics["content_type"] = content_type
    metrics["upload_method"] = (
        "rclone_fallback_http" if metrics["rclone_fallback_http"] else "http"
    )
    logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})")

    for attempt in range(1, max_retries + 1):
        metrics["http_attempts"] = attempt
        try:
            with open(file_path, 'rb') as body:
                with requests.put(
                    http_url,
                    data=body,
                    stream=True,
                    timeout=timeout,
                    headers={"Content-Type": content_type}
                ) as response:
                    status = int(getattr(response, 'status_code', 0) or 0)
                    metrics["http_status_code"] = status
                    response.raise_for_status()
                    logger.info(f"Upload succeeded: {file_path}")
                    metrics["error_type"] = ""
                    return True, metrics
        except requests.exceptions.Timeout:
            metrics["http_retry_count"] = attempt
            metrics["error_type"] = "timeout"
            logger.warning(f"Upload timed out. Retrying {attempt}/{max_retries}...")
        except requests.exceptions.RequestException as exc:
            metrics["http_retry_count"] = attempt
            metrics["error_type"] = "request_exception"
            # Prefer the status code attached to the exception's response, if any.
            resp = getattr(exc, 'response', None)
            code = getattr(resp, 'status_code', 0) if resp is not None else 0
            if isinstance(code, int) and code > 0:
                metrics["http_status_code"] = code
            logger.warning(f"Upload failed ({exc}). Retrying {attempt}/{max_retries}...")

    logger.error(f"Upload failed after {max_retries} retries: {file_path}")
    return False, metrics
def _upload_with_rclone(url: str, file_path: str) -> bool:
    """
    Upload a file via the rclone CLI.

    The RCLONE_REPLACE_MAP environment variable ("src|dst" pairs,
    comma-separated) must rewrite the signed HTTP URL into an rclone
    remote path; otherwise the upload is skipped.

    Args:
        url: Signed upload URL.
        file_path: Local path of the file to upload.

    Returns:
        True when rclone exits successfully, False otherwise (including
        every skip condition).
    """
    mapping = os.getenv("RCLONE_REPLACE_MAP", "")
    if not mapping:
        return False

    config_file = os.getenv("RCLONE_CONFIG_FILE", "")

    # Rewrite the signed URL into an rclone destination.
    target = url
    for entry in mapping.split(","):
        if "|" not in entry:
            continue
        src, dst = entry.split("|", 1)
        target = target.replace(src, dst)
    # Drop the query string, then decode %XX escapes (e.g. %2F -> /).
    target = unquote(target.split("?", 1)[0])

    if target == url:
        return False
    if target.startswith(("http://", "https://")):
        # The replacement did not yield a remote path; let HTTP handle it.
        logger.warning("rclone upload skipped: URL still starts with http after replace")
        logger.debug(f"rclone upload skipped address: {target}")
        return False

    cmd = [
        "rclone", "copyto",
        "--no-check-dest",
        "--ignore-existing",
        "--multi-thread-chunk-size", "8M",
        "--multi-thread-streams", "8",
    ]
    if config_file:
        cmd += ["--config", config_file]
    cmd += [file_path, target]
    logger.debug(f"rclone command: {' '.join(cmd)}")

    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        logger.info(f"rclone upload succeeded: {file_path}")
        return True

    err = (proc.stderr or '').strip()[:500]
    logger.warning(f"rclone upload failed (code={proc.returncode}): {file_path} {err}")
    return False
def download_file(
    url: str,
    file_path: str,
    max_retries: int = 5,
    timeout: int = 30,
    skip_if_exist: bool = False
) -> bool:
    """
    Download a file through a signed URL.

    Args:
        url: Signed URL.
        file_path: Local destination path.
        max_retries: Maximum number of attempts.
        timeout: Per-request timeout in seconds.
        skip_if_exist: Return immediately when the destination exists.

    Returns:
        True on success, False after all retries failed.
    """
    # Honor an already-present destination when the caller allows it.
    if skip_if_exist and os.path.exists(file_path):
        logger.debug(f"File exists, skipping download: {file_path}")
        return True
    logger.debug(f"Downloading: {url}")
    # Make sure the destination directory exists.
    file_dir = os.path.dirname(file_path)
    if file_dir:
        os.makedirs(file_dir, exist_ok=True)
    # Apply HTTP_REPLACE_MAP URL rewriting.
    http_url = _apply_http_replace_map(url)
    retries = 0
    while retries < max_retries:
        try:
            with requests.get(http_url, timeout=timeout, stream=True) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            file_size = os.path.getsize(file_path)
            logger.info(f"Download succeeded: {file_path} ({file_size} bytes)")
            return True
        except requests.exceptions.Timeout:
            retries += 1
            # Fix: drop any truncated partial file so a later call with
            # skip_if_exist=True cannot mistake it for a complete download.
            _remove_partial_download(file_path)
            logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
        except requests.exceptions.RequestException as e:
            retries += 1
            _remove_partial_download(file_path)
            logger.warning(f"Download failed ({e}). Retrying {retries}/{max_retries}...")
    logger.error(f"Download failed after {max_retries} retries")
    logger.debug(f"Download failed source address: {url}")
    return False


def _remove_partial_download(file_path: str) -> None:
    """Best-effort removal of a truncated file left by a failed attempt."""
    try:
        os.remove(file_path)
    except OSError:
        # Nothing was written (or it vanished) -- nothing to clean up.
        pass