Compare commits

5 Commits

Author SHA1 Message Date
d7704005b6 feat(entity/ffmpeg.py): 添加grid4效果支持在ffmpeg.py中增加了对grid4效果的支持。该功能允许用户通过指定参数来创建一个四宫格视频布局,每个格子显示不同的视频片段,并且可以设置延迟时间以实现更丰富的视觉效果。具体改动包括:
- 解析`grid4`效果的参数,如果未提供则默认为1。
- 根据提供的或默认的分辨率分割视频流为四个部分。
-为每个分割后的视频流应用缩放和时间延迟处理。
- 创建黑色背景并使用overlay滤镜将处理后的视频流放置于正确的位置上,形成最终的四宫格布局。
2025-09-18 17:01:03 +08:00
f85ccea933 feat(constant): 更新软件版本号至 0.0.6- 在 constant/__init__.py 文件中将 SOFTWARE_VERSION 从 '0.0.5' 修改为 '0.0.6'
- 在 entity/ffmpeg.py 文件中添加了新的视频效果处理逻辑,支持显示特定时长的视频片段
2025-09-18 09:42:57 +08:00
0c7181911e refactor(entity): 优化视频变速和缩放效果处理
-将视频变速实现从 minterpolate 改为使用 setpts,避免 PTS 冲突问题
-简化缩放效果处理逻辑
2025-09-12 18:01:41 +08:00
cf43f6379e feat(ffmpeg): 使用 minterpolate 替代 fps 调整视频速度
- 将视频变速功能从直接调整帧率改为使用 minterpolate 滤镜- 通过设置 fps 和 mi_mode 参数实现平滑的视频慢放效果
- 解决了直接调整帧率可能导致的 PTS 冲突问题
2025-09-12 16:50:53 +08:00
ce8854404b fix(entity): 修复视频慢放时 PTS 冲突问题
- 修改视频变速功能,通过改变帧率实现慢放效果
-避免使用 setpts滤镜导致的 PTS 冲突
- 优化代码结构,提高可读性和可维护性
2025-09-12 14:54:01 +08:00
25 changed files with 226 additions and 2011 deletions

3
.gitignore vendored
View File

@@ -31,4 +31,5 @@ target/
.venv .venv
venv/ venv/
cython_debug/ cython_debug/
.env .env
.serena

12
app.py
View File

@@ -4,14 +4,12 @@ import flask
import config import config
import biz.task import biz.task
from services import DefaultTemplateService import template
from telemetry import init_opentelemetry from telemetry import init_opentelemetry
from template import load_local_template
from util import api from util import api
# 使用新的服务架构 load_local_template()
template_service = DefaultTemplateService()
template_service.load_local_templates()
import logging import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@@ -29,11 +27,11 @@ def do_nothing():
@app.post('/<task_id>') @app.post('/<task_id>')
def do_task(task_id): def do_task(task_id):
task_info = api.get_task_info(task_id) task_info = api.get_task_info(task_id)
local_template_info = template_service.get_template(task_info.get("templateId")) local_template_info = template.get_template_def(task_info.get("templateId"))
template_info = api.get_template_info(task_info.get("templateId")) template_info = api.get_template_info(task_info.get("templateId"))
if local_template_info: if local_template_info:
if local_template_info.get("updateTime") != template_info.get("updateTime"): if local_template_info.get("updateTime") != template_info.get("updateTime"):
template_service.download_template(task_info.get("templateId")) template.download_template(task_info.get("templateId"))
biz.task.start_task(task_info) biz.task.start_task(task_info)
return "OK" return "OK"

View File

@@ -5,10 +5,7 @@ from concurrent.futures import ThreadPoolExecutor
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Status, StatusCode
# 使用新架构组件,保持对旧FfmpegTask的兼容
from entity.ffmpeg import FfmpegTask from entity.ffmpeg import FfmpegTask
from entity.render_task import RenderTask, TaskType
from services import DefaultRenderService
import logging import logging
from util import ffmpeg, oss from util import ffmpeg, oss
@@ -16,14 +13,6 @@ from util.ffmpeg import fade_out_audio
from telemetry import get_tracer from telemetry import get_tracer
logger = logging.getLogger('biz/ffmpeg') logger = logging.getLogger('biz/ffmpeg')
_render_service = None
def _get_render_service():
"""获取渲染服务实例"""
global _render_service
if _render_service is None:
_render_service = DefaultRenderService()
return _render_service
def parse_ffmpeg_task(task_info, template_info): def parse_ffmpeg_task(task_info, template_info):
@@ -141,25 +130,24 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
def start_ffmpeg_task(ffmpeg_task): def start_ffmpeg_task(ffmpeg_task):
"""启动FFmpeg任务 - 使用新的渲染服务"""
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_ffmpeg_task") as span: with tracer.start_as_current_span("start_ffmpeg_task") as span:
try: for task in ffmpeg_task.analyze_input_render_tasks():
# 使用新的渲染服务 result = start_ffmpeg_task(task)
render_service = _get_render_service() if not result:
result = render_service.render(ffmpeg_task) return False
ffmpeg_task.correct_task_type()
if result: span.set_attribute("task.type", ffmpeg_task.task_type)
span.set_status(Status(StatusCode.OK)) span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
else: span.set_attribute("task.frame_rate", ffmpeg_task.frame_rate)
span.set_status(Status(StatusCode.ERROR)) span.set_attribute("task.resolution", str(ffmpeg_task.resolution))
span.set_attribute("task.ext_data", json.dumps(ffmpeg_task.ext_data))
return result result = ffmpeg.start_render(ffmpeg_task)
if not result:
except Exception as e:
span.set_status(Status(StatusCode.ERROR)) span.set_status(Status(StatusCode.ERROR))
logger.error(f"FFmpeg task failed: {e}", exc_info=True)
return False return False
span.set_status(Status(StatusCode.OK))
return True
def clear_task_tmp_file(ffmpeg_task): def clear_task_tmp_file(ffmpeg_task):
@@ -178,8 +166,7 @@ def clear_task_tmp_file(ffmpeg_task):
def probe_video_info(ffmpeg_task): def probe_video_info(ffmpeg_task):
"""获取视频长度宽度和时长 - 使用新的渲染服务""" # 获取视频长度宽度和时长
render_service = _get_render_service() return ffmpeg.probe_video_info(ffmpeg_task.get_output_file())
return render_service.get_video_info(ffmpeg_task.get_output_file())

View File

@@ -1,55 +1,44 @@
import json import json
import logging
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Status, StatusCode
# 使用新的服务架构 from biz.ffmpeg import parse_ffmpeg_task, start_ffmpeg_task, clear_task_tmp_file, probe_video_info, fade_out_audio
from services import DefaultTaskService, DefaultRenderService, DefaultTemplateService
from telemetry import get_tracer from telemetry import get_tracer
from template import get_template_def
from util import api
logger = logging.getLogger(__name__)
# 创建服务实例(单例模式)
_render_service = None
_template_service = None
_task_service = None
def _get_services():
"""获取服务实例(懒加载)"""
global _render_service, _template_service, _task_service
if _render_service is None:
_render_service = DefaultRenderService()
if _template_service is None:
_template_service = DefaultTemplateService()
_template_service.load_local_templates() # 加载本地模板
if _task_service is None:
_task_service = DefaultTaskService(_render_service, _template_service)
return _task_service, _render_service, _template_service
def start_task(task_info): def start_task(task_info):
"""启动任务处理(保持向后兼容的接口)"""
tracer = get_tracer(__name__) tracer = get_tracer(__name__)
with tracer.start_as_current_span("start_task_legacy") as span: with tracer.start_as_current_span("start_task") as span:
try: task_info = api.normalize_task(task_info)
task_service, _, _ = _get_services() span.set_attribute("task", json.dumps(task_info))
span.set_attribute("scenicId", task_info.get("scenicId", "?"))
# 使用新的任务服务处理 span.set_attribute("templateId", task_info.get("templateId"))
result = task_service.process_task(task_info) template_info = get_template_def(task_info.get("templateId"))
api.report_task_start(task_info)
if result: ffmpeg_task = parse_ffmpeg_task(task_info, template_info)
span.set_status(Status(StatusCode.OK)) result = start_ffmpeg_task(ffmpeg_task)
logger.info("Task completed successfully: %s", task_info.get("id")) if not result:
else:
span.set_status(Status(StatusCode.ERROR))
logger.error("Task failed: %s", task_info.get("id"))
return None # 保持原有返回值格式
except Exception as e:
span.set_status(Status(StatusCode.ERROR)) span.set_status(Status(StatusCode.ERROR))
logger.error("Task processing failed: %s", e, exc_info=True) return api.report_task_failed(task_info)
return None width, height, duration = probe_video_info(ffmpeg_task)
span.set_attribute("probe.width", width)
span.set_attribute("probe.height", height)
span.set_attribute("probe.duration", duration)
# 音频淡出
new_fn = fade_out_audio(ffmpeg_task.get_output_file(), duration)
ffmpeg_task.set_output_file(new_fn)
oss_result = api.upload_task_file(task_info, ffmpeg_task)
if not oss_result:
span.set_status(Status(StatusCode.ERROR))
return api.report_task_failed(task_info)
# 获取视频长度宽度和时长
clear_task_tmp_file(ffmpeg_task)
api.report_task_success(task_info, videoInfo={
"width": width,
"height": height,
"duration": duration
})
span.set_status(Status(StatusCode.OK))
return None

View File

@@ -3,9 +3,6 @@ import logging
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from dotenv import load_dotenv from dotenv import load_dotenv
# 导入新的配置系统,保持向后兼容
from .settings import get_config, get_ffmpeg_config, get_api_config, get_storage_config, get_server_config
load_dotenv() load_dotenv()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
root_logger = logging.getLogger() root_logger = logging.getLogger()

View File

@@ -1,139 +0,0 @@
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import logging
from dotenv import load_dotenv
load_dotenv()
@dataclass
class FFmpegConfig:
"""FFmpeg相关配置"""
encoder_args: List[str]
video_args: List[str]
audio_args: List[str]
default_args: List[str]
old_ffmpeg: bool = False
re_encode_video_args: Optional[List[str]] = None
re_encode_encoder_args: Optional[List[str]] = None
@classmethod
def from_env(cls) -> 'FFmpegConfig':
encoder_args = os.getenv("ENCODER_ARGS", "-c:v h264").split(" ")
video_args = os.getenv("VIDEO_ARGS", "-profile:v high -level:v 4").split(" ")
audio_args = ["-c:a", "aac", "-b:a", "128k", "-ar", "48000", "-ac", "2"]
default_args = ["-shortest"]
re_encode_video_args = None
if os.getenv("RE_ENCODE_VIDEO_ARGS"):
re_encode_video_args = os.getenv("RE_ENCODE_VIDEO_ARGS").split(" ")
re_encode_encoder_args = None
if os.getenv("RE_ENCODE_ENCODER_ARGS"):
re_encode_encoder_args = os.getenv("RE_ENCODE_ENCODER_ARGS").split(" ")
return cls(
encoder_args=encoder_args,
video_args=video_args,
audio_args=audio_args,
default_args=default_args,
old_ffmpeg=bool(os.getenv("OLD_FFMPEG", False)),
re_encode_video_args=re_encode_video_args,
re_encode_encoder_args=re_encode_encoder_args
)
@dataclass
class APIConfig:
"""API相关配置"""
endpoint: str
access_key: str
timeout: int = 10
redirect_to_url: Optional[str] = None
@classmethod
def from_env(cls) -> 'APIConfig':
endpoint = os.getenv('API_ENDPOINT', '')
if not endpoint:
raise ValueError("API_ENDPOINT environment variable is required")
access_key = os.getenv('ACCESS_KEY', '')
if not access_key:
raise ValueError("ACCESS_KEY environment variable is required")
return cls(
endpoint=endpoint,
access_key=access_key,
timeout=int(os.getenv('API_TIMEOUT', '10')),
redirect_to_url=os.getenv("REDIRECT_TO_URL") or None
)
@dataclass
class StorageConfig:
"""存储相关配置"""
template_dir: str
@classmethod
def from_env(cls) -> 'StorageConfig':
template_dir = os.getenv('TEMPLATE_DIR', './template')
return cls(template_dir=template_dir)
@dataclass
class ServerConfig:
"""服务器相关配置"""
host: str = "0.0.0.0"
port: int = 9998
debug: bool = False
@classmethod
def from_env(cls) -> 'ServerConfig':
return cls(
host=os.getenv('HOST', '0.0.0.0'),
port=int(os.getenv('PORT', '9998')),
debug=bool(os.getenv('DEBUG', False))
)
@dataclass
class AppConfig:
"""应用总配置"""
ffmpeg: FFmpegConfig
api: APIConfig
storage: StorageConfig
server: ServerConfig
@classmethod
def from_env(cls) -> 'AppConfig':
return cls(
ffmpeg=FFmpegConfig.from_env(),
api=APIConfig.from_env(),
storage=StorageConfig.from_env(),
server=ServerConfig.from_env()
)
# 全局配置实例
_config: Optional[AppConfig] = None
def get_config() -> AppConfig:
"""获取全局配置实例"""
global _config
if _config is None:
_config = AppConfig.from_env()
return _config
def reload_config() -> AppConfig:
"""重新加载配置"""
global _config
_config = AppConfig.from_env()
return _config
# 向后兼容的配置获取函数
def get_ffmpeg_config() -> FFmpegConfig:
return get_config().ffmpeg
def get_api_config() -> APIConfig:
return get_config().api
def get_storage_config() -> StorageConfig:
return get_config().storage
def get_server_config() -> ServerConfig:
return get_config().server

View File

@@ -6,4 +6,4 @@ SUPPORT_FEATURE = (
'rclone_upload', 'rclone_upload',
'custom_re_encode', 'custom_re_encode',
) )
SOFTWARE_VERSION = '0.0.5' SOFTWARE_VERSION = '0.0.6'

View File

@@ -1,25 +0,0 @@
from .base import EffectProcessor, EffectRegistry
from .camera_shot import CameraShotEffect
from .speed import SpeedEffect
from .zoom import ZoomEffect
from .skip import SkipEffect
from .tail import TailEffect
# 注册所有效果处理器
registry = EffectRegistry()
registry.register('cameraShot', CameraShotEffect)
registry.register('ospeed', SpeedEffect)
registry.register('zoom', ZoomEffect)
registry.register('skip', SkipEffect)
registry.register('tail', TailEffect)
__all__ = [
'EffectProcessor',
'EffectRegistry',
'registry',
'CameraShotEffect',
'SpeedEffect',
'ZoomEffect',
'SkipEffect',
'TailEffect'
]

View File

@@ -1,94 +0,0 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Type, Any, Optional
import json
import logging
logger = logging.getLogger(__name__)
class EffectProcessor(ABC):
"""效果处理器抽象基类"""
def __init__(self, params: str = "", ext_data: Optional[Dict[str, Any]] = None):
self.params = params
self.ext_data = ext_data or {}
self.frame_rate = 25 # 默认帧率
@abstractmethod
def validate_params(self) -> bool:
"""验证参数是否有效"""
pass
@abstractmethod
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""
生成FFmpeg滤镜参数
Args:
video_input: 输入视频流标识符 (例如: "[0:v]", "[v_eff1]")
effect_index: 效果索引,用于生成唯一的输出标识符
Returns:
tuple: (filter_args_list, output_stream_identifier)
"""
pass
@abstractmethod
def get_effect_name(self) -> str:
"""获取效果名称"""
pass
def parse_params(self) -> List[str]:
"""解析参数字符串为列表"""
if not self.params:
return []
return self.params.split(',')
def get_pos_json(self) -> Dict[str, Any]:
"""获取位置JSON数据"""
pos_json_str = self.ext_data.get('posJson', '{}')
try:
return json.loads(pos_json_str) if pos_json_str != '{}' else {}
except Exception as e:
logger.warning(f"Failed to parse posJson: {e}")
return {}
class EffectRegistry:
"""效果处理器注册表"""
def __init__(self):
self._processors: Dict[str, Type[EffectProcessor]] = {}
def register(self, name: str, processor_class: Type[EffectProcessor]):
"""注册效果处理器"""
if not issubclass(processor_class, EffectProcessor):
raise ValueError(f"{processor_class} must be a subclass of EffectProcessor")
self._processors[name] = processor_class
logger.debug(f"Registered effect processor: {name}")
def get_processor(self, effect_name: str, params: str = "", ext_data: Optional[Dict[str, Any]] = None) -> Optional[EffectProcessor]:
"""获取效果处理器实例"""
if effect_name not in self._processors:
logger.warning(f"Unknown effect: {effect_name}")
return None
processor_class = self._processors[effect_name]
return processor_class(params, ext_data)
def list_effects(self) -> List[str]:
"""列出所有注册的效果"""
return list(self._processors.keys())
def parse_effect_string(self, effect_string: str) -> tuple[str, str]:
"""
解析效果字符串
Args:
effect_string: 效果字符串,格式为 "effect_name:params"
Returns:
tuple: (effect_name, params)
"""
if ':' in effect_string:
parts = effect_string.split(':', 2)
return parts[0], parts[1] if len(parts) > 1 else ""
return effect_string, ""

View File

@@ -1,79 +0,0 @@
from typing import List, Dict, Any
from .base import EffectProcessor
class CameraShotEffect(EffectProcessor):
"""相机镜头效果处理器"""
def validate_params(self) -> bool:
"""验证参数:start_time,duration,rotate_deg"""
params = self.parse_params()
if not params:
return True # 使用默认参数
# 参数格式: "start_time,duration,rotate_deg"
if len(params) > 3:
return False
try:
for i, param in enumerate(params):
if param == '':
continue
if i == 2: # rotate_deg
int(param)
else: # start_time, duration
float(param)
return True
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成相机镜头效果的滤镜参数"""
if not self.validate_params():
return [], video_input
params = self.parse_params()
# 设置默认值
start = 3.0
duration = 1.0
rotate_deg = 0
if len(params) >= 1 and params[0] != '':
start = float(params[0])
if len(params) >= 2 and params[1] != '':
duration = float(params[1])
if len(params) >= 3 and params[2] != '':
rotate_deg = int(params[2])
filter_args = []
# 生成输出流标识符
start_out_str = "[eff_s]"
mid_out_str = "[eff_m]"
end_out_str = "[eff_e]"
final_output = f"[v_eff{effect_index}]"
# 分割视频流为三部分
filter_args.append(f"{video_input}split=3{start_out_str}{mid_out_str}{end_out_str}")
# 选择开始部分帧
filter_args.append(f"{start_out_str}select=lt(n\\,{int(start * self.frame_rate)}){start_out_str}")
# 选择结束部分帧
filter_args.append(f"{end_out_str}select=gt(n\\,{int(start * self.frame_rate)}){end_out_str}")
# 选择中间特定帧并扩展
filter_args.append(f"{mid_out_str}select=eq(n\\,{int(start * self.frame_rate)}){mid_out_str}")
filter_args.append(f"{mid_out_str}tpad=start_mode=clone:start_duration={duration:.4f}{mid_out_str}")
# 如果需要旋转
if rotate_deg != 0:
filter_args.append(f"{mid_out_str}rotate=PI*{rotate_deg}/180{mid_out_str}")
# 连接三部分
filter_args.append(f"{start_out_str}{mid_out_str}{end_out_str}concat=n=3:v=1:a=0,setpts=N/{self.frame_rate}/TB{final_output}")
return filter_args, final_output
def get_effect_name(self) -> str:
return "cameraShot"

View File

@@ -1,38 +0,0 @@
from typing import List
from .base import EffectProcessor
class SkipEffect(EffectProcessor):
"""跳过开头效果处理器"""
def validate_params(self) -> bool:
"""验证参数:跳过的秒数"""
if not self.params:
return True # 默认不跳过
try:
skip_seconds = float(self.params)
return skip_seconds >= 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成跳过开头效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params:
return [], video_input
skip_seconds = float(self.params)
if skip_seconds <= 0:
return [], video_input
output_stream = f"[v_eff{effect_index}]"
# 使用trim滤镜跳过开头
filter_args = [f"{video_input}trim=start={skip_seconds}{output_stream}"]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "skip"

View File

@@ -1,35 +0,0 @@
from typing import List
from .base import EffectProcessor
class SpeedEffect(EffectProcessor):
"""视频变速效果处理器"""
def validate_params(self) -> bool:
"""验证参数:速度倍数"""
if not self.params:
return True # 默认不变速
try:
speed = float(self.params)
return speed > 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成变速效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params or self.params == "1":
return [], video_input # 不需要变速
speed = float(self.params)
output_stream = f"[v_eff{effect_index}]"
# 使用setpts进行变速
filter_args = [f"{video_input}setpts={speed}*PTS{output_stream}"]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "ospeed"

View File

@@ -1,42 +0,0 @@
from typing import List
from .base import EffectProcessor
class TailEffect(EffectProcessor):
"""保留末尾效果处理器"""
def validate_params(self) -> bool:
"""验证参数:保留的秒数"""
if not self.params:
return True # 默认不截取
try:
tail_seconds = float(self.params)
return tail_seconds >= 0
except ValueError:
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成保留末尾效果的滤镜参数"""
if not self.validate_params():
return [], video_input
if not self.params:
return [], video_input
tail_seconds = float(self.params)
if tail_seconds <= 0:
return [], video_input
output_stream = f"[v_eff{effect_index}]"
# 使用reverse+trim+reverse的方法来精确获取最后N秒
filter_args = [
f"{video_input}reverse[v_rev{effect_index}]",
f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]",
f"[v_trim{effect_index}]reverse{output_stream}"
]
return filter_args, output_stream
def get_effect_name(self) -> str:
return "tail"

View File

@@ -1,89 +0,0 @@
from typing import List
import json
from .base import EffectProcessor
class ZoomEffect(EffectProcessor):
"""缩放效果处理器"""
def validate_params(self) -> bool:
"""验证参数:start_time,zoom_factor,duration"""
params = self.parse_params()
if len(params) < 3:
return False
try:
start_time = float(params[0])
zoom_factor = float(params[1])
duration = float(params[2])
return (start_time >= 0 and
zoom_factor > 0 and
duration >= 0)
except (ValueError, IndexError):
return False
def generate_filter_args(self, video_input: str, effect_index: int) -> tuple[List[str], str]:
"""生成缩放效果的滤镜参数"""
if not self.validate_params():
return [], video_input
params = self.parse_params()
start_time = float(params[0])
zoom_factor = float(params[1])
duration = float(params[2])
if zoom_factor == 1:
return [], video_input # 不需要缩放
output_stream = f"[v_eff{effect_index}]"
# 获取缩放中心点
center_x, center_y = self._get_zoom_center()
filter_args = []
if duration == 0:
# 静态缩放(整个视频时长)
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(
f"{video_input}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1{output_stream}"
)
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(
f"{video_input}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1{output_stream}"
)
return filter_args, output_stream
def _get_zoom_center(self) -> tuple[str, str]:
"""获取缩放中心点坐标表达式"""
# 默认中心点
center_x = "iw/2"
center_y = "ih/2"
pos_json = self.get_pos_json()
if pos_json:
_f_x = pos_json.get('ltX', 0)
_f_x2 = pos_json.get('rbX', 0)
_f_y = pos_json.get('ltY', 0)
_f_y2 = pos_json.get('rbY', 0)
_v_w = pos_json.get('imgWidth', 1)
_v_h = pos_json.get('imgHeight', 1)
if _v_w > 0 and _v_h > 0:
# 计算坐标系统中的中心点
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}"
center_y = f"ih*{center_y_ratio:.6f}"
return center_x, center_y
def get_effect_name(self) -> str:
return "zoom"

View File

@@ -271,7 +271,7 @@ class FfmpegTask(object):
if param == '': if param == '':
param = "1" param = "1"
if param != "1": if param != "1":
# 视频变速 # 视频变速:使用fps实现,避免PTS冲突
effect_index += 1 effect_index += 1
filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]") filter_args.append(f"{video_output_str}setpts={param}*PTS[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
@@ -280,29 +280,24 @@ class FfmpegTask(object):
if param == '': if param == '':
continue continue
_split = param.split(",") _split = param.split(",")
if len(_split) < 3: if len(_split) < 2:
continue continue
try: try:
start_time = float(_split[0]) start_time = float(_split[0])
zoom_factor = float(_split[1]) zoom_factor = float(_split[1])
duration = float(_split[2])
if start_time < 0: if start_time < 0:
start_time = 0 start_time = 0
if duration < 0:
duration = 0
if zoom_factor <= 0: if zoom_factor <= 0:
zoom_factor = 1 zoom_factor = 1
except (ValueError, IndexError): except (ValueError, IndexError):
start_time = 0 start_time = 0
duration = 0
zoom_factor = 1 zoom_factor = 1
if zoom_factor == 1: if zoom_factor == 1:
continue continue
effect_index += 1 effect_index += 1
# 获取缩放中心点(从pos_json或使用默认中心) left_x = f"iw/(2*{zoom_factor})"
center_x = "iw/2" top_y = f"ih/(2*{zoom_factor})"
center_y = "ih/2"
pos_json_str = self.ext_data.get('posJson', '{}') pos_json_str = self.ext_data.get('posJson', '{}')
try: try:
pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {} pos_json = json.loads(pos_json_str) if pos_json_str != '{}' else {}
@@ -318,23 +313,14 @@ class FfmpegTask(object):
center_x_ratio = (_f_x + _f_x2) / (2 * _v_w) center_x_ratio = (_f_x + _f_x2) / (2 * _v_w)
center_y_ratio = (_f_y + _f_y2) / (2 * _v_h) center_y_ratio = (_f_y + _f_y2) / (2 * _v_h)
# 转换为视频坐标系统 # 转换为视频坐标系统
center_x = f"iw*{center_x_ratio:.6f}" left_x = f"iw*({center_x_ratio:.6f}-1/(2*{zoom_factor}))"
center_y = f"ih*{center_y_ratio:.6f}" top_y = f"ih*({center_y_ratio:.6f}-1/(2*{zoom_factor}))"
except Exception as e: except Exception as e:
# 解析失败使用默认中心 # 解析失败使用默认中心
pass pass
if duration == 0: # 静态缩放(整个视频时长)
# 静态缩放(整个视频时长) filter_args.append(f"{video_output_str}scale={zoom_factor}*iw:{zoom_factor}*ih,crop=iw/{zoom_factor}:ih/{zoom_factor}:{left_x}:{top_y}[v_eff{effect_index}]")
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}trim=start={start_time},zoompan=z={zoom_factor}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
else:
# 动态缩放(指定时间段内)
zoom_expr = f"if(between(t\\,{start_time}\\,{start_time + duration})\\,{zoom_factor}\\,1)"
x_expr = f"({center_x})-(ow*zoom)/2"
y_expr = f"({center_y})-(oh*zoom)/2"
filter_args.append(f"{video_output_str}zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}:d=1[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("skip:"): elif effect.startswith("skip:"):
param = effect.split(":", 2)[1] param = effect.split(":", 2)[1]
@@ -358,6 +344,51 @@ class FfmpegTask(object):
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]") filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]") filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]" video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("show:"):
param = effect.split(":", 2)[1]
if param == '':
param = "0"
show_seconds = float(param)
if show_seconds > 0:
effect_index += 1
filter_args.append(f"{video_output_str}trim=end={show_seconds}[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
elif effect.startswith("grid4:"):
param = effect.split(":", 2)[1]
if param == '':
param = "1"
delay_seconds = float(param)
effect_index += 1
# 获取分辨率,如果没有设置则使用默认值
if self.resolution:
width, height = self.resolution.split('x')
else:
width, height = "1920", "1080" # 默认分辨率
# 计算四宫格中每个象限的大小
grid_width = int(width) // 2
grid_height = int(height) // 2
# 分割视频流为4份
filter_args.append(f"{video_output_str}split=4[grid_v1_{effect_index}][grid_v2_{effect_index}][grid_v3_{effect_index}][grid_v4_{effect_index}]")
# 创建黑色背景
filter_args.append(f"color=black:size={width}x{height}:duration=1[bg_{effect_index}]")
# 缩放每个视频流到绝对尺寸
filter_args.append(f"[grid_v1_{effect_index}]scale={grid_width}:{grid_height}[v1_scaled_{effect_index}]")
filter_args.append(f"[grid_v2_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds}[v2_scaled_{effect_index}]")
filter_args.append(f"[grid_v3_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*2}[v3_scaled_{effect_index}]")
filter_args.append(f"[grid_v4_{effect_index}]scale={grid_width}:{grid_height},tpad=start_duration={delay_seconds*3}[v4_scaled_{effect_index}]")
# 使用overlay将四个视频流叠加到四个位置
filter_args.append(f"[bg_{effect_index}][v1_scaled_{effect_index}]overlay=0:0:shortest=1[grid_step1_{effect_index}]")
filter_args.append(f"[grid_step1_{effect_index}][v2_scaled_{effect_index}]overlay=w/2:0:shortest=1[grid_step2_{effect_index}]")
filter_args.append(f"[grid_step2_{effect_index}][v3_scaled_{effect_index}]overlay=0:h/2:shortest=1[grid_step3_{effect_index}]")
filter_args.append(f"[grid_step3_{effect_index}][v4_scaled_{effect_index}]overlay=w/2:h/2:shortest=1[v_eff{effect_index}]")
video_output_str = f"[v_eff{effect_index}]"
... ...
if self.resolution: if self.resolution:
filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]") filter_args.append(f"{video_output_str}scale={self.resolution.replace('x', ':')}[v]")

View File

@@ -1,281 +0,0 @@
import json
import os
import time
from typing import List, Optional
from config.settings import get_ffmpeg_config
from entity.render_task import RenderTask, TaskType
from entity.effects import registry as effect_registry
from util.exceptions import FFmpegError
from util.ffmpeg import probe_video_info, probe_video_audio
import logging
logger = logging.getLogger(__name__)
class FFmpegCommandBuilder:
"""FFmpeg命令构建器"""
def __init__(self, task: RenderTask):
self.task = task
self.config = get_ffmpeg_config()
def build_command(self) -> List[str]:
"""构建FFmpeg命令"""
self.task.update_task_type()
if self.task.task_type == TaskType.COPY:
return self._build_copy_command()
elif self.task.task_type == TaskType.CONCAT:
return self._build_concat_command()
elif self.task.task_type == TaskType.ENCODE:
return self._build_encode_command()
else:
raise FFmpegError(f"Unsupported task type: {self.task.task_type}")
def _build_copy_command(self) -> List[str]:
"""构建复制命令"""
if len(self.task.input_files) == 1:
input_file = self.task.input_files[0]
if input_file == self.task.output_file:
return [] # 不需要处理
return [
"ffmpeg", "-y", "-hide_banner",
"-i", self.task.input_files[0],
"-c", "copy",
self.task.output_file
]
def _build_concat_command(self) -> List[str]:
"""构建拼接命令"""
args = ["ffmpeg", "-y", "-hide_banner"]
input_args = []
output_args = [*self.config.default_args]
filter_args = []
if len(self.task.input_files) == 1:
# 单个文件
file = self.task.input_files[0]
input_args.extend(["-i", file])
self.task.mute = not probe_video_audio(file)
else:
# 多个文件使用concat协议
tmp_file = f"tmp_concat_{time.time()}.txt"
with open(tmp_file, "w", encoding="utf-8") as f:
for input_file in self.task.input_files:
f.write(f"file '{input_file}'\n")
input_args.extend(["-f", "concat", "-safe", "0", "-i", tmp_file])
self.task.mute = not probe_video_audio(tmp_file, "concat")
# 视频流映射
output_args.extend(["-map", "0:v", "-c:v", "copy"])
# 音频处理
audio_output_str = self._handle_audio_concat(input_args, filter_args)
if audio_output_str:
output_args.extend(["-map", audio_output_str])
output_args.extend(self.config.audio_args)
# annexb处理
if self.task.annexb:
output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()])
output_args.extend(["-bsf:a", "setts=pts=DTS"])
output_args.extend(["-f", "mpegts"])
else:
output_args.extend(["-f", "mp4"])
filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else []
return args + input_args + filter_complex + output_args + [self.task.output_file]
def _build_encode_command(self) -> List[str]:
"""构建编码命令"""
args = ["ffmpeg", "-y", "-hide_banner"]
input_args = []
filter_args = []
output_args = [
*self.config.video_args,
*self.config.audio_args,
*self.config.encoder_args,
*self.config.default_args
]
# annexb处理
if self.task.annexb:
output_args.extend(["-bsf:v", self._get_mp4toannexb_filter()])
output_args.extend(["-reset_timestamps", "1"])
# 处理输入文件
for input_file in self.task.input_files:
input_args.extend(["-i", input_file])
# 处理视频流
video_output_str = "[0:v]"
effect_index = 0
# 处理中心裁剪
if self.task.center_cut == 1:
video_output_str, effect_index = self._add_center_cut(filter_args, video_output_str, effect_index)
# 处理缩放裁剪
if self.task.zoom_cut == 1 and self.task.resolution:
video_output_str, effect_index = self._add_zoom_cut(filter_args, video_output_str, effect_index)
# 处理效果
video_output_str, effect_index = self._add_effects(filter_args, video_output_str, effect_index)
# 处理分辨率
if self.task.resolution:
filter_args.append(f"{video_output_str}scale={self.task.resolution.replace('x', ':')}[v]")
video_output_str = "[v]"
# 处理LUT
for lut in self.task.luts:
filter_args.append(f"{video_output_str}lut3d=file={lut}{video_output_str}")
# 处理覆盖层
video_output_str = self._add_overlays(input_args, filter_args, video_output_str)
# 处理字幕
for subtitle in self.task.subtitles:
filter_args.append(f"{video_output_str}ass={subtitle}[v]")
video_output_str = "[v]"
# 映射视频流
output_args.extend(["-map", video_output_str])
output_args.extend(["-r", str(self.task.frame_rate)])
output_args.extend(["-fps_mode", "cfr"])
# 处理音频
audio_output_str = self._handle_audio_encode(input_args, filter_args)
if audio_output_str:
output_args.extend(["-map", audio_output_str])
filter_complex = ["-filter_complex", ";".join(filter_args)] if filter_args else []
return args + input_args + filter_complex + output_args + [self.task.output_file]
def _add_center_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加中心裁剪"""
pos_json = self.task.ext_data.get('posJson', '{}')
try:
pos_data = json.loads(pos_json) if pos_json != '{}' else {}
except:
pos_data = {}
_v_w = pos_data.get('imgWidth', 1)
_f_x = pos_data.get('ltX', 0)
_f_x2 = pos_data.get('rbX', 0)
_x = f'{float((_f_x2 + _f_x)/(2 * _v_w)):.4f}*iw-ih*ih/(2*iw)'
filter_args.append(f"{video_input}crop=x={_x}:y=0:w=ih*ih/iw:h=ih[v_cut{effect_index}]")
return f"[v_cut{effect_index}]", effect_index + 1
def _add_zoom_cut(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加缩放裁剪"""
# 获取输入视频尺寸
input_file = self.task.input_files[0]
_iw, _ih, _ = probe_video_info(input_file)
_w, _h = self.task.resolution.split('x', 1)
pos_json = self.task.ext_data.get('posJson', '{}')
try:
pos_data = json.loads(pos_json) if pos_json != '{}' else {}
except:
pos_data = {}
_v_w = pos_data.get('imgWidth', 1)
_v_h = pos_data.get('imgHeight', 1)
_f_x = pos_data.get('ltX', 0)
_f_x2 = pos_data.get('rbX', 0)
_f_y = pos_data.get('ltY', 0)
_f_y2 = pos_data.get('rbY', 0)
_x = min(max(0, int((_f_x + _f_x2) / 2 - int(_w) / 2)), _iw - int(_w))
_y = min(max(0, int((_f_y + _f_y2) / 2 - int(_h) / 2)), _ih - int(_h))
filter_args.append(f"{video_input}crop=x={_x}:y={_y}:w={_w}:h={_h}[vz_cut{effect_index}]")
return f"[vz_cut{effect_index}]", effect_index + 1
def _add_effects(self, filter_args: List[str], video_input: str, effect_index: int) -> tuple[str, int]:
"""添加效果处理"""
current_input = video_input
for effect_str in self.task.effects:
effect_name, params = effect_registry.parse_effect_string(effect_str)
processor = effect_registry.get_processor(effect_name, params, self.task.ext_data)
if processor:
processor.frame_rate = self.task.frame_rate
effect_filters, output_stream = processor.generate_filter_args(current_input, effect_index)
if effect_filters:
filter_args.extend(effect_filters)
current_input = output_stream
effect_index += 1
return current_input, effect_index
def _add_overlays(self, input_args: List[str], filter_args: List[str], video_input: str) -> str:
"""添加覆盖层"""
current_input = video_input
for overlay in self.task.overlays:
input_index = input_args.count("-i") // 2 # 每个输入占两个参数 -i filename
input_args.extend(["-i", overlay])
if self.config.old_ffmpeg:
filter_args.append(f"{current_input}[{input_index}:v]scale2ref=iw:ih[v]")
else:
filter_args.append(f"{current_input}[{input_index}:v]scale=rw:rh[v]")
filter_args.append(f"[v][{input_index}:v]overlay=1:eof_action=endall[v]")
current_input = "[v]"
return current_input
def _handle_audio_concat(self, input_args: List[str], filter_args: List[str]) -> Optional[str]:
"""处理concat模式的音频"""
audio_output_str = ""
if self.task.mute:
input_index = input_args.count("-i") // 2
input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"])
audio_output_str = f"[{input_index}:a]"
else:
audio_output_str = "[0:a]"
for audio in self.task.audios:
input_index = input_args.count("-i") // 2
input_args.extend(["-i", audio.replace("\\", "/")])
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
return audio_output_str.strip("[]") if audio_output_str else None
def _handle_audio_encode(self, input_args: List[str], filter_args: List[str]) -> Optional[str]:
"""处理encode模式的音频"""
audio_output_str = ""
if self.task.mute:
input_index = input_args.count("-i") // 2
input_args.extend(["-f", "lavfi", "-i", "anullsrc=cl=stereo:r=48000"])
filter_args.append(f"[{input_index}:a]acopy[a]")
audio_output_str = "[a]"
else:
audio_output_str = "[0:a]"
for audio in self.task.audios:
input_index = input_args.count("-i") // 2
input_args.extend(["-i", audio.replace("\\", "/")])
filter_args.append(f"{audio_output_str}[{input_index}:a]amix=duration=shortest:dropout_transition=0:normalize=0[a]")
audio_output_str = "[a]"
return audio_output_str if audio_output_str else None
def _get_mp4toannexb_filter(self) -> str:
"""获取mp4toannexb滤镜"""
encoder_args_str = " ".join(self.config.encoder_args).lower()
if "hevc" in encoder_args_str:
return "hevc_mp4toannexb"
return "h264_mp4toannexb"

View File

@@ -1,146 +0,0 @@
import os
import uuid
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
from enum import Enum
from config.settings import get_ffmpeg_config
from util.exceptions import TaskValidationError, EffectError
from entity.effects import registry as effect_registry
class TaskType(Enum):
COPY = "copy"
CONCAT = "concat"
ENCODE = "encode"
@dataclass
class RenderTask:
"""渲染任务数据类,只包含任务数据,不包含处理逻辑"""
input_files: List[str] = field(default_factory=list)
output_file: str = ""
task_type: TaskType = TaskType.COPY
# 视频参数
resolution: Optional[str] = None
frame_rate: int = 25
speed: float = 1.0
mute: bool = True
annexb: bool = False
# 裁剪参数
zoom_cut: Optional[int] = None
center_cut: Optional[int] = None
# 资源列表
subtitles: List[str] = field(default_factory=list)
luts: List[str] = field(default_factory=list)
audios: List[str] = field(default_factory=list)
overlays: List[str] = field(default_factory=list)
effects: List[str] = field(default_factory=list)
# 扩展数据
ext_data: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""初始化后处理"""
# 检测annexb格式
for input_file in self.input_files:
if isinstance(input_file, str) and input_file.endswith(".ts"):
self.annexb = True
break
# 自动生成输出文件名
if not self.output_file:
self._generate_output_filename()
def _generate_output_filename(self):
"""生成输出文件名"""
if self.annexb:
self.output_file = f"rand_{uuid.uuid4()}.ts"
else:
self.output_file = f"rand_{uuid.uuid4()}.mp4"
def add_input_file(self, file_path: str):
"""添加输入文件"""
self.input_files.append(file_path)
if file_path.endswith(".ts"):
self.annexb = True
def add_overlay(self, *overlays: str):
"""添加覆盖层"""
for overlay in overlays:
if overlay.endswith('.ass'):
self.subtitles.append(overlay)
else:
self.overlays.append(overlay)
def add_audios(self, *audios: str):
"""添加音频"""
self.audios.extend(audios)
def add_lut(self, *luts: str):
"""添加LUT"""
self.luts.extend(luts)
def add_effect(self, *effects: str):
"""添加效果"""
self.effects.extend(effects)
def validate(self) -> bool:
"""验证任务参数"""
if not self.input_files:
raise TaskValidationError("No input files specified")
# 验证所有效果
for effect_str in self.effects:
effect_name, params = effect_registry.parse_effect_string(effect_str)
processor = effect_registry.get_processor(effect_name, params, self.ext_data)
if processor and not processor.validate_params():
raise EffectError(f"Invalid parameters for effect {effect_name}: {params}", effect_name, params)
return True
def can_copy(self) -> bool:
"""检查是否可以直接复制"""
return (len(self.luts) == 0 and
len(self.overlays) == 0 and
len(self.subtitles) == 0 and
len(self.effects) == 0 and
self.speed == 1 and
len(self.audios) == 0 and
len(self.input_files) == 1 and
self.zoom_cut is None and
self.center_cut is None)
def can_concat(self) -> bool:
"""检查是否可以使用concat模式"""
return (len(self.luts) == 0 and
len(self.overlays) == 0 and
len(self.subtitles) == 0 and
len(self.effects) == 0 and
self.speed == 1 and
self.zoom_cut is None and
self.center_cut is None)
def determine_task_type(self) -> TaskType:
"""自动确定任务类型"""
if self.can_copy():
return TaskType.COPY
elif self.can_concat():
return TaskType.CONCAT
else:
return TaskType.ENCODE
def update_task_type(self):
"""更新任务类型"""
self.task_type = self.determine_task_type()
def need_processing(self) -> bool:
"""检查是否需要处理"""
if self.annexb:
return True
return not self.can_copy()
def get_output_extension(self) -> str:
"""获取输出文件扩展名"""
return ".ts" if self.annexb else ".mp4"

View File

@@ -4,22 +4,20 @@ import sys
import config import config
import biz.task import biz.task
from telemetry import init_opentelemetry from telemetry import init_opentelemetry
from services import DefaultTemplateService from template import load_local_template, download_template, TEMPLATES
from util import api from util import api
import os import os
import glob import glob
# 使用新的服务架构 load_local_template()
template_service = DefaultTemplateService()
template_service.load_local_templates()
# Check for redownload parameter # Check for redownload parameter
if 'redownload' in sys.argv: if 'redownload' in sys.argv:
print("Redownloading all templates...") print("Redownloading all templates...")
for template_name in template_service.templates.keys(): for template_name in TEMPLATES.keys():
print(f"Redownloading template: {template_name}") print(f"Redownloading template: {template_name}")
template_service.download_template(template_name) download_template(template_name)
print("All templates redownloaded successfully!") print("All templates redownloaded successfully!")
sys.exit(0) sys.exit(0)
import logging import logging

View File

@@ -1,12 +0,0 @@
from .render_service import RenderService, DefaultRenderService
from .task_service import TaskService, DefaultTaskService
from .template_service import TemplateService, DefaultTemplateService
__all__ = [
'RenderService',
'DefaultRenderService',
'TaskService',
'DefaultTaskService',
'TemplateService',
'DefaultTemplateService'
]

View File

@@ -1,237 +0,0 @@
import subprocess
import os
import logging
from abc import ABC, abstractmethod
from typing import Optional, Union
from opentelemetry.trace import Status, StatusCode
from entity.render_task import RenderTask
from entity.ffmpeg_command_builder import FFmpegCommandBuilder
from util.exceptions import RenderError, FFmpegError
from util.ffmpeg import probe_video_info, fade_out_audio, handle_ffmpeg_output, subprocess_args
from telemetry import get_tracer
logger = logging.getLogger(__name__)
def _convert_ffmpeg_task_to_render_task(ffmpeg_task):
"""将旧的FfmpegTask转换为新的RenderTask"""
from entity.render_task import RenderTask, TaskType
# 获取输入文件
input_files = []
for inp in ffmpeg_task.input_file:
if hasattr(inp, 'get_output_file'):
input_files.append(inp.get_output_file())
else:
input_files.append(str(inp))
# 确定任务类型
task_type = TaskType.COPY
if ffmpeg_task.task_type == 'concat':
task_type = TaskType.CONCAT
elif ffmpeg_task.task_type == 'encode':
task_type = TaskType.ENCODE
# 创建新任务
render_task = RenderTask(
input_files=input_files,
output_file=ffmpeg_task.output_file,
task_type=task_type,
resolution=ffmpeg_task.resolution,
frame_rate=ffmpeg_task.frame_rate,
annexb=ffmpeg_task.annexb,
center_cut=ffmpeg_task.center_cut,
zoom_cut=ffmpeg_task.zoom_cut,
ext_data=getattr(ffmpeg_task, 'ext_data', {})
)
# 复制各种资源
render_task.effects = getattr(ffmpeg_task, 'effects', [])
render_task.luts = getattr(ffmpeg_task, 'luts', [])
render_task.audios = getattr(ffmpeg_task, 'audios', [])
render_task.overlays = getattr(ffmpeg_task, 'overlays', [])
render_task.subtitles = getattr(ffmpeg_task, 'subtitles', [])
return render_task
class RenderService(ABC):
"""渲染服务抽象接口"""
@abstractmethod
def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool:
"""
执行渲染任务
Args:
task: 渲染任务
Returns:
bool: 渲染是否成功
"""
pass
@abstractmethod
def get_video_info(self, file_path: str) -> tuple[int, int, float]:
"""
获取视频信息
Args:
file_path: 视频文件路径
Returns:
tuple: (width, height, duration)
"""
pass
@abstractmethod
def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str:
"""
音频淡出处理
Args:
file_path: 音频文件路径
duration: 音频总时长
fade_seconds: 淡出时长
Returns:
str: 处理后的文件路径
"""
pass
class DefaultRenderService(RenderService):
"""默认渲染服务实现"""
def render(self, task: Union[RenderTask, 'FfmpegTask']) -> bool:
"""执行渲染任务"""
# 兼容旧的FfmpegTask
if hasattr(task, 'get_ffmpeg_args'): # 这是FfmpegTask
# 使用旧的方式执行
return self._render_legacy_ffmpeg_task(task)
tracer = get_tracer(__name__)
with tracer.start_as_current_span("render_task") as span:
try:
# 验证任务
task.validate()
span.set_attribute("task.type", task.task_type.value)
span.set_attribute("task.input_files", len(task.input_files))
span.set_attribute("task.output_file", task.output_file)
# 检查是否需要处理
if not task.need_processing():
if len(task.input_files) == 1:
task.output_file = task.input_files[0]
span.set_status(Status(StatusCode.OK))
return True
# 构建FFmpeg命令
builder = FFmpegCommandBuilder(task)
ffmpeg_args = builder.build_command()
if not ffmpeg_args:
# 不需要处理,直接返回
if len(task.input_files) == 1:
task.output_file = task.input_files[0]
span.set_status(Status(StatusCode.OK))
return True
# 执行FFmpeg命令
return self._execute_ffmpeg(ffmpeg_args, span)
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Render failed: {e}", exc_info=True)
raise RenderError(f"Render failed: {e}") from e
def _execute_ffmpeg(self, args: list[str], span) -> bool:
"""执行FFmpeg命令"""
span.set_attribute("ffmpeg.args", " ".join(args))
logger.info("Executing FFmpeg: %s", " ".join(args))
try:
# 执行FFmpeg进程
process = subprocess.run(
["ffmpeg", "-progress", "-", "-loglevel", "error"] + args[1:],
stderr=subprocess.PIPE,
**subprocess_args(True)
)
span.set_attribute("ffmpeg.return_code", process.returncode)
# 处理输出
if process.stdout:
output = handle_ffmpeg_output(process.stdout)
span.set_attribute("ffmpeg.output", output)
logger.info("FFmpeg output: %s", output)
# 检查返回码
if process.returncode != 0:
error_msg = process.stderr.decode() if process.stderr else "Unknown error"
span.set_attribute("ffmpeg.error", error_msg)
span.set_status(Status(StatusCode.ERROR))
logger.error("FFmpeg failed with return code %d: %s", process.returncode, error_msg)
raise FFmpegError(
f"FFmpeg execution failed",
command=args,
return_code=process.returncode,
stderr=error_msg
)
# 检查输出文件
output_file = args[-1] # 输出文件总是最后一个参数
if not os.path.exists(output_file):
span.set_status(Status(StatusCode.ERROR))
raise RenderError(f"Output file not created: {output_file}")
# 检查文件大小
file_size = os.path.getsize(output_file)
span.set_attribute("output.file_size", file_size)
if file_size < 4096: # 文件过小
span.set_status(Status(StatusCode.ERROR))
raise RenderError(f"Output file too small: {file_size} bytes")
span.set_status(Status(StatusCode.OK))
logger.info("FFmpeg execution completed successfully")
return True
except subprocess.SubprocessError as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("Subprocess error: %s", e)
raise FFmpegError(f"Subprocess error: {e}") from e
def get_video_info(self, file_path: str) -> tuple[int, int, float]:
"""获取视频信息"""
return probe_video_info(file_path)
def fade_out_audio(self, file_path: str, duration: float, fade_seconds: float = 2.0) -> str:
"""音频淡出处理"""
return fade_out_audio(file_path, duration, fade_seconds)
def _render_legacy_ffmpeg_task(self, ffmpeg_task) -> bool:
"""兼容处理旧的FfmpegTask"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("render_legacy_ffmpeg_task") as span:
try:
# 处理依赖任务
for sub_task in ffmpeg_task.analyze_input_render_tasks():
if not self.render(sub_task):
span.set_status(Status(StatusCode.ERROR))
return False
# 获取FFmpeg参数
ffmpeg_args = ffmpeg_task.get_ffmpeg_args()
if not ffmpeg_args:
# 不需要处理,直接返回
span.set_status(Status(StatusCode.OK))
return True
# 执行FFmpeg命令
return self._execute_ffmpeg(ffmpeg_args, span)
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Legacy FFmpeg task render failed: {e}", exc_info=True)
raise RenderError(f"Legacy render failed: {e}") from e

View File

@@ -1,289 +0,0 @@
import json
import logging
import os
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, List, Optional
from opentelemetry.trace import Status, StatusCode
from entity.render_task import RenderTask
from services.render_service import RenderService
from services.template_service import TemplateService
from util.exceptions import TaskError, TaskValidationError
from util import api, oss
from telemetry import get_tracer
logger = logging.getLogger(__name__)
class TaskService(ABC):
"""任务服务抽象接口"""
@abstractmethod
def process_task(self, task_info: Dict[str, Any]) -> bool:
"""
处理任务
Args:
task_info: 任务信息
Returns:
bool: 处理是否成功
"""
pass
@abstractmethod
def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask:
"""
创建渲染任务
Args:
task_info: 任务信息
template_info: 模板信息
Returns:
RenderTask: 渲染任务对象
"""
pass
class DefaultTaskService(TaskService):
"""默认任务服务实现"""
def __init__(self, render_service: RenderService, template_service: TemplateService):
self.render_service = render_service
self.template_service = template_service
def process_task(self, task_info: Dict[str, Any]) -> bool:
"""处理任务"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("process_task") as span:
try:
# 标准化任务信息
task_info = api.normalize_task(task_info)
span.set_attribute("task.id", task_info.get("id", "unknown"))
span.set_attribute("task.template_id", task_info.get("templateId", "unknown"))
# 获取模板信息
template_id = task_info.get("templateId")
template_info = self.template_service.get_template(template_id)
if not template_info:
raise TaskError(f"Template not found: {template_id}")
# 报告任务开始
api.report_task_start(task_info)
# 创建渲染任务
render_task = self.create_render_task(task_info, template_info)
# 执行渲染
success = self.render_service.render(render_task)
if not success:
span.set_status(Status(StatusCode.ERROR))
api.report_task_failed(task_info, "Render failed")
return False
# 获取视频信息
width, height, duration = self.render_service.get_video_info(render_task.output_file)
span.set_attribute("video.width", width)
span.set_attribute("video.height", height)
span.set_attribute("video.duration", duration)
# 音频淡出
new_file = self.render_service.fade_out_audio(render_task.output_file, duration)
render_task.output_file = new_file
# 上传文件 - 创建一个兼容对象
class TaskCompat:
def __init__(self, output_file):
self.output_file = output_file
def get_output_file(self):
return self.output_file
task_compat = TaskCompat(render_task.output_file)
upload_success = api.upload_task_file(task_info, task_compat)
if not upload_success:
span.set_status(Status(StatusCode.ERROR))
api.report_task_failed(task_info, "Upload failed")
return False
# 清理临时文件
self._cleanup_temp_files(render_task)
# 报告任务成功
api.report_task_success(task_info, videoInfo={
"width": width,
"height": height,
"duration": duration
})
span.set_status(Status(StatusCode.OK))
return True
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error(f"Task processing failed: {e}", exc_info=True)
api.report_task_failed(task_info, str(e))
return False
def create_render_task(self, task_info: Dict[str, Any], template_info: Dict[str, Any]) -> RenderTask:
"""创建渲染任务"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("create_render_task") as span:
# 解析任务参数
task_params_str = task_info.get("taskParams", "{}")
span.set_attribute("task_params", task_params_str)
try:
task_params = json.loads(task_params_str)
task_params_orig = json.loads(task_params_str)
except json.JSONDecodeError as e:
raise TaskValidationError(f"Invalid task params JSON: {e}")
# 并行下载资源
self._download_resources(task_params)
# 创建子任务列表
sub_tasks = []
only_if_usage_count = {}
for part in template_info.get("video_parts", []):
source, ext_data = self._parse_video_source(
part.get('source'), task_params, template_info
)
if not source:
logger.warning("No video found for part: %s", part)
continue
# 检查only_if条件
only_if = part.get('only_if', '')
if only_if:
only_if_usage_count[only_if] = only_if_usage_count.get(only_if, 0) + 1
required_count = only_if_usage_count[only_if]
if not self._check_placeholder_exist_with_count(only_if, task_params_orig, required_count):
logger.info("Skipping part due to only_if condition: %s (need %d)", only_if, required_count)
continue
# 创建子任务
sub_task = self._create_sub_task(part, source, ext_data, template_info)
sub_tasks.append(sub_task)
# 创建主任务
output_file = f"out_{task_info.get('id', 'unknown')}.mp4"
main_task = RenderTask(
input_files=[task.output_file for task in sub_tasks],
output_file=output_file,
resolution=template_info.get("video_size", ""),
frame_rate=template_info.get("frame_rate", 25),
center_cut=template_info.get("crop_mode"),
zoom_cut=template_info.get("zoom_cut")
)
# 应用整体模板设置
overall_template = template_info.get("overall_template", {})
self._apply_template_settings(main_task, overall_template, template_info)
# 设置扩展数据
main_task.ext_data = task_info
span.set_attribute("render_task.sub_tasks", len(sub_tasks))
span.set_attribute("render_task.effects", len(main_task.effects))
return main_task
def _download_resources(self, task_params: Dict[str, Any]):
"""并行下载资源"""
with ThreadPoolExecutor(max_workers=8) as executor:
for param_list in task_params.values():
if isinstance(param_list, list):
for param in param_list:
url = param.get("url", "")
if url.startswith("http"):
_, filename = os.path.split(url)
executor.submit(oss.download_from_oss, url, filename, True)
def _parse_video_source(self, source: str, task_params: Dict[str, Any],
template_info: Dict[str, Any]) -> tuple[Optional[str], Dict[str, Any]]:
"""解析视频源"""
if source.startswith('PLACEHOLDER_'):
placeholder_id = source.replace('PLACEHOLDER_', '')
new_sources = task_params.get(placeholder_id, [])
pick_source = {}
if isinstance(new_sources, list):
if len(new_sources) == 0:
logger.debug("No video found for placeholder: %s", placeholder_id)
return None, pick_source
else:
pick_source = new_sources.pop(0)
new_sources = pick_source.get("url", "")
if new_sources.startswith("http"):
_, source_name = os.path.split(new_sources)
oss.download_from_oss(new_sources, source_name, True)
return source_name, pick_source
return new_sources, pick_source
return os.path.join(template_info.get("local_path", ""), source), {}
def _check_placeholder_exist_with_count(self, placeholder_id: str, task_params: Dict[str, Any],
required_count: int = 1) -> bool:
"""检查占位符是否存在足够数量的片段"""
if placeholder_id in task_params:
new_sources = task_params.get(placeholder_id, [])
if isinstance(new_sources, list):
return len(new_sources) >= required_count
return required_count <= 1
return False
def _create_sub_task(self, part: Dict[str, Any], source: str, ext_data: Dict[str, Any],
template_info: Dict[str, Any]) -> RenderTask:
"""创建子任务"""
sub_task = RenderTask(
input_files=[source],
resolution=template_info.get("video_size", ""),
frame_rate=template_info.get("frame_rate", 25),
annexb=True,
center_cut=part.get("crop_mode"),
zoom_cut=part.get("zoom_cut"),
ext_data=ext_data
)
# 应用部分模板设置
self._apply_template_settings(sub_task, part, template_info)
return sub_task
def _apply_template_settings(self, task: RenderTask, template_part: Dict[str, Any],
template_info: Dict[str, Any]):
"""应用模板设置到任务"""
# 添加效果
for effect in template_part.get('effects', []):
task.add_effect(effect)
# 添加LUT
for lut in template_part.get('luts', []):
full_path = os.path.join(template_info.get("local_path", ""), lut)
task.add_lut(full_path.replace("\\", "/"))
# 添加音频
for audio in template_part.get('audios', []):
full_path = os.path.join(template_info.get("local_path", ""), audio)
task.add_audios(full_path)
# 添加覆盖层
for overlay in template_part.get('overlays', []):
full_path = os.path.join(template_info.get("local_path", ""), overlay)
task.add_overlay(full_path)
def _cleanup_temp_files(self, task: RenderTask):
"""清理临时文件"""
try:
template_dir = os.getenv("TEMPLATE_DIR", "")
if template_dir and template_dir not in task.output_file:
if os.path.exists(task.output_file):
os.remove(task.output_file)
logger.info("Cleaned up temp file: %s", task.output_file)
else:
logger.info("Skipped cleanup of template file: %s", task.output_file)
except OSError as e:
logger.warning("Failed to cleanup temp file %s: %s", task.output_file, e)

View File

@@ -1,266 +0,0 @@
import json
import os
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from opentelemetry.trace import Status, StatusCode
from util.exceptions import TemplateError, TemplateNotFoundError, TemplateValidationError
from util import api, oss
from config.settings import get_storage_config
from telemetry import get_tracer
logger = logging.getLogger(__name__)
class TemplateService(ABC):
"""模板服务抽象接口"""
@abstractmethod
def get_template(self, template_id: str) -> Optional[Dict[str, Any]]:
"""
获取模板信息
Args:
template_id: 模板ID
Returns:
Dict[str, Any]: 模板信息,如果不存在则返回None
"""
pass
@abstractmethod
def load_local_templates(self):
"""加载本地模板"""
pass
@abstractmethod
def download_template(self, template_id: str) -> bool:
"""
下载模板
Args:
template_id: 模板ID
Returns:
bool: 下载是否成功
"""
pass
@abstractmethod
def validate_template(self, template_info: Dict[str, Any]) -> bool:
"""
验证模板
Args:
template_info: 模板信息
Returns:
bool: 验证是否通过
"""
pass
class DefaultTemplateService(TemplateService):
"""默认模板服务实现"""
def __init__(self):
self.templates: Dict[str, Dict[str, Any]] = {}
self.storage_config = get_storage_config()
def get_template(self, template_id: str) -> Optional[Dict[str, Any]]:
"""获取模板信息"""
if template_id not in self.templates:
# 尝试下载模板
if not self.download_template(template_id):
return None
return self.templates.get(template_id)
def load_local_templates(self):
"""加载本地模板"""
template_dir = self.storage_config.template_dir
if not os.path.exists(template_dir):
logger.warning("Template directory does not exist: %s", template_dir)
return
for template_name in os.listdir(template_dir):
if template_name.startswith("_") or template_name.startswith("."):
continue
target_path = os.path.join(template_dir, template_name)
if os.path.isdir(target_path):
try:
self._load_template(template_name, target_path)
except Exception as e:
logger.error("Failed to load template %s: %s", template_name, e)
def download_template(self, template_id: str) -> bool:
"""下载模板"""
tracer = get_tracer(__name__)
with tracer.start_as_current_span("download_template") as span:
try:
span.set_attribute("template.id", template_id)
# 获取远程模板信息
template_info = api.get_template_info(template_id)
if template_info is None:
logger.warning("Failed to get template info: %s", template_id)
return False
local_path = template_info.get('local_path')
if not local_path:
local_path = os.path.join(self.storage_config.template_dir, str(template_id))
template_info['local_path'] = local_path
# 创建本地目录
if not os.path.isdir(local_path):
os.makedirs(local_path)
# 下载模板资源
overall_template = template_info.get('overall_template', {})
video_parts = template_info.get('video_parts', [])
self._download_template_assets(overall_template, template_info)
for video_part in video_parts:
self._download_template_assets(video_part, template_info)
# 保存模板定义文件
template_file = os.path.join(local_path, 'template.json')
with open(template_file, 'w', encoding='utf-8') as f:
json.dump(template_info, f, ensure_ascii=False, indent=2)
# 加载到内存
self._load_template(template_id, local_path)
span.set_status(Status(StatusCode.OK))
logger.info("Template downloaded successfully: %s", template_id)
return True
except Exception as e:
span.set_status(Status(StatusCode.ERROR))
logger.error("Failed to download template %s: %s", template_id, e)
return False
def validate_template(self, template_info: Dict[str, Any]) -> bool:
"""验证模板"""
try:
local_path = template_info.get("local_path")
if not local_path:
raise TemplateValidationError("Template missing local_path")
# 验证视频部分
for video_part in template_info.get("video_parts", []):
self._validate_template_part(video_part, local_path)
# 验证整体模板
overall_template = template_info.get("overall_template", {})
if overall_template:
self._validate_template_part(overall_template, local_path)
return True
except TemplateValidationError:
raise
except Exception as e:
raise TemplateValidationError(f"Template validation failed: {e}")
def _load_template(self, template_name: str, local_path: str):
"""加载单个模板"""
logger.info("Loading template: %s (%s)", template_name, local_path)
template_def_file = os.path.join(local_path, "template.json")
if not os.path.exists(template_def_file):
raise TemplateNotFoundError(f"Template definition file not found: {template_def_file}")
try:
with open(template_def_file, 'r', encoding='utf-8') as f:
template_info = json.load(f)
except json.JSONDecodeError as e:
raise TemplateError(f"Invalid template JSON: {e}")
template_info["local_path"] = local_path
try:
self.validate_template(template_info)
self.templates[template_name] = template_info
logger.info("Template loaded successfully: %s", template_name)
except TemplateValidationError as e:
logger.error("Template validation failed for %s: %s. Attempting to re-download.", template_name, e)
# 模板验证失败,尝试重新下载
if self.download_template(template_name):
logger.info("Template re-downloaded successfully: %s", template_name)
else:
logger.error("Failed to re-download template: %s", template_name)
raise
def _download_template_assets(self, template_part: Dict[str, Any], template_info: Dict[str, Any]):
"""下载模板资源"""
local_path = template_info['local_path']
# 下载源文件
if 'source' in template_part:
source = template_part['source']
if isinstance(source, str) and source.startswith("http"):
_, filename = os.path.split(source)
new_file_path = os.path.join(local_path, filename)
oss.download_from_oss(source, new_file_path)
if filename.endswith(".mp4"):
from util.ffmpeg import re_encode_and_annexb
new_file_path = re_encode_and_annexb(new_file_path)
template_part['source'] = os.path.relpath(new_file_path, local_path)
# 下载覆盖层
if 'overlays' in template_part:
for i, overlay in enumerate(template_part['overlays']):
if isinstance(overlay, str) and overlay.startswith("http"):
_, filename = os.path.split(overlay)
oss.download_from_oss(overlay, os.path.join(local_path, filename))
template_part['overlays'][i] = filename
# 下载LUT
if 'luts' in template_part:
for i, lut in enumerate(template_part['luts']):
if isinstance(lut, str) and lut.startswith("http"):
_, filename = os.path.split(lut)
oss.download_from_oss(lut, os.path.join(local_path, filename))
template_part['luts'][i] = filename
# 下载音频
if 'audios' in template_part:
for i, audio in enumerate(template_part['audios']):
if isinstance(audio, str) and audio.startswith("http"):
_, filename = os.path.split(audio)
oss.download_from_oss(audio, os.path.join(local_path, filename))
template_part['audios'][i] = filename
def _validate_template_part(self, template_part: Dict[str, Any], base_dir: str):
"""验证模板部分"""
# 验证源文件
source_file = template_part.get("source", "")
if source_file and not source_file.startswith("http") and not source_file.startswith("PLACEHOLDER_"):
if not os.path.isabs(source_file):
source_file = os.path.join(base_dir, source_file)
if not os.path.exists(source_file):
raise TemplateValidationError(f"Source file not found: {source_file}")
# 验证音频文件
for audio in template_part.get("audios", []):
if not os.path.isabs(audio):
audio = os.path.join(base_dir, audio)
if not os.path.exists(audio):
raise TemplateValidationError(f"Audio file not found: {audio}")
# 验证LUT文件
for lut in template_part.get("luts", []):
if not os.path.isabs(lut):
lut = os.path.join(base_dir, lut)
if not os.path.exists(lut):
raise TemplateValidationError(f"LUT file not found: {lut}")
# 验证覆盖层文件
for overlay in template_part.get("overlays", []):
if not os.path.isabs(overlay):
overlay = os.path.join(base_dir, overlay)
if not os.path.exists(overlay):
raise TemplateValidationError(f"Overlay file not found: {overlay}")

View File

@@ -4,66 +4,125 @@ import logging
from telemetry import get_tracer from telemetry import get_tracer
from util import api, oss from util import api, oss
from services.template_service import DefaultTemplateService
TEMPLATES = {}
logger = logging.getLogger("template") logger = logging.getLogger("template")
# 全局模板服务实例
_template_service = None
def _get_template_service():
"""获取模板服务实例"""
global _template_service
if _template_service is None:
_template_service = DefaultTemplateService()
return _template_service
# 向后兼容的全局变量和函数
TEMPLATES = {}
def _update_templates_dict():
"""更新全局TEMPLATES字典以保持向后兼容"""
service = _get_template_service()
TEMPLATES.clear()
TEMPLATES.update(service.templates)
def check_local_template(local_name): def check_local_template(local_name):
"""向后兼容函数""" template_def = TEMPLATES[local_name]
service = _get_template_service() base_dir = template_def.get("local_path")
template_def = service.templates.get(local_name) for video_part in template_def.get("video_parts", []):
if template_def: source_file = video_part.get("source", "")
try: if str(source_file).startswith("http"):
service.validate_template(template_def) # download file
except Exception as e: ...
logger.error(f"Template validation failed: {e}") elif str(source_file).startswith("PLACEHOLDER_"):
raise continue
else:
if not os.path.isabs(source_file):
source_file = os.path.join(base_dir, source_file)
if not os.path.exists(source_file):
logger.error(f"{source_file} not found, please check the template definition")
raise Exception(f"{source_file} not found, please check the template definition")
for audio in video_part.get("audios", []):
if not os.path.isabs(audio):
audio = os.path.join(base_dir, audio)
if not os.path.exists(audio):
logger.error(f"{audio} not found, please check the template definition")
raise Exception(f"{audio} not found, please check the template definition")
for lut in video_part.get("luts", []):
if not os.path.isabs(lut):
lut = os.path.join(base_dir, lut)
if not os.path.exists(lut):
logger.error(f"{lut} not found, please check the template definition")
raise Exception(f"{lut} not found, please check the template definition")
for mask in video_part.get("overlays", []):
if not os.path.isabs(mask):
mask = os.path.join(base_dir, mask)
if not os.path.exists(mask):
logger.error(f"{mask} not found, please check the template definition")
raise Exception(f"{mask} not found, please check the template definition")
def load_template(template_name, local_path): def load_template(template_name, local_path):
"""向后兼容函数""" global TEMPLATES
service = _get_template_service() logger.info(f"加载视频模板定义:【{template_name}{local_path})】")
service._load_template(template_name, local_path) template_def_file = os.path.join(local_path, "template.json")
_update_templates_dict() if os.path.exists(template_def_file):
TEMPLATES[template_name] = json.load(open(template_def_file, 'rb'))
TEMPLATES[template_name]["local_path"] = local_path
try:
check_local_template(template_name)
logger.info(f"完成加载【{template_name}】模板")
except Exception as e:
logger.error(f"模板定义文件【{template_def_file}】有误,正在尝试重新下载模板", exc_info=e)
download_template(template_name)
def load_local_template(): def load_local_template():
"""加载本地模板(向后兼容函数)""" for template_name in os.listdir(os.getenv("TEMPLATE_DIR")):
service = _get_template_service() if template_name.startswith("_"):
service.load_local_templates() continue
_update_templates_dict() if template_name.startswith("."):
continue
target_path = os.path.join(os.getenv("TEMPLATE_DIR"), template_name)
if os.path.isdir(target_path):
load_template(template_name, target_path)
def get_template_def(template_id): def get_template_def(template_id):
"""获取模板定义(向后兼容函数)""" if template_id not in TEMPLATES:
service = _get_template_service() download_template(template_id)
template = service.get_template(template_id) return TEMPLATES.get(template_id)
_update_templates_dict()
return template
def download_template(template_id): def download_template(template_id):
"""下载模板(向后兼容函数)""" tracer = get_tracer(__name__)
service = _get_template_service() with tracer.start_as_current_span("download_template"):
success = service.download_template(template_id) template_info = api.get_template_info(template_id)
_update_templates_dict() if template_info is None:
return success return
if not os.path.isdir(template_info['local_path']):
os.makedirs(template_info['local_path'])
# download template assets
overall_template = template_info['overall_template']
video_parts = template_info['video_parts']
def _download_assets(_template):
if 'source' in _template:
if str(_template['source']).startswith("http"):
_, _fn = os.path.split(_template['source'])
new_fp = os.path.join(template_info['local_path'], _fn)
oss.download_from_oss(_template['source'], new_fp)
if _fn.endswith(".mp4"):
from util.ffmpeg import re_encode_and_annexb
new_fp = re_encode_and_annexb(new_fp)
_template['source'] = os.path.relpath(new_fp, template_info['local_path'])
if 'overlays' in _template:
for i in range(len(_template['overlays'])):
overlay = _template['overlays'][i]
if str(overlay).startswith("http"):
_, _fn = os.path.split(overlay)
oss.download_from_oss(overlay, os.path.join(template_info['local_path'], _fn))
_template['overlays'][i] = _fn
if 'luts' in _template:
for i in range(len(_template['luts'])):
lut = _template['luts'][i]
if str(lut).startswith("http"):
_, _fn = os.path.split(lut)
oss.download_from_oss(lut, os.path.join(template_info['local_path'], _fn))
_template['luts'][i] = _fn
if 'audios' in _template:
for i in range(len(_template['audios'])):
if str(_template['audios'][i]).startswith("http"):
_, _fn = os.path.split(_template['audios'][i])
oss.download_from_oss(_template['audios'][i], os.path.join(template_info['local_path'], _fn))
_template['audios'][i] = _fn
_download_assets(overall_template)
for video_part in video_parts:
_download_assets(video_part)
with open(os.path.join(template_info['local_path'], 'template.json'), 'w', encoding='utf-8') as f:
json.dump(template_info, f)
load_template(template_id, template_info['local_path'])
def analyze_template(template_id): def analyze_template(template_id):
"""分析模板(占位符函数)""" ...
pass

View File

@@ -24,14 +24,13 @@ def sync_center():
通过接口获取任务 通过接口获取任务
:return: 任务列表 :return: 任务列表
""" """
from services import DefaultTemplateService from template import TEMPLATES, download_template
template_service = DefaultTemplateService()
try: try:
response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={ response = session.post(os.getenv('API_ENDPOINT') + "/sync", json={
'accessKey': os.getenv('ACCESS_KEY'), 'accessKey': os.getenv('ACCESS_KEY'),
'clientStatus': util.system.get_sys_info(), 'clientStatus': util.system.get_sys_info(),
'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in 'templateList': [{'id': t.get('id', ''), 'updateTime': t.get('updateTime', '')} for t in
template_service.templates.values()] TEMPLATES.values()]
}, timeout=10) }, timeout=10)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except requests.RequestException as e:
@@ -57,7 +56,7 @@ def sync_center():
template_id = template.get('id', '') template_id = template.get('id', '')
if template_id: if template_id:
logger.info("更新模板:【%s", template_id) logger.info("更新模板:【%s", template_id)
template_service.download_template(template_id) download_template(template_id)
return tasks return tasks

View File

@@ -1,72 +0,0 @@
class RenderWorkerError(Exception):
"""RenderWorker基础异常类"""
def __init__(self, message: str, error_code: str = None):
super().__init__(message)
self.message = message
self.error_code = error_code or self.__class__.__name__
class ConfigurationError(RenderWorkerError):
"""配置错误"""
pass
class TemplateError(RenderWorkerError):
"""模板相关错误"""
pass
class TemplateNotFoundError(TemplateError):
"""模板未找到错误"""
pass
class TemplateValidationError(TemplateError):
"""模板验证错误"""
pass
class TaskError(RenderWorkerError):
"""任务处理错误"""
pass
class TaskValidationError(TaskError):
"""任务参数验证错误"""
pass
class RenderError(RenderWorkerError):
"""渲染处理错误"""
pass
class FFmpegError(RenderError):
"""FFmpeg执行错误"""
def __init__(self, message: str, command: list = None, return_code: int = None, stderr: str = None):
super().__init__(message)
self.command = command
self.return_code = return_code
self.stderr = stderr
class EffectError(RenderError):
"""效果处理错误"""
def __init__(self, message: str, effect_name: str = None, effect_params: str = None):
super().__init__(message)
self.effect_name = effect_name
self.effect_params = effect_params
class StorageError(RenderWorkerError):
"""存储相关错误"""
pass
class APIError(RenderWorkerError):
"""API调用错误"""
def __init__(self, message: str, status_code: int = None, response_body: str = None):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class ResourceError(RenderWorkerError):
"""资源相关错误"""
pass
class ResourceNotFoundError(ResourceError):
"""资源未找到错误"""
pass
class DownloadError(ResourceError):
"""下载错误"""
pass