Files
FrameTour-RenderWorker/index.py
Jerry Yan d955def63c feat(tracing): 增强文件下载上传的日志记录和追踪功能
- 添加任务上下文信息到日志前缀,便于追踪具体任务
- 在跨度中增加文件源URL和上传URL的属性记录
- 将存储服务中的info级别日志调整为debug级别以减少冗余输出
- 添加文件访问地址的调试日志输出
- 优化根日志级别设置允许DEBUG日志流入处理器
- 修复重试失败后的错误日志格式问题
2026-02-07 00:26:01 +08:00

239 lines
7.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RenderWorker v2 入口
支持 v2 API 协议的渲染 Worker,处理以下任务类型:
- RENDER_SEGMENT_VIDEO: 渲染视频片段
- PREPARE_JOB_AUDIO: 生成全局音频
- PACKAGE_SEGMENT_TS: 封装 TS 分片
- FINALIZE_MP4: 产出最终 MP4
使用方法:
python index.py
环境变量:
API_ENDPOINT_V2: v2 API 端点(或使用 API_ENDPOINT)
ACCESS_KEY: Worker 认证密钥
WORKER_ID: Worker ID(默认 100001)
MAX_CONCURRENCY: 最大并发数(默认 4)
HEARTBEAT_INTERVAL: 心跳间隔秒数(默认 5)
TEMP_DIR: 临时文件目录
"""
import sys
import time
import signal
import logging
import os
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
from domain.config import WorkerConfig
from services.api_client import APIClientV2
from services.task_executor import TaskExecutor
from constant import SOFTWARE_VERSION
from util.tracing import initialize_tracing, shutdown_tracing
# 日志配置
def setup_logging():
"""配置日志系统,输出到控制台和文件"""
# 日志格式
log_format = '[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(log_format, date_format)
# 获取根logger
root_logger = logging.getLogger()
# 允许 DEBUG 日志流入各 handler(具体是否落盘由 handler 级别决定)
root_logger.setLevel(logging.DEBUG)
# 清除已有的handlers(避免重复)
root_logger.handlers.clear()
# 1. 控制台handler(只输出WARNING及以上级别)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# 确定日志目录:PyInstaller 打包后 __file__ 指向临时解压目录,日志会随之丢失
# 使用 sys.frozen 判断是否为打包环境,打包后取 exe 所在目录
if getattr(sys, 'frozen', False):
log_dir = os.path.dirname(sys.executable)
else:
log_dir = os.path.dirname(os.path.abspath(__file__))
# 2. 所有日志文件handler(all_log.log)
all_log_path = os.path.join(log_dir, 'all_log.log')
all_log_handler = RotatingFileHandler(
all_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
all_log_handler.setLevel(logging.DEBUG) # 记录所有级别
all_log_handler.setFormatter(formatter)
root_logger.addHandler(all_log_handler)
# 3. 错误日志文件handler(error.log)
error_log_path = os.path.join(log_dir, 'error.log')
error_log_handler = RotatingFileHandler(
error_log_path,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
error_log_handler.setLevel(logging.ERROR) # 只记录ERROR及以上
error_log_handler.setFormatter(formatter)
root_logger.addHandler(error_log_handler)
# 初始化日志系统
setup_logging()
logger = logging.getLogger('worker')
class WorkerV2:
"""
v2 渲染 Worker 主类
负责:
- 配置加载
- API 客户端初始化
- 任务执行器管理
- 主循环运行
- 优雅退出处理
"""
def __init__(self):
"""初始化 Worker"""
# 加载配置
try:
self.config = WorkerConfig.from_env()
except ValueError as e:
logger.error(f"Configuration error: {e}")
sys.exit(1)
tracing_enabled = initialize_tracing(self.config.worker_id, SOFTWARE_VERSION)
logger.info("OTel tracing %s", "enabled" if tracing_enabled else "disabled")
# 初始化 API 客户端
self.api_client = APIClientV2(self.config)
# 初始化任务执行器
self.task_executor = TaskExecutor(self.config, self.api_client)
# 运行状态
self.running = True
# 确保临时目录存在
self.config.ensure_temp_dir()
# 注册信号处理器
self._setup_signal_handlers()
def _setup_signal_handlers(self):
"""设置信号处理器"""
# Windows 不支持 SIGTERM
signal.signal(signal.SIGINT, self._signal_handler)
if hasattr(signal, 'SIGTERM'):
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""
信号处理,优雅退出
Args:
signum: 信号编号
frame: 当前栈帧
"""
signal_name = signal.Signals(signum).name
logger.info(f"Received signal {signal_name}, initiating shutdown...")
self.running = False
def run(self):
"""主循环"""
logger.info("=" * 60)
logger.info("RenderWorker v2 Starting")
logger.info("=" * 60)
logger.info(f"Worker ID: {self.config.worker_id}")
logger.info(f"API Endpoint: {self.config.api_endpoint}")
logger.info(f"Max Concurrency: {self.config.max_concurrency}")
logger.info(f"Heartbeat Interval: {self.config.heartbeat_interval}s")
logger.info(f"Capabilities: {', '.join(self.config.capabilities)}")
logger.info(f"Temp Directory: {self.config.temp_dir}")
logger.info("=" * 60)
consecutive_errors = 0
max_consecutive_errors = 10
while self.running:
try:
# 心跳同步并拉取任务
current_task_ids = self.task_executor.get_current_task_ids()
tasks = self.api_client.sync(current_task_ids)
# 提交新任务
for task in tasks:
if self.task_executor.submit_task(task):
logger.info(f"Submitted task: {task.task_id} ({task.task_type.value})")
# 重置错误计数
consecutive_errors = 0
# 等待下次心跳
time.sleep(self.config.heartbeat_interval)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
self.running = False
except Exception as e:
consecutive_errors += 1
logger.error(f"Worker loop error ({consecutive_errors}/{max_consecutive_errors}): {e}", exc_info=True)
# 连续错误过多,增加等待时间
if consecutive_errors >= max_consecutive_errors:
logger.error("Too many consecutive errors, waiting 30 seconds...")
time.sleep(30)
consecutive_errors = 0
else:
time.sleep(5)
# 优雅关闭
self._shutdown()
def _shutdown(self):
"""优雅关闭"""
logger.info("Shutting down...")
# 等待当前任务完成
current_count = self.task_executor.get_current_task_count()
if current_count > 0:
logger.info(f"Waiting for {current_count} running task(s) to complete...")
# 关闭执行器
self.task_executor.shutdown(wait=True)
# 关闭 API 客户端
self.api_client.close()
shutdown_tracing()
logger.info("Worker stopped")
def main():
"""主函数"""
# 加载 .env 文件(如果存在)
load_dotenv()
logger.info(f"RenderWorker v{SOFTWARE_VERSION}")
# 创建并运行 Worker
worker = WorkerV2()
worker.run()
if __name__ == '__main__':
main()