# my-video-workflow/controller/api/bilirecorder_blueprint.py
# (repository viewer metadata: 216 lines, 8.0 KiB, Python)

import os.path
from concurrent.futures import ProcessPoolExecutor, Future
from datetime import datetime
from glob import glob
from typing import Optional
from flask import Blueprint, jsonify, request, current_app
from config import BILILIVE_RECORDER_DIRECTORY, VIDEO_TITLE, XIGUALIVE_RECORDER_DIRECTORY, VIDEO_OUTPUT_DIR, VIDEO_DESC, \
VIDEO_TAGS, VIDEO_TID
from exception.danmaku import DanmakuException
from model import db
from model.DanmakuClip import DanmakuClip
from model.VideoClip import VideoClip
from model.Workflow import Workflow
from workflow.danmaku import get_file_start
from workflow.video import get_video_real_duration, duration_str_to_float
from workflow.worker import do_workflow
from workflow.bilibili import IS_LIVING, INSTANCE as bilibili_instance
from workflow.bilibili import VideoPart
# Flask blueprint for the recorder endpoints; every route is served under /api/bilirecorder.
blueprint = Blueprint("api_bilirecorder", __name__, url_prefix="/api/bilirecorder")
# The workflow currently being assembled from recorder events; None until one is created.
bili_record_workflow_item: Optional[Workflow] = None
# Process pool limited to a single worker, so submitted jobs run one at a time.
pool = ProcessPoolExecutor(max_workers=1)
def auto_submit_task():
    """Submit the current workflow's clips for encoding, then upload the results.

    Prints the reason and returns without doing anything when there is no
    current workflow, when it is still flagged as editing, or when it has no
    video clips.  Otherwise every clip that has at least one danmaku file is
    handed to ``do_workflow`` through the process pool, and a done-callback
    logs in to bilibili and uploads whatever the worker returned.  Clips
    without danmaku files are skipped with a log line.

    The workflow, its id and its title are captured in locals before the
    loop: ``clear_item()`` rebinds the module-level
    ``bili_record_workflow_item`` as soon as the first clip is submitted, so
    reading the global afterwards (in later log lines, or in the upload
    callback that fires much later) would describe the empty replacement
    instead of the workflow that was actually submitted.
    """
    workflow = bili_record_workflow_item
    if workflow is None:
        print("[!]Auto Submit Fail: Item is None")
        return
    if workflow.editing:
        print("[!]Auto Submit Fail: Still Editing")
        return
    if not workflow.video_clips:
        print("[!]Auto Submit Fail: No Video Clips")
        return

    workflow_id = workflow.id
    upload_title = workflow.name

    def _encode_finish_callback(_f: "Future"):
        # result() re-raises whatever the worker raised; concurrent.futures
        # logs and ignores exceptions escaping a done-callback.
        _result = _f.result()
        if _result:
            # start uploading; each result entry is indexed by 'base_path' and 'file'
            bilibili_instance.login()
            bilibili_instance.upload(
                parts=[
                    VideoPart(os.path.join(_item['base_path'], _item['file']), _item['file'])
                    for _item in _result
                ],
                title=upload_title,
                desc=VIDEO_DESC,
                tid=VIDEO_TID,
                tag=VIDEO_TAGS,
                no_reprint=0)

    for video_clip in workflow.video_clips:
        danmaku_paths = [clip.full_path for clip in video_clip.danmaku_clips]
        if not danmaku_paths:
            print("[-]Workflow:", workflow_id, "; Video:", video_clip.full_path, "; No Danmaku")
            continue
        print("[+]Workflow:", workflow_id, "; Video:", video_clip.full_path)
        _future = pool.submit(do_workflow, video_clip.full_path, *danmaku_paths)
        # Swap in a fresh workflow so later recorder events do not touch the submitted one.
        clear_item()
        _future.add_done_callback(_encode_finish_callback)
def clear_item():
    """Replace the current workflow with a brand-new one.

    The replacement is titled from ``VIDEO_TITLE`` with today's date
    (``YYYYMMDD``) and flagged as automatic and editing.  Nothing is
    committed to the database here.
    """
    global bili_record_workflow_item
    date_tag = datetime.now().strftime("%Y%m%d")
    bili_record_workflow_item = Workflow()
    fresh = bili_record_workflow_item
    fresh.name = VIDEO_TITLE.format(date_tag)
    fresh.automatic = True
    fresh.editing = True
def commit_item():
    """Recalculate the current workflow's start time and commit the session.

    Does nothing when there is no current workflow.
    """
    current = bili_record_workflow_item
    if current is not None:
        current.calculate_start_time()
        db.session.commit()
def safe_create_item():
    """Close out the current workflow, if any, and start a new one.

    When a current workflow exists and already has an id, it is flagged as
    no longer editing and committed before being replaced.  The new workflow
    is titled from ``VIDEO_TITLE`` with today's date (``YYYYMMDD``), flagged
    as automatic and editing, and the session is committed.

    Returns:
        The workflow that is now current.
    """
    global bili_record_workflow_item
    if bili_record_workflow_item is None:
        bili_record_workflow_item = Workflow()
    else:
        # The `is not None` half is always true inside this branch; only the id check matters.
        if bili_record_workflow_item is not None and bili_record_workflow_item.id is not None:
            bili_record_workflow_item.editing = False
            commit_item()
        # NOTE(review): the original indentation of this assignment was lost; it is
        # assumed to belong to the `else` branch (always start from a new Workflow).
        # Confirm it was not nested under the id check above.
        bili_record_workflow_item = Workflow()
    bili_record_workflow_item.name = VIDEO_TITLE.format(datetime.now().strftime("%Y%m%d"))
    bili_record_workflow_item.automatic = True
    bili_record_workflow_item.editing = True
    # No db.session.add() happens in this function; only the commit is issued here.
    db.session.commit()
    return bili_record_workflow_item
def safe_get_item() -> Workflow:
    """Return the current workflow, creating one first when none exists yet."""
    current = bili_record_workflow_item
    return safe_create_item() if current is None else current
def _find_danmaku_clip(relpath, base_path):
    """Return the stored DanmakuClip for ``relpath`` under ``base_path``, or None."""
    return DanmakuClip.query.filter(
        DanmakuClip.file == relpath,
        DanmakuClip.base_path == base_path
    ).first()


def _attach_danmaku_clip(clip, relpath, base_path, offset):
    """Create a DanmakuClip for the file, add it to the session and link it to ``clip``."""
    danmaku = DanmakuClip()
    danmaku.file = relpath
    danmaku.base_path = base_path
    danmaku.offset = offset
    danmaku.video_clip = clip
    db.session.add(danmaku)
    clip.danmaku_clips.append(danmaku)


def collect_danmaku_files(workflow: Optional[Workflow]):
    """Register the danmaku XML files that belong to each video clip of ``workflow``.

    For every clip two sources are scanned:

    1. XML files sharing the clip's path stem (the recorder's own danmaku).
       Unregistered files are attached with offset 0.  The start time read
       from such a file is used as the clip's start time.
    2. Every ``*.xml`` file in ``XIGUALIVE_RECORDER_DIRECTORY``.  Unregistered
       files are attached with an offset equal to their start time minus the
       clip's start time, unless they start more than 600 seconds before the
       clip.

    The clip start time is read from the recorder danmaku file even when that
    file is already registered.  Previously an already-registered file left
    the start time unset, so it was estimated as "now minus duration", which
    is only meaningful for a clip that has just been closed.  That estimate
    is still used when no readable recorder danmaku file exists.

    Files that ``get_file_start`` rejects with ``DanmakuException`` are
    skipped.  Finishes by calling ``commit_item()``.

    Args:
        workflow: Workflow whose clips are scanned; ``None`` is a no-op.
    """
    if workflow is None:
        return
    clip: VideoClip
    for clip in workflow.video_clips:
        pre_file_name = os.path.splitext(clip.full_path)[0]
        start_time_ts = None
        # In theory there is only one match.
        for danmaku_file in glob("{}*xml".format(pre_file_name)):
            relpath = os.path.relpath(danmaku_file, BILILIVE_RECORDER_DIRECTORY)
            registered = _find_danmaku_clip(relpath, BILILIVE_RECORDER_DIRECTORY) is not None
            try:
                # Read the start even for registered files so the offsets below stay correct.
                start_time_ts = get_file_start(danmaku_file)
            except DanmakuException:
                continue
            if not registered:
                _attach_danmaku_clip(clip, relpath, BILILIVE_RECORDER_DIRECTORY, 0)
        if start_time_ts is None:
            # No usable recorder danmaku: estimate the start as "now minus clip duration".
            if clip.duration is None or clip.duration == 0:
                clip.duration = duration_str_to_float(get_video_real_duration(clip.full_path))
            start_time_ts = datetime.now().timestamp() - float(clip.duration)
        for danmaku_file in glob(os.path.join(XIGUALIVE_RECORDER_DIRECTORY, "*.xml")):
            relpath = os.path.relpath(danmaku_file, XIGUALIVE_RECORDER_DIRECTORY)
            if _find_danmaku_clip(relpath, XIGUALIVE_RECORDER_DIRECTORY) is not None:
                continue
            try:
                start_time_xg = get_file_start(danmaku_file)
            except DanmakuException:
                continue
            bias = start_time_xg - start_time_ts
            if bias < -600:
                print("弹幕文件", danmaku_file, "反向偏移超过10分钟")
                continue
            _attach_danmaku_clip(clip, relpath, XIGUALIVE_RECORDER_DIRECTORY, bias)
    commit_item()
@blueprint.post("/")
def bilirecorder_event():
    """Handle one event posted by the recorder.

    The JSON body must be an object containing an ``EventType`` key.  Any
    other body (including ``null``, numbers or lists) is answered with a 403
    error document that echoes the payload.

    * ``SessionStarted``: mark the stream as live and start a new workflow.
    * ``SessionEnded``: mark the stream as offline, flag the workflow as no
      longer editing, commit it and attempt an automatic submit.
    * ``FileClosed``: add the closed file to the workflow unless a clip with
      the same relative path and base directory already exists, then collect
      danmaku files and attempt an automatic submit.
    * Any other event type only commits the current workflow.

    Returns:
        A JSON response built from the relevant workflow's ``to_dict()``.
    """
    payload = request.json
    current_app.logger.debug(payload)
    # Membership and key lookup below are only meaningful on a JSON object.
    if not isinstance(payload, dict) or 'EventType' not in payload:
        response = jsonify({
            'error': "异常",
            'payload': payload,
        })
        response.status_code = 403
        return response

    event_type = payload['EventType']
    if event_type == "SessionStarted":
        IS_LIVING.set()
        # recording session started
        safe_create_item()
    elif event_type == "SessionEnded":
        IS_LIVING.clear()
        # recording session ended
        item = safe_get_item()
        item.editing = False
        commit_item()
        auto_submit_task()
        return jsonify(item.to_dict())
    elif event_type == "FileClosed":
        # a recording file was closed
        item = safe_get_item()
        # `or {}` also covers an explicit null EventData.
        event_data = payload.get("EventData") or {}
        video_file = event_data.get("RelativePath", None)
        # skip files that were already added
        already_add = any(
            video_file == clip.file and clip.base_path == BILILIVE_RECORDER_DIRECTORY
            for clip in item.video_clips
        )
        if not already_add:
            video_clip = VideoClip()
            video_clip.file = video_file
            video_clip.base_path = BILILIVE_RECORDER_DIRECTORY
            video_clip.duration = event_data.get("Duration", 0)
            item.video_clips.append(video_clip)
            commit_item()
        collect_danmaku_files(item)
        auto_submit_task()
        return jsonify(item.to_dict())

    commit_item()
    item = safe_get_item()
    return jsonify(item.to_dict())
@blueprint.get("/")
def query_current_status():
    """Return the current workflow (created on demand) as a JSON document."""
    return jsonify(safe_get_item().to_dict())