feat(task): 支持图片素材类型的视频渲染

- 添加 IMAGE_EXTENSIONS 常量定义支持的图片格式
- 实现 get_material_type 方法优先使用服务端类型或根据URL后缀推断
- 添加 is_image_material 方法判断素材是否为图片类型
- 修改 RenderSegmentVideoHandler 支持图片转视频流程
- 实现 _convert_image_to_video 方法将静态图片转换为视频
- 更新下载步骤为先检测素材类型再确定输入文件扩展名
- 添加图片素材转换为视频的处理逻辑
- 重构步骤编号以匹配新的处理流程
- 优化错误提示信息支持HTTP/HTTPS协议检查
This commit is contained in:
2026-01-18 13:52:46 +08:00
parent 10c57a387f
commit f27490e9e1
2 changed files with 180 additions and 15 deletions

View File

@@ -9,6 +9,12 @@ from enum import Enum
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
from datetime import datetime from datetime import datetime
from urllib.parse import urlparse, unquote
import os
# 支持的图片扩展名
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif'}
class TaskType(Enum): class TaskType(Enum):
@@ -369,6 +375,37 @@ class Task:
"""获取绑定的素材 URL(实际可下载的 HTTP URL)""" """获取绑定的素材 URL(实际可下载的 HTTP URL)"""
return self.payload.get('boundMaterialUrl') return self.payload.get('boundMaterialUrl')
def get_material_type(self) -> str:
"""
获取素材类型
优先使用服务端下发的 materialType 字段,
如果不存在则根据 URL 后缀自动推断。
Returns:
素材类型:"video""image"
"""
# 优先使用服务端下发的类型
material_type = self.payload.get('materialType')
if material_type in ('video', 'image'):
return material_type
# 降级:根据 URL 后缀推断
material_url = self.get_material_url()
if material_url:
parsed = urlparse(material_url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
if ext.lower() in IMAGE_EXTENSIONS:
return 'image'
# 默认视频类型
return 'video'
def is_image_material(self) -> bool:
"""判断素材是否为图片类型"""
return self.get_material_type() == 'image'
def get_render_spec(self) -> RenderSpec: def get_render_spec(self) -> RenderSpec:
"""获取渲染规格""" """获取渲染规格"""
return RenderSpec.from_dict(self.payload.get('renderSpec')) return RenderSpec.from_dict(self.payload.get('renderSpec'))

View File

@@ -9,14 +9,23 @@
import os import os
import logging import logging
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from urllib.parse import urlparse, unquote
from handlers.base import BaseHandler from handlers.base import BaseHandler
from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect from domain.task import Task, TaskType, RenderSpec, OutputSpec, Effect, IMAGE_EXTENSIONS
from domain.result import TaskResult, ErrorCode from domain.result import TaskResult, ErrorCode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _get_extension_from_url(url: str) -> str:
"""从 URL 提取文件扩展名"""
parsed = urlparse(url)
path = unquote(parsed.path)
_, ext = os.path.splitext(path)
return ext.lower() if ext else ''
class RenderSegmentVideoHandler(BaseHandler): class RenderSegmentVideoHandler(BaseHandler):
""" """
视频片段渲染处理器 视频片段渲染处理器
@@ -46,18 +55,18 @@ class RenderSegmentVideoHandler(BaseHandler):
"Missing material URL (boundMaterialUrl or sourceRef)" "Missing material URL (boundMaterialUrl or sourceRef)"
) )
# 检查 URL 格式:必须是 HTTP/HTTPS 协议 # 检查 URL 格式:必须是 HTTPHTTPS 协议
if not material_url.startswith(('http://', 'https://')): if not material_url.startswith(('http://', 'https://')):
source_ref = task.get_source_ref() source_ref = task.get_source_ref()
bound_url = task.get_bound_material_url() bound_url = task.get_bound_material_url()
logger.error( logger.error(
f"[task:{task.task_id}] Invalid material URL format: '{material_url}'. " f"[task:{task.task_id}] Invalid material URL format: '{material_url}'. "
f"boundMaterialUrl={bound_url}, sourceRef={source_ref}. " f"boundMaterialUrl={bound_url}, sourceRef={source_ref}. "
f"Server should provide boundMaterialUrl with HTTP URL." f"Server should provide boundMaterialUrl with HTTP/HTTPS URL."
) )
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_SPEC_INVALID, ErrorCode.E_SPEC_INVALID,
f"Invalid material URL: '{material_url}' is not a valid HTTP URL. " f"Invalid material URL: '{material_url}' is not a valid HTTP/HTTPS URL. "
f"Server must provide boundMaterialUrl." f"Server must provide boundMaterialUrl."
) )
@@ -65,15 +74,44 @@ class RenderSegmentVideoHandler(BaseHandler):
output_spec = task.get_output_spec() output_spec = task.get_output_spec()
duration_ms = task.get_duration_ms() duration_ms = task.get_duration_ms()
# 1. 下载素材 # 1. 检测素材类型并确定输入文件扩展名
is_image = task.is_image_material()
if is_image:
# 图片素材:根据 URL 确定扩展名
ext = _get_extension_from_url(material_url)
if not ext or ext not in IMAGE_EXTENSIONS:
ext = '.jpg' # 默认扩展名
input_file = os.path.join(work_dir, f'input{ext}')
else:
input_file = os.path.join(work_dir, 'input.mp4') input_file = os.path.join(work_dir, 'input.mp4')
# 2. 下载素材
if not self.download_file(material_url, input_file): if not self.download_file(material_url, input_file):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_INPUT_UNAVAILABLE, ErrorCode.E_INPUT_UNAVAILABLE,
f"Failed to download material: {material_url}" f"Failed to download material: {material_url}"
) )
# 2. 下载 LUT(如有) # 3. 图片素材转换为视频
if is_image:
video_input_file = os.path.join(work_dir, 'input_video.mp4')
if not self._convert_image_to_video(
image_file=input_file,
output_file=video_input_file,
duration_ms=duration_ms,
output_spec=output_spec,
render_spec=render_spec,
task_id=task.task_id
):
return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED,
"Failed to convert image to video"
)
# 使用转换后的视频作为输入
input_file = video_input_file
logger.info(f"[task:{task.task_id}] Image converted to video successfully")
# 4. 下载 LUT(如有)
lut_file = None lut_file = None
if render_spec.lut_url: if render_spec.lut_url:
lut_file = os.path.join(work_dir, 'lut.cube') lut_file = os.path.join(work_dir, 'lut.cube')
@@ -81,7 +119,7 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it") logger.warning(f"[task:{task.task_id}] Failed to download LUT, continuing without it")
lut_file = None lut_file = None
# 3. 下载叠加层(如有) # 5. 下载叠加层(如有)
overlay_file = None overlay_file = None
if render_spec.overlay_url: if render_spec.overlay_url:
# 根据 URL 后缀确定文件扩展名 # 根据 URL 后缀确定文件扩展名
@@ -93,13 +131,13 @@ class RenderSegmentVideoHandler(BaseHandler):
logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it") logger.warning(f"[task:{task.task_id}] Failed to download overlay, continuing without it")
overlay_file = None overlay_file = None
# 4. 计算 overlap 时长(用于转场帧冻结) # 6. 计算 overlap 时长(用于转场帧冻结)
# 头部 overlap: 来自前一片段的出场转场 # 头部 overlap: 来自前一片段的出场转场
overlap_head_ms = task.get_overlap_head_ms() overlap_head_ms = task.get_overlap_head_ms()
# 尾部 overlap: 当前片段的出场转场 # 尾部 overlap: 当前片段的出场转场
overlap_tail_ms = task.get_overlap_tail_ms_v2() overlap_tail_ms = task.get_overlap_tail_ms_v2()
# 5. 构建 FFmpeg 命令 # 7. 构建 FFmpeg 命令
output_file = os.path.join(work_dir, 'output.mp4') output_file = os.path.join(work_dir, 'output.mp4')
cmd = self._build_command( cmd = self._build_command(
input_file=input_file, input_file=input_file,
@@ -113,25 +151,25 @@ class RenderSegmentVideoHandler(BaseHandler):
overlap_tail_ms=overlap_tail_ms overlap_tail_ms=overlap_tail_ms
) )
# 6. 执行 FFmpeg # 8. 执行 FFmpeg
if not self.run_ffmpeg(cmd, task.task_id): if not self.run_ffmpeg(cmd, task.task_id):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"FFmpeg rendering failed" "FFmpeg rendering failed"
) )
# 7. 验证输出文件 # 9. 验证输出文件
if not self.ensure_file_exists(output_file, min_size=4096): if not self.ensure_file_exists(output_file, min_size=4096):
return TaskResult.fail( return TaskResult.fail(
ErrorCode.E_FFMPEG_FAILED, ErrorCode.E_FFMPEG_FAILED,
"Output file is missing or too small" "Output file is missing or too small"
) )
# 8. 获取实际时长 # 10. 获取实际时长
actual_duration = self.probe_duration(output_file) actual_duration = self.probe_duration(output_file)
actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms actual_duration_ms = int(actual_duration * 1000) if actual_duration else duration_ms
# 9. 上传产物 # 11. 上传产物
video_url = self.upload_file(task.task_id, 'video', output_file) video_url = self.upload_file(task.task_id, 'video', output_file)
if not video_url: if not video_url:
return TaskResult.fail( return TaskResult.fail(
@@ -139,7 +177,7 @@ class RenderSegmentVideoHandler(BaseHandler):
"Failed to upload video" "Failed to upload video"
) )
# 10. 构建结果(包含 overlap 信息) # 12. 构建结果(包含 overlap 信息)
result_data = { result_data = {
'videoUrl': video_url, 'videoUrl': video_url,
'actualDurationMs': actual_duration_ms, 'actualDurationMs': actual_duration_ms,
@@ -156,6 +194,96 @@ class RenderSegmentVideoHandler(BaseHandler):
finally: finally:
self.cleanup_work_dir(work_dir) self.cleanup_work_dir(work_dir)
def _convert_image_to_video(
self,
image_file: str,
output_file: str,
duration_ms: int,
output_spec: OutputSpec,
render_spec: RenderSpec,
task_id: str
) -> bool:
"""
将图片转换为视频
使用 FFmpeg 将静态图片转换为指定时长的视频,
同时应用缩放填充和变速处理。
Args:
image_file: 输入图片文件路径
output_file: 输出视频文件路径
duration_ms: 目标时长(毫秒)
output_spec: 输出规格
render_spec: 渲染规格
task_id: 任务 ID(用于日志)
Returns:
是否成功
"""
width = output_spec.width
height = output_spec.height
fps = output_spec.fps
# 计算实际时长(考虑变速)
speed = float(render_spec.speed) if render_spec.speed else 1.0
if speed <= 0:
speed = 1.0
# 变速后的实际播放时长
actual_duration_sec = (duration_ms / 1000.0) / speed
# 构建 FFmpeg 命令
cmd = [
'ffmpeg', '-y', '-hide_banner',
'-loop', '1', # 循环输入图片
'-i', image_file,
'-t', str(actual_duration_sec), # 输出时长
]
# 构建滤镜:缩放填充到目标尺寸
filters = []
# 裁切处理(与视频相同逻辑)
if render_spec.crop_enable and render_spec.face_pos:
try:
fx, fy = map(float, render_spec.face_pos.split(','))
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})':"
f"'(iw-min(iw,ih*{target_ratio}))*{fx}':"
f"'(ih-min(ih,iw/{target_ratio}))*{fy}'"
)
except (ValueError, ZeroDivisionError):
logger.warning(f"[task:{task_id}] Invalid face position: {render_spec.face_pos}")
elif render_spec.zoom_cut:
target_ratio = width / height
filters.append(
f"crop='min(iw,ih*{target_ratio})':'min(ih,iw/{target_ratio})'"
)
# 缩放填充
filters.append(
f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
)
# 格式转换(确保兼容性)
filters.append("format=yuv420p")
cmd.extend(['-vf', ','.join(filters)])
# 编码参数
cmd.extend([
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '18',
'-r', str(fps),
'-an', # 无音频
output_file
])
logger.info(f"[task:{task_id}] Converting image to video: {actual_duration_sec:.2f}s at {fps}fps")
return self.run_ffmpeg(cmd, task_id)
def _build_command( def _build_command(
self, self,
input_file: str, input_file: str,