"""Flask blueprint handling recorder webhook events under ``/api/bilirecorder``.

The module keeps one module-level "current" :class:`Workflow` that collects the
video clips (and their danmaku files) of the recording session in progress.
When a session ends, every clip that has danmaku is handed to a single-worker
process pool for encoding and, if ``VIDEO_ENABLED``, queued for upload.
"""
import os.path
from concurrent.futures import ProcessPoolExecutor, Future
from datetime import datetime
from glob import escape as glob_escape, glob
from typing import Optional

from flask import Blueprint, jsonify, request, current_app

from config import BILILIVE_RECORDER_DIRECTORY, VIDEO_TITLE, XIGUALIVE_RECORDER_DIRECTORY, VIDEO_DESC, \
    VIDEO_TAGS, VIDEO_TID, VIDEO_ENABLED
from exception.danmaku import DanmakuException
from model import db
from model.DanmakuClip import DanmakuClip
from model.VideoClip import VideoClip
from model.Workflow import Workflow
from workflow.bilibili import IS_LIVING, IS_UPLOADING, INSTANCE as bilibili_instance
from workflow.bilibili import VideoPart
from workflow.danmaku import get_file_start
from workflow.video import get_video_real_duration, duration_str_to_float
from workflow.worker import do_workflow

blueprint = Blueprint("api_bilirecorder", __name__, url_prefix="/api/bilirecorder")

# The workflow currently being assembled; None until the first event/query.
bili_record_workflow_item: Optional[Workflow] = None
# One worker only, so submitted encode jobs run strictly in submission order.
pool = ProcessPoolExecutor(max_workers=1)


def _default_title() -> str:
    """Return the default workflow title: ``VIDEO_TITLE`` filled with today's local date."""
    return VIDEO_TITLE.format(datetime.now().strftime("%Y%m%d"))


def _start_upload_after_encode(_f: "Future"):
    """Done-callback for an encode job: queue the produced parts for upload.

    ``_f.result()`` re-raises whatever the worker raised; ``concurrent.futures``
    logs exceptions escaping a done-callback instead of propagating them.
    """
    _result = _f.result()
    if _result:
        # Each result item is indexed by 'base_path' and 'file'.
        bilibili_instance.pre_upload(
            parts=[VideoPart(os.path.join(_item['base_path'], _item['file']), _item['file']) for _item in _result],
            max_retry=10
        )


def auto_submit_task():
    """Submit every clip of the current workflow that has danmaku for processing.

    Does nothing (besides logging) when there is no current workflow, when it is
    still being edited, or when it has no video clips. If at least one job was
    submitted, the module-level workflow is replaced by a fresh one via
    :func:`clear_item`, and a final callback is attached to the last job that
    finishes the upload unless an upload or a live session is in progress.
    """
    # Work on a local snapshot: the global is rebound by clear_item() below, and
    # the log lines must keep reporting the workflow that was actually submitted.
    workflow = bili_record_workflow_item
    if workflow is None:
        print("[!]Auto Submit Fail: Item is None")
        return
    if workflow.editing:
        print("[!]Auto Submit Fail: Still Editing")
        return
    if not workflow.video_clips:
        print("[!]Auto Submit Fail: No Video Clips")
        return
    bilibili_instance.login()
    video_title = workflow.name
    last_future = None
    for video_clip in workflow.video_clips:
        danmaku_paths = [clip.full_path for clip in video_clip.danmaku_clips]
        if not danmaku_paths:
            print("[-]Workflow:", workflow.id, "; Video:", video_clip.full_path, "; No Danmaku")
            continue
        print("[+]Workflow:", workflow.id, "; Video:", video_clip.full_path)
        last_future = pool.submit(do_workflow, video_clip.full_path, *danmaku_paths)
        if VIDEO_ENABLED:
            last_future.add_done_callback(_start_upload_after_encode)
    if last_future is None:
        # Nothing was submitted; keep the current workflow untouched.
        return
    # At least one job is queued: start collecting into a fresh workflow.
    clear_item()

    def _on_upload_finish(_f: "Future"):
        # Skip finalising while an upload is running or a live session is active.
        if IS_UPLOADING.is_set() or IS_LIVING.is_set():
            return
        bilibili_instance.finish_upload(
            title=video_title, desc=VIDEO_DESC, tid=VIDEO_TID, tag=VIDEO_TAGS, no_reprint=0)
        bilibili_instance.clear()

    # Callbacks fire in registration order, so this runs after the encode
    # callback of the last submitted job.
    last_future.add_done_callback(_on_upload_finish)


def clear_item():
    """Replace the current workflow with a fresh automatic one (not added to the session)."""
    global bili_record_workflow_item
    bili_record_workflow_item = Workflow()
    bili_record_workflow_item.name = _default_title()
    bili_record_workflow_item.automatic = True
    bili_record_workflow_item.editing = True


def commit_item():
    """Recalculate the current workflow's start time and commit the DB session.

    No-op when there is no current workflow.
    """
    if bili_record_workflow_item is None:
        return
    bili_record_workflow_item.calculate_start_time()
    db.session.commit()


def safe_create_item():
    """Start a new current workflow, closing out a persisted previous one first.

    If the existing workflow already has an ``id``, it is marked as no longer
    editing, committed and submitted before being replaced. The (new or reused)
    workflow is then initialised as an automatic, editing workflow, added to
    the session and committed.

    Returns:
        The current workflow.
    """
    global bili_record_workflow_item
    if bili_record_workflow_item is None:
        bili_record_workflow_item = Workflow()
    elif bili_record_workflow_item.id is not None:
        bili_record_workflow_item.editing = False
        commit_item()
        auto_submit_task()
        bili_record_workflow_item = Workflow()
    # Same title rule as clear_item(): local date, so both paths agree.
    bili_record_workflow_item.name = _default_title()
    bili_record_workflow_item.automatic = True
    bili_record_workflow_item.editing = True
    db.session.add(bili_record_workflow_item)
    db.session.commit()
    return bili_record_workflow_item


def safe_get_item() -> Workflow:
    """Return the current workflow, creating one first if none exists."""
    if bili_record_workflow_item is None:
        return safe_create_item()
    return bili_record_workflow_item


def _danmaku_registered(relpath: str, base_path: str) -> bool:
    """Return True if a DanmakuClip with this file and base path is already stored."""
    existing = DanmakuClip.query.filter(
        DanmakuClip.file == relpath,
        DanmakuClip.base_path == base_path
    ).first()
    return existing is not None


def _attach_danmaku(clip: VideoClip, relpath: str, base_path: str, offset):
    """Create a DanmakuClip bound to ``clip`` and add it to the DB session."""
    danmaku = DanmakuClip()
    danmaku.file = relpath
    danmaku.base_path = base_path
    danmaku.offset = offset
    danmaku.video_clip = clip
    db.session.add(danmaku)


def collect_danmaku_files(workflow: Optional[Workflow]):
    """Register the danmaku XML files that belong to each video clip of ``workflow``.

    For every clip, XML files sharing the clip's path prefix are registered with
    offset 0. Then every not-yet-registered XML file in
    ``XIGUALIVE_RECORDER_DIRECTORY`` is registered with an offset equal to its
    start time minus the clip's start time, unless it starts more than ten
    minutes before the clip. Files whose start time cannot be read
    (``DanmakuException``) are skipped. Finishes with :func:`commit_item`.

    Args:
        workflow: The workflow whose clips are scanned; ``None`` is a no-op.
    """
    if workflow is None:
        return
    # The directory listing does not depend on the clip, so read it once.
    xigua_files = glob(os.path.join(XIGUALIVE_RECORDER_DIRECTORY, "*.xml"))
    clip: VideoClip
    for clip in workflow.video_clips:
        pre_file_name = os.path.splitext(clip.full_path)[0]
        start_time_ts = None
        # In theory there is only a single match.
        # Escape the prefix: recording names may contain glob metacharacters.
        for danmaku_file in glob(glob_escape(pre_file_name) + "*xml"):
            relpath = os.path.relpath(danmaku_file, BILILIVE_RECORDER_DIRECTORY)
            if _danmaku_registered(relpath, BILILIVE_RECORDER_DIRECTORY):
                continue
            try:
                start_time_ts = get_file_start(danmaku_file)
            except DanmakuException:
                continue
            _attach_danmaku(clip, relpath, BILILIVE_RECORDER_DIRECTORY, 0)
        if start_time_ts is None:
            # No usable danmaku start time: estimate it as "now minus duration".
            if clip.duration is None or clip.duration == 0:
                clip.duration = duration_str_to_float(get_video_real_duration(clip.full_path))
            start_time_ts = datetime.now().timestamp() - float(clip.duration)
        for danmaku_file in xigua_files:
            relpath = os.path.relpath(danmaku_file, XIGUALIVE_RECORDER_DIRECTORY)
            if _danmaku_registered(relpath, XIGUALIVE_RECORDER_DIRECTORY):
                continue
            try:
                start_time_xg = get_file_start(danmaku_file)
            except DanmakuException:
                continue
            bias = start_time_xg - start_time_ts
            if bias < -600:
                # Starts more than 10 minutes before the clip: not ours.
                print("弹幕文件", danmaku_file, "反向偏移超过10分钟")
                continue
            _attach_danmaku(clip, relpath, XIGUALIVE_RECORDER_DIRECTORY, bias)
    commit_item()


@blueprint.post("/")
def bilirecorder_event():
    """Handle a recorder webhook event posted as a JSON object.

    Recognised ``EventType`` values:

    * ``SessionStarted`` - flag the stream as live and start a new workflow.
    * ``SessionEnded`` - clear the live flag, close the workflow and submit it.
    * ``FileClosed`` - add the closed file as a video clip (if not already
      present), collect its danmaku files and attempt a submit.

    Returns:
        The current workflow as JSON, or a 403 response carrying the payload
        when the body is not a JSON object containing ``EventType``.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON body,
    # so malformed requests get the same 403 answer as an unknown payload shape.
    payload = request.get_json(silent=True)
    current_app.logger.debug(payload)
    if not isinstance(payload, dict) or 'EventType' not in payload:
        response = jsonify({
            'error': "异常",
            'payload': payload,
        })
        response.status_code = 403
        return response
    event_type = payload['EventType']
    if event_type == "SessionStarted":
        IS_LIVING.set()
        # Recording started.
        safe_create_item()
    elif event_type == "SessionEnded":
        IS_LIVING.clear()
        # Recording ended.
        item = safe_get_item()
        item.editing = False
        commit_item()
        auto_submit_task()
        return jsonify(item.to_dict())
    elif event_type == "FileClosed":
        # A recording file was closed.
        item = safe_get_item()
        # Tolerate both a missing key and an explicit null.
        event_data = payload.get("EventData") or {}
        video_file = event_data.get("RelativePath", None)
        # Check whether this file was already added.
        already_add = any(
            video_file == clip.file and clip.base_path == BILILIVE_RECORDER_DIRECTORY
            for clip in item.video_clips
        )
        # Without a path there is nothing to register as a clip.
        if video_file and not already_add:
            video_clip = VideoClip()
            video_clip.file = video_file
            video_clip.base_path = BILILIVE_RECORDER_DIRECTORY
            video_clip.duration = event_data.get("Duration", 0)
            item.video_clips.append(video_clip)
            commit_item()
        collect_danmaku_files(item)
        auto_submit_task()
        return jsonify(item.to_dict())
    commit_item()
    item = safe_get_item()
    return jsonify(item.to_dict())


@blueprint.get("/")
def query_current_status():
    """Return the current workflow (created on demand) as JSON."""
    item = safe_get_item()
    return jsonify(item.to_dict())