117 lines
5.5 KiB
Python
117 lines
5.5 KiB
Python
import logging
|
|
import os
|
|
|
|
import requests
|
|
from opentelemetry.trace import Status, StatusCode
|
|
|
|
from telemetry import get_tracer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def upload_to_oss(url, file_path):
|
|
"""
|
|
使用签名URL上传文件到OSS
|
|
:param str url: 签名URL
|
|
:param str file_path: 文件路径
|
|
:return bool: 是否成功
|
|
"""
|
|
tracer = get_tracer(__name__)
|
|
with tracer.start_as_current_span("upload_to_oss") as span:
|
|
span.set_attribute("file.url", url)
|
|
span.set_attribute("file.path", file_path)
|
|
span.set_attribute("file.size", os.path.getsize(file_path))
|
|
max_retries = 5
|
|
retries = 0
|
|
if os.getenv("UPLOAD_METHOD") == "rclone":
|
|
with tracer.start_as_current_span("rclone_to_oss") as r_span:
|
|
replace_map = os.getenv("RCLONE_REPLACE_MAP")
|
|
r_span.set_attribute("rclone.replace_map", replace_map)
|
|
if replace_map != "":
|
|
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
|
new_url = url
|
|
for (_src, _dst) in replace_list:
|
|
new_url = new_url.replace(_src, _dst)
|
|
new_url = new_url.split("?", 1)[0]
|
|
r_span.set_attribute("rclone.target_dir", new_url)
|
|
if new_url != url:
|
|
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 32M --multi-thread-streams 8 {file_path} {new_url}")
|
|
r_span.set_attribute("rclone.result", result)
|
|
if result == 0:
|
|
span.set_status(Status(StatusCode.OK))
|
|
return True
|
|
else:
|
|
span.set_status(Status(StatusCode.ERROR))
|
|
while retries < max_retries:
|
|
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
|
|
req_span.set_attribute("http.retry_count", retries)
|
|
try:
|
|
req_span.set_attribute("http.method", "PUT")
|
|
req_span.set_attribute("http.url", url)
|
|
with open(file_path, 'rb') as f:
|
|
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
|
req_span.set_attribute("http.status_code", response.status_code)
|
|
req_span.set_attribute("http.response", response.text)
|
|
response.raise_for_status()
|
|
req_span.set_status(Status(StatusCode.OK))
|
|
span.set_status(Status(StatusCode.OK))
|
|
return True
|
|
except requests.exceptions.Timeout:
|
|
req_span.set_attribute("http.error", "Timeout")
|
|
req_span.set_status(Status(StatusCode.ERROR))
|
|
retries += 1
|
|
logger.warning(f"Upload timed out. Retrying {retries}/{max_retries}...")
|
|
except Exception as e:
|
|
req_span.set_attribute("http.error", str(e))
|
|
req_span.set_status(Status(StatusCode.ERROR))
|
|
retries += 1
|
|
logger.warning(f"Upload failed. Retrying {retries}/{max_retries}...")
|
|
span.set_status(Status(StatusCode.ERROR))
|
|
return False
|
|
|
|
|
|
def download_from_oss(url, file_path):
|
|
"""
|
|
使用签名URL下载文件到OSS
|
|
:param str url: 签名URL
|
|
:param Union[LiteralString, str, bytes] file_path: 文件路径
|
|
:return bool: 是否成功
|
|
"""
|
|
tracer = get_tracer(__name__)
|
|
with tracer.start_as_current_span("download_from_oss") as span:
|
|
span.set_attribute("file.url", url)
|
|
span.set_attribute("file.path", file_path)
|
|
logging.info("download_from_oss: %s", url)
|
|
file_dir, file_name = os.path.split(file_path)
|
|
if file_dir:
|
|
if not os.path.exists(file_dir):
|
|
os.makedirs(file_dir)
|
|
max_retries = 5
|
|
retries = 0
|
|
while retries < max_retries:
|
|
with tracer.start_as_current_span("download_from_oss.request") as req_span:
|
|
req_span.set_attribute("http.retry_count", retries)
|
|
try:
|
|
req_span.set_attribute("http.method", "GET")
|
|
req_span.set_attribute("http.url", url)
|
|
response = requests.get(url, timeout=15) # 设置超时时间
|
|
req_span.set_attribute("http.status_code", response.status_code)
|
|
with open(file_path, 'wb') as f:
|
|
f.write(response.content)
|
|
req_span.set_attribute("file.size", os.path.getsize(file_path))
|
|
req_span.set_status(Status(StatusCode.OK))
|
|
span.set_status(Status(StatusCode.OK))
|
|
return True
|
|
except requests.exceptions.Timeout:
|
|
req_span.set_attribute("http.error", "Timeout")
|
|
req_span.set_status(Status(StatusCode.ERROR))
|
|
retries += 1
|
|
logger.warning(f"Download timed out. Retrying {retries}/{max_retries}...")
|
|
except Exception as e:
|
|
req_span.set_attribute("http.error", str(e))
|
|
req_span.set_status(Status(StatusCode.ERROR))
|
|
retries += 1
|
|
logger.warning(f"Download failed. Retrying {retries}/{max_retries}...")
|
|
span.set_status(Status(StatusCode.ERROR))
|
|
return False
|