60 lines
1.8 KiB
Python

import logging
import os
import requests
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def upload_to_oss(url, file_path):
    """
    Upload a local file to OSS with an HTTP PUT against a signed URL.

    The upload is attempted at most five times. An attempt counts as failed
    when the request times out, when any other exception is raised (including
    errors opening the file), or when the server answers with a status code
    other than 200.

    :param str url: signed URL to PUT the file to
    :param str file_path: path of the local file to upload
    :return bool: True as soon as one attempt gets HTTP 200, False once all
        attempts have failed
    """
    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            # Re-open the file on every attempt so each PUT streams it from
            # the first byte again.
            with open(file_path, 'rb') as f:
                response = requests.put(url, data=f, timeout=60)  # 1-minute timeout
        except requests.exceptions.Timeout:
            logger.warning(f"Upload timed out. Retrying {attempt}/{max_retries}...")
            continue
        except Exception as e:
            logger.warning(f"Upload failed: {e}. Retrying {attempt}/{max_retries}...")
            continue
        if response.status_code == 200:
            return True
        # A non-200 answer must consume an attempt too; otherwise the loop
        # would never terminate while the server keeps rejecting the upload.
        logger.warning(
            f"Upload failed with HTTP status {response.status_code}. "
            f"Retrying {attempt}/{max_retries}..."
        )
    return False
def download_from_oss(url, file_path):
    """
    Download a file from OSS with an HTTP GET against a signed URL.

    The parent directory of ``file_path`` is created when it does not exist.
    The download is attempted at most five times. An attempt counts as failed
    when the request times out, when the server answers with a 4xx/5xx status,
    or when any other exception is raised (including errors writing the file).

    :param str url: signed URL to GET the file from
    :param Union[LiteralString, str, bytes] file_path: destination path
    :return bool: True once the response body has been written to
        ``file_path``, False once all attempts have failed
    """
    # NOTE(review): the signed URL is logged verbatim; presumably it carries
    # the signature in its query string -- confirm that is acceptable.
    logger.info("download_from_oss: %s", url)
    file_dir = os.path.dirname(file_path)
    if file_dir and not os.path.exists(file_dir):
        # exist_ok avoids failing when another process or thread creates the
        # directory between the existence check and this call.
        os.makedirs(file_dir, exist_ok=True)
    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, timeout=15)  # 15-second timeout
            # Turn 4xx/5xx answers into exceptions so an error body is never
            # saved to disk and reported as a successful download.
            response.raise_for_status()
            with open(file_path, 'wb') as f:
                f.write(response.content)
            return True
        except requests.exceptions.Timeout:
            logger.warning(f"Download timed out. Retrying {attempt}/{max_retries}...")
        except Exception as e:
            logger.warning(f"Download failed: {e}. Retrying {attempt}/{max_retries}...")
    return False