You've already forked FrameTour-RenderWorker
Compare commits
4 Commits
d7704005b6
...
c61f6d7521
| Author | SHA1 | Date | |
|---|---|---|---|
| c61f6d7521 | |||
| 4ef57a208e | |||
| a415d8571d | |||
| 4af52d5a54 |
@@ -1,7 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
import os.path
|
import os.path
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
|
||||||
from opentelemetry.trace import Status, StatusCode
|
from opentelemetry.trace import Status, StatusCode
|
||||||
|
|
||||||
@@ -129,13 +129,31 @@ def check_placeholder_exist_with_count(placeholder_id, task_params, required_cou
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def start_ffmpeg_task(ffmpeg_task):
|
def start_ffmpeg_task(ffmpeg_task, max_workers=4):
|
||||||
tracer = get_tracer(__name__)
|
tracer = get_tracer(__name__)
|
||||||
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
with tracer.start_as_current_span("start_ffmpeg_task") as span:
|
||||||
for task in ffmpeg_task.analyze_input_render_tasks():
|
sub_tasks = list(ffmpeg_task.analyze_input_render_tasks())
|
||||||
result = start_ffmpeg_task(task)
|
|
||||||
if not result:
|
if sub_tasks:
|
||||||
|
span.set_attribute("sub_tasks.count", len(sub_tasks))
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
futures = {executor.submit(start_ffmpeg_task, task, max_workers): task
|
||||||
|
for task in sub_tasks}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
try:
|
||||||
|
if not future.result():
|
||||||
|
# 快速失败:取消剩余任务
|
||||||
|
for f in futures:
|
||||||
|
f.cancel()
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
return False
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("子任务执行失败: %s", e)
|
||||||
|
for f in futures:
|
||||||
|
f.cancel()
|
||||||
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
return False
|
||||||
|
|
||||||
ffmpeg_task.correct_task_type()
|
ffmpeg_task.correct_task_type()
|
||||||
span.set_attribute("task.type", ffmpeg_task.task_type)
|
span.set_attribute("task.type", ffmpeg_task.task_type)
|
||||||
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
|
span.set_attribute("task.center_cut", str(ffmpeg_task.center_cut))
|
||||||
|
|||||||
@@ -6,4 +6,4 @@ SUPPORT_FEATURE = (
|
|||||||
'rclone_upload',
|
'rclone_upload',
|
||||||
'custom_re_encode',
|
'custom_re_encode',
|
||||||
)
|
)
|
||||||
SOFTWARE_VERSION = '0.0.6'
|
SOFTWARE_VERSION = '0.0.8'
|
||||||
|
|||||||
@@ -329,7 +329,7 @@ class FfmpegTask(object):
|
|||||||
skip_seconds = float(param)
|
skip_seconds = float(param)
|
||||||
if skip_seconds > 0:
|
if skip_seconds > 0:
|
||||||
effect_index += 1
|
effect_index += 1
|
||||||
filter_args.append(f"{video_output_str}trim=start={skip_seconds}[v_eff{effect_index}]")
|
filter_args.append(f"{video_output_str}trim=start={skip_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||||
video_output_str = f"[v_eff{effect_index}]"
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
elif effect.startswith("tail:"):
|
elif effect.startswith("tail:"):
|
||||||
param = effect.split(":", 2)[1]
|
param = effect.split(":", 2)[1]
|
||||||
@@ -342,7 +342,7 @@ class FfmpegTask(object):
|
|||||||
# 使用reverse+trim+reverse的方法来精确获取最后N秒
|
# 使用reverse+trim+reverse的方法来精确获取最后N秒
|
||||||
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
|
filter_args.append(f"{video_output_str}reverse[v_rev{effect_index}]")
|
||||||
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
|
filter_args.append(f"[v_rev{effect_index}]trim=duration={tail_seconds}[v_trim{effect_index}]")
|
||||||
filter_args.append(f"[v_trim{effect_index}]reverse[v_eff{effect_index}]")
|
filter_args.append(f"[v_trim{effect_index}]reverse,setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||||
video_output_str = f"[v_eff{effect_index}]"
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
elif effect.startswith("show:"):
|
elif effect.startswith("show:"):
|
||||||
param = effect.split(":", 2)[1]
|
param = effect.split(":", 2)[1]
|
||||||
@@ -351,7 +351,7 @@ class FfmpegTask(object):
|
|||||||
show_seconds = float(param)
|
show_seconds = float(param)
|
||||||
if show_seconds > 0:
|
if show_seconds > 0:
|
||||||
effect_index += 1
|
effect_index += 1
|
||||||
filter_args.append(f"{video_output_str}trim=end={show_seconds}[v_eff{effect_index}]")
|
filter_args.append(f"{video_output_str}trim=end={show_seconds},setpts=PTS-STARTPTS[v_eff{effect_index}]")
|
||||||
video_output_str = f"[v_eff{effect_index}]"
|
video_output_str = f"[v_eff{effect_index}]"
|
||||||
elif effect.startswith("grid4:"):
|
elif effect.startswith("grid4:"):
|
||||||
param = effect.split(":", 2)[1]
|
param = effect.split(":", 2)[1]
|
||||||
|
|||||||
38
util/oss.py
38
util/oss.py
@@ -10,6 +10,24 @@ from telemetry import get_tracer
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_http_replace_map(url):
|
||||||
|
"""
|
||||||
|
应用 HTTP_REPLACE_MAP 环境变量替换 URL
|
||||||
|
:param str url: 原始 URL
|
||||||
|
:return str: 替换后的 URL
|
||||||
|
"""
|
||||||
|
replace_map = os.getenv("HTTP_REPLACE_MAP", "")
|
||||||
|
if not replace_map:
|
||||||
|
return url
|
||||||
|
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
||||||
|
new_url = url
|
||||||
|
for (_src, _dst) in replace_list:
|
||||||
|
new_url = new_url.replace(_src, _dst)
|
||||||
|
if new_url != url:
|
||||||
|
logger.debug(f"HTTP_REPLACE_MAP: {url} -> {new_url}")
|
||||||
|
return new_url
|
||||||
|
|
||||||
|
|
||||||
def upload_to_oss(url, file_path):
|
def upload_to_oss(url, file_path):
|
||||||
"""
|
"""
|
||||||
使用签名URL上传文件到OSS
|
使用签名URL上传文件到OSS
|
||||||
@@ -27,6 +45,10 @@ def upload_to_oss(url, file_path):
|
|||||||
if os.getenv("UPLOAD_METHOD") == "rclone":
|
if os.getenv("UPLOAD_METHOD") == "rclone":
|
||||||
with tracer.start_as_current_span("rclone_to_oss") as r_span:
|
with tracer.start_as_current_span("rclone_to_oss") as r_span:
|
||||||
replace_map = os.getenv("RCLONE_REPLACE_MAP")
|
replace_map = os.getenv("RCLONE_REPLACE_MAP")
|
||||||
|
config_file = os.getenv("RCLONE_CONFIG_FILE")
|
||||||
|
rclone_config = ""
|
||||||
|
if config_file != "":
|
||||||
|
rclone_config = f"--config {config_file}"
|
||||||
r_span.set_attribute("rclone.replace_map", replace_map)
|
r_span.set_attribute("rclone.replace_map", replace_map)
|
||||||
if replace_map != "":
|
if replace_map != "":
|
||||||
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
replace_list = [i.split("|", 1) for i in replace_map.split(",")]
|
||||||
@@ -36,21 +58,24 @@ def upload_to_oss(url, file_path):
|
|||||||
new_url = new_url.split("?", 1)[0]
|
new_url = new_url.split("?", 1)[0]
|
||||||
r_span.set_attribute("rclone.target_dir", new_url)
|
r_span.set_attribute("rclone.target_dir", new_url)
|
||||||
if new_url != url:
|
if new_url != url:
|
||||||
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {file_path} {new_url}")
|
result = os.system(f"rclone copyto --no-check-dest --ignore-existing --multi-thread-chunk-size 8M --multi-thread-streams 8 {rclone_config} {file_path} {new_url}")
|
||||||
r_span.set_attribute("rclone.result", result)
|
r_span.set_attribute("rclone.result", result)
|
||||||
if result == 0:
|
if result == 0:
|
||||||
span.set_status(Status(StatusCode.OK))
|
span.set_status(Status(StatusCode.OK))
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
span.set_status(Status(StatusCode.ERROR))
|
span.set_status(Status(StatusCode.ERROR))
|
||||||
|
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||||
|
http_url = _apply_http_replace_map(url)
|
||||||
|
span.set_attribute("file.http_url", http_url)
|
||||||
while retries < max_retries:
|
while retries < max_retries:
|
||||||
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
|
with tracer.start_as_current_span("upload_to_oss.request") as req_span:
|
||||||
req_span.set_attribute("http.retry_count", retries)
|
req_span.set_attribute("http.retry_count", retries)
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "PUT")
|
req_span.set_attribute("http.method", "PUT")
|
||||||
req_span.set_attribute("http.url", url)
|
req_span.set_attribute("http.url", http_url)
|
||||||
with open(file_path, 'rb') as f:
|
with open(file_path, 'rb') as f:
|
||||||
response = requests.put(url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
response = requests.put(http_url, data=f, stream=True, timeout=60, headers={"Content-Type": "video/mp4"})
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
req_span.set_attribute("http.response", response.text)
|
req_span.set_attribute("http.response", response.text)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
@@ -97,6 +122,9 @@ def download_from_oss(url, file_path, skip_if_exist=None):
|
|||||||
if file_dir:
|
if file_dir:
|
||||||
if not os.path.exists(file_dir):
|
if not os.path.exists(file_dir):
|
||||||
os.makedirs(file_dir)
|
os.makedirs(file_dir)
|
||||||
|
# 应用 HTTP_REPLACE_MAP 替换 URL
|
||||||
|
http_url = _apply_http_replace_map(url)
|
||||||
|
span.set_attribute("file.http_url", http_url)
|
||||||
max_retries = 5
|
max_retries = 5
|
||||||
retries = 0
|
retries = 0
|
||||||
while retries < max_retries:
|
while retries < max_retries:
|
||||||
@@ -104,8 +132,8 @@ def download_from_oss(url, file_path, skip_if_exist=None):
|
|||||||
req_span.set_attribute("http.retry_count", retries)
|
req_span.set_attribute("http.retry_count", retries)
|
||||||
try:
|
try:
|
||||||
req_span.set_attribute("http.method", "GET")
|
req_span.set_attribute("http.method", "GET")
|
||||||
req_span.set_attribute("http.url", url)
|
req_span.set_attribute("http.url", http_url)
|
||||||
response = requests.get(url, timeout=15) # 设置超时时间
|
response = requests.get(http_url, timeout=15) # 设置超时时间
|
||||||
req_span.set_attribute("http.status_code", response.status_code)
|
req_span.set_attribute("http.status_code", response.status_code)
|
||||||
with open(file_path, 'wb') as f:
|
with open(file_path, 'wb') as f:
|
||||||
f.write(response.content)
|
f.write(response.content)
|
||||||
|
|||||||
Reference in New Issue
Block a user