Files

53 lines
1.7 KiB
Python

# -- encoding: utf-8 --
import os
import time
from random import uniform
from loguru import logger
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import URL
class SQLManager:
@staticmethod
def create_connect(max_retries=5, base_delay=1):
"""
连接到 MySQL 数据库,使用 SQLAlchemy 和 PyMySQL。
:param max_retries: 最大重试次数
:param base_delay: 基础时延
:return: 返回 SQLAlchemy 连接对象
"""
connection_url = URL.create(
drivername="mysql+pymysql",
username=os.getenv("MYSQL_USER", "root"),
password=os.getenv("MYSQL_PASSWORD", "password"),
host=os.getenv("MYSQL_HOST", "mysql"),
port=os.getenv("MYSQL_PORT", 3306),
database=os.getenv("MYSQL_DATABASE", "datamate"),
query={"charset": "utf8mb4"},
)
attempt = 0
while True:
try:
engine = create_engine(connection_url, pool_pre_ping=True, isolation_level="AUTOCOMMIT")
return engine.connect()
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed with error: {str(e)}")
if attempt >= max_retries - 1:
raise
wait_time = min(30, base_delay * (2 ** attempt)) # 不超过30秒的最大延时
jitter = uniform(-wait_time / 4, wait_time / 4) # 增加随机抖动因子
time.sleep(wait_time + jitter)
attempt += 1
if __name__ == "__main__":
with SQLManager.create_connect() as connection:
inspector = inspect(connection)
print(inspector.get_table_names())