You've already forked FrameTour-RenderWorker
Compare commits
6 Commits
d7704005b6
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 357c0afb3b | |||
| 8de0564fef | |||
| c61f6d7521 | |||
| 4ef57a208e | |||
| a415d8571d | |||
| 4af52d5a54 |
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
import os.path
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
@@ -129,13 +129,33 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
|
||||
return False
|
||||
|
||||
|
||||
def start_ffmpeg_task(ffmpeg_task):
|
||||
def start_ffmpeg_task(ffmpeg_task, max_workers=None):
|
||||
if max_workers is None:
|
||||
max_workers = int(os.environ.get("FFMPEG_MAX_WORKERS", 4))
|
||||
tracer = get_tracer(__name__)
|
||||
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
||||
result = start_ffmpeg_task(task)
|
||||
if not result:
|
||||
return False
|
||||
sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
|
||||
|
||||
if sub_tasks:
|
||||
span.set_attribute("sub_tasks.count", len(sub_tasks))
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
|
||||
for task in sub_tasks}
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
if not future.result():
|
||||
# 快速失败:取消剩余任务
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("子任务执行失败: %s", e)
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
return False
|
||||
|
||||
ffmpeg_task.correct_task_type()
|
||||
span.set_attribute("task.type", ffmpeg_task.task_type)
|
||||
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
|
||||
|
||||
@@ -6,4 +6,4 @@ SUPPORT_FEATURE = (
|
||||
'rclone_upload',
|
||||
'custom_re_encode',
|
||||
)
|
||||
SOFTWARE_VERSION = '0.0.6'
|
||||
SOFTWARE_VERSION = '0.0.8'
|
||||
|
||||
@@ -329,7 +329,7 @@ class FfmpegTask(object):
|
||||
skip_seconds = float(param)
|
||||
if skip_seconds > 0:
|
||||
effect_index += 1
|
||||
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]")
|
||||
filter_args.append(f"{video_output_str}trim=start={skip_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||
video_output_str = f"[v_eff{effect_index}]"
|
||||
elif effect.startswith("tail:"):
|
||||
param = effect.split(":", 2)[1]
|
||||
@@ -342,7 +342,7 @@ class FfmpegTask(object):
|
||||
# 使用reverse+trim+reverse的方法来精确获取最后N秒
|
||||
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
|
||||
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
|
||||
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
|
||||
filter_args.append(f"[v_trim{effect_index}]reverse,setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||
video_output_str = f"[v_eff{effect_index}]"
|
||||
elif effect.startswith("show:"):
|
||||
param = effect.split(":", 2)[1]
|
||||
@@ -351,7 +351,7 @@ class FfmpegTask(object):
|
||||
show_seconds = float(param)
|
||||
if show_seconds > 0:
|
||||
effect_index += 1
|
||||
filter_args.append(f"{video_output_str}trim=end={show_seconds}[v_eff{effect_index}]")
|
||||
filter_args.append(f"{video_output_str}trim=end={show_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||
video_output_str = f"[v_eff{effect_index}]"
|
||||
elif effect.startswith("grid4:"):
|
||||
param = effect.split(":", 2)[1]
|
||||
|
||||
@@ -64,7 +64,9 @@ def start_render(ffmpeg_task: FfmpegTask):
|
||||
ffmpeg_task.set_output_file(ffmpeg_task.input_file[0])
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
|
||||
# 通过环境变量传入通用FFmpeg参数
|
||||
common_args = os.getenv("FFMPEG_COMMON_ARGS", "").split() if os.getenv("FFMPEG_COMMON_ARGS") else []
|
||||
ffmpeg_process = subprocess.run(["ffmpeg", "-progress", "-", "-loglevel", "error", *common_args, *ffmpeg_args], stderr=subprocess.PIPE, **subprocess_args(True))
|
||||
span.set_attribute("ffmpeg.args", json.dumps(ffmpeg_process.args))
|
||||
logger.info(" ".join(ffmpeg_process.args))
|
||||
ffmpeg_final_out = handle_ffmpeg_output(ffmpeg_process.stdout)
|
||||
|
||||
38
util/oss.py
38
util/oss.py
@@ -10,6 +10,24 @@ from telemetry import get_tracer
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _apply_http_replace_map(url):
|
||||
"""
|
||||
应用 HTTP_REPLACE_MAP 环境变量替换 URL
|
||||
:param str url: 原始 URL
|
||||
:return str: 替换后的 URL
|
||||
"""
|
||||
replace_map = os.getenv("HTTP_REPLACE_MAP", "")
|
||||
if not replace_map:
|
||||
return url
|
||||
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
||||
new_url = url
|
||||
for (_src, _dst) in replace_list:
|
||||
new_url = new_url.replace(_src, _dst)
|
||||
if new_url != url:
|
||||
logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}")
|
||||
return new_url
|
||||
|
||||
|
||||
def upload_to_oss(url, file_path):
|
||||
"""
|
||||
使用签名URL上传文件到OSS
|
||||
@@ -27,6 +45,10 @@ def upload_to_oss(url, file_path):
|
||||
if os.getenv("UPLOAD_METHOD") == "rclone":
|
||||
with tracer.start_as_current_span("rclone_to_oss") as r_span:
|
||||
replace_map = os.getenv("RCLONE_REPLACE_MAP")
|
||||
config_file = os.getenv("RCLONE_CONFIG_FILE")
|
||||
rclone_config = ""
|
||||
if config_file != "":
|
||||
rclone_config = f"--config {config_file}"
|
||||
r_span.set_attribute("rclone.replace_map", replace_map)
|
||||
if replace_map != "":
|
||||
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
||||
@@ -36,21 +58,24 @@ def upload_to_oss(url, file_path):
|
||||
new_url = new_url.split("?", 1)[0]
|
||||
r_span.set_attribute("rclone.target_dir", new_url)
|
||||
if new_url != url:
|
||||
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}")
|
||||
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {rclone_config} {file_path} {new_url}")
|
||||
r_span.set_attribute("rclone.result", result)
|
||||
if result == 0:
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return True
|
||||
else:
|
||||
span.set_status(Status(StatusCode.ERROR))
|
||||
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||
http_url = _apply_http_replace_map(url)
|
||||
span.set_attribute("file.http_url", http_url)
|
||||
while retries < max_retries:
|
||||
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
|
||||
req_span.set_attribute("http.retry_count", retries)
|
||||
try:
|
||||
req_span.set_attribute("http.method", "PUT")
|
||||
req_span.set_attribute("http.url", url)
|
||||
req_span.set_attribute("http.url", http_url)
|
||||
with open(file_path, 'rb') as f:
|
||||
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
||||
response = requests.put(http_url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
req_span.set_attribute("http.response", response.text)
|
||||
response.raise_for_status()
|
||||
@@ -97,6 +122,9 @@ def download_from_oss(url, file_path, skip_if_exist=None):
|
||||
if file_dir:
|
||||
if not os.path.exists(file_dir):
|
||||
os.makedirs(file_dir)
|
||||
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||
http_url = _apply_http_replace_map(url)
|
||||
span.set_attribute("file.http_url", http_url)
|
||||
max_retries = 5
|
||||
retries = 0
|
||||
while retries < max_retries:
|
||||
@@ -104,8 +132,8 @@ def download_from_oss(url, file_path, skip_if_exist=None):
|
||||
req_span.set_attribute("http.retry_count", retries)
|
||||
try:
|
||||
req_span.set_attribute("http.method", "GET")
|
||||
req_span.set_attribute("http.url", url)
|
||||
response = requests.get(url, timeout=15) # 设置超时时间
|
||||
req_span.set_attribute("http.url", http_url)
|
||||
response = requests.get(http_url, timeout=15) # 设置超时时间
|
||||
req_span.set_attribute("http.status_code", response.status_code)
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
Reference in New Issue
Block a user