diff --git a/handlers/base.py b/handlers/base.py index fd71626..ca14fb9 100644 --- a/handlers/base.py +++ b/handlers/base.py @@ -734,16 +734,22 @@ class BaseHandler(TaskHandler, ABC): Returns: 访问 URL,失败返回 None """ + local_file_exists = os.path.exists(file_path) + local_file_size = os.path.getsize(file_path) if local_file_exists else 0 with start_span( "render.task.file.upload", kind=SpanKind.CLIENT, attributes={ "render.file.type": file_type, "render.file.path": file_path, + "render.file.timeout_seconds": self.config.upload_timeout, + "render.file.local_exists": local_file_exists, + "render.file.local_size_bytes": local_file_size, }, ) as span: upload_info = self.api_client.get_upload_url(task_id, file_type, file_name) if not upload_info: + mark_span_error(span, "get upload url failed", ErrorCode.E_UPLOAD_FAILED.value) logger.error(f"[task:{task_id}] Failed to get upload URL") return None @@ -751,6 +757,7 @@ class BaseHandler(TaskHandler, ABC): access_url = upload_info.get('accessUrl') if not upload_url: + mark_span_error(span, "invalid upload url response", ErrorCode.E_UPLOAD_FAILED.value) logger.error(f"[task:{task_id}] Invalid upload URL response") return None @@ -763,9 +770,40 @@ class BaseHandler(TaskHandler, ABC): span.set_attribute("render.file.access_url", access_url) try: - result = storage.upload_file(upload_url, file_path, timeout=self.config.upload_timeout) + result, upload_metrics = storage.upload_file_with_metrics( + upload_url, + file_path, + timeout=self.config.upload_timeout, + ) + upload_method = str(upload_metrics.get("upload_method", "unknown")) + http_attempts = int(upload_metrics.get("http_attempts", 0)) + http_retry_count = int(upload_metrics.get("http_retry_count", 0)) + http_status_code = int(upload_metrics.get("http_status_code", 0)) + http_replace_applied = bool(upload_metrics.get("http_replace_applied", False)) + content_type = str(upload_metrics.get("content_type", "")) + error_type = str(upload_metrics.get("error_type", "")) + rclone_attempted = bool(upload_metrics.get("rclone_attempted", False)) + rclone_succeeded = bool(upload_metrics.get("rclone_succeeded", False)) + rclone_fallback_http = bool(upload_metrics.get("rclone_fallback_http", False)) + + if span is not None: + span.set_attribute("render.file.upload_success", bool(result)) + span.set_attribute("render.file.upload_method", upload_method) + span.set_attribute("render.file.http_attempts", http_attempts) + span.set_attribute("render.file.http_retry_count", http_retry_count) + span.set_attribute("render.file.http_replace_applied", http_replace_applied) + span.set_attribute("render.file.rclone_attempted", rclone_attempted) + span.set_attribute("render.file.rclone_succeeded", rclone_succeeded) + span.set_attribute("render.file.rclone_fallback_http", rclone_fallback_http) + if content_type: + span.set_attribute("render.file.content_type", content_type) + if http_status_code > 0: + span.set_attribute("render.file.http_status_code", http_status_code) + if error_type: + span.set_attribute("render.file.error_type", error_type) + if result: - file_size = os.path.getsize(file_path) + file_size = local_file_size if local_file_size > 0 else os.path.getsize(file_path) logger.info( f"[task:{task_id}] Uploaded: {file_path} ({file_size} bytes)" ) @@ -773,12 +811,26 @@ class BaseHandler(TaskHandler, ABC): if span is not None: span.set_attribute("render.file.size_bytes", file_size) + cache_write_back = "skipped" if access_url: - self.material_cache.add_to_cache(access_url, file_path) + cache_added = self.material_cache.add_to_cache(access_url, file_path) + cache_write_back = "success" if cache_added else "failed" + if not cache_added: + logger.warning(f"[task:{task_id}] Upload cache write back failed: {file_path}") + if span is not None: + span.set_attribute("render.file.cache_write_back", cache_write_back) return access_url - logger.error(f"[task:{task_id}] Upload failed: {file_path}") + mark_span_error( + span, + f"upload failed(method={upload_method}, status={http_status_code}, retries={http_retry_count}, error={error_type})", + ErrorCode.E_UPLOAD_FAILED.value + ) + logger.error( + f"[task:{task_id}] Upload failed: {file_path}, method={upload_method}, " + f"http_status={http_status_code}, retries={http_retry_count}, error_type={error_type}" + ) return None except Exception as e: mark_span_error(span, str(e), ErrorCode.E_UPLOAD_FAILED.value) diff --git a/services/storage.py b/services/storage.py index 4387bcf..879249a 100644 --- a/services/storage.py +++ b/services/storage.py @@ -8,7 +8,7 @@ import os import logging import subprocess -from typing import Optional +from typing import Any, Dict, Optional, Tuple from urllib.parse import unquote import requests @@ -65,6 +65,22 @@ def _apply_http_replace_map(url: str) -> str: def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 60) -> bool: + """兼容旧接口:仅返回上传是否成功。""" + result, _ = upload_file_with_metrics( + url=url, + file_path=file_path, + max_retries=max_retries, + timeout=timeout, + ) + return result + + +def upload_file_with_metrics( + url: str, + file_path: str, + max_retries: int = 5, + timeout: int = 60 +) -> Tuple[bool, Dict[str, Any]]: """ 使用签名 URL 上传文件到 OSS @@ -75,30 +91,54 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6 timeout: 超时时间(秒) Returns: - 是否成功 + (是否成功, 上传指标) """ + metrics: Dict[str, Any] = { + "upload_method": "none", + "file_size_bytes": 0, + "content_type": "", + "http_attempts": 0, + "http_retry_count": 0, + "http_status_code": 0, + "http_replace_applied": False, + "rclone_attempted": False, + "rclone_succeeded": False, + "rclone_fallback_http": False, + "error_type": "", + } + if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") - return False + metrics["error_type"] = "file_not_found" + return False, metrics file_size = os.path.getsize(file_path) + metrics["file_size_bytes"] = file_size logger.info(f"Uploading: {file_path} ({file_size} bytes)") # 检查是否使用 rclone 上传 if os.getenv("UPLOAD_METHOD") == "rclone": + metrics["rclone_attempted"] = True logger.debug(f"Uploading to: {url}") result = _upload_with_rclone(url, file_path) + metrics["rclone_succeeded"] = result if result: - return True + metrics["upload_method"] = "rclone" + return True, metrics # rclone 失败时回退到 HTTP + metrics["rclone_fallback_http"] = True # 应用 HTTP_REPLACE_MAP 替换 URL http_url = _apply_http_replace_map(url) + metrics["http_replace_applied"] = http_url != url content_type = _get_content_type(file_path) + metrics["content_type"] = content_type + metrics["upload_method"] = "rclone_fallback_http" if metrics["rclone_fallback_http"] else "http" logger.debug(f"Uploading to: {http_url} (Content-Type: {content_type})") retries = 0 while retries < max_retries: + metrics["http_attempts"] = retries + 1 try: with open(file_path, 'rb') as f: with requests.put( @@ -108,19 +148,30 @@ def upload_file(url: str, file_path: str, max_retries: int = 5, timeout: int = 6 timeout=timeout, headers={"Content-Type": content_type} ) as response: + status_code = int(getattr(response, 'status_code', 0) or 0) + metrics["http_status_code"] = status_code response.raise_for_status() logger.info(f"Upload succeeded: {file_path}") - return True + metrics["error_type"] = "" + return True, metrics except requests.exceptions.Timeout: retries += 1 + metrics["http_retry_count"] = retries + metrics["error_type"] = "timeout" logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...") except requests.exceptions.RequestException as e: retries += 1 + metrics["http_retry_count"] = retries + metrics["error_type"] = "request_exception" + response_obj = getattr(e, 'response', None) + status_code = getattr(response_obj, 'status_code', 0) if response_obj is not None else 0 + if isinstance(status_code, int) and status_code > 0: + metrics["http_status_code"] = status_code logger.warning(f"Upload failed ({e}). Retrying {retries}/{max_retries}...") logger.error(f"Upload failed after {max_retries} retries: {file_path}") - return False + return False, metrics def _upload_with_rclone(url: str, file_path: str) -> bool: