my-video-workflow/controller/api/collector_blueprint.py

71 lines
2.0 KiB
Python

import os.path
import platform
import psutil
from flask import Blueprint, jsonify
from config import DANMAKU_EXEC, FFMPEG_EXEC, BILILIVE_RECORDER_DIRECTORY, XIGUALIVE_RECORDER_DIRECTORY, VIDEO_OUTPUT_DIR
from util.system import check_exec
from workflow.bilibili import IS_LIVING, IS_UPLOADING
# Flask blueprint for the collector endpoints; every route registered on it
# is served under the "/api/collector" URL prefix.
blueprint = Blueprint("api_collector", __name__, url_prefix="/api/collector")
def _get_disk_info(path):
if os.path.isdir(path):
disk_info = psutil.disk_usage(path)
return {
'exist': True,
'percent': disk_info.percent,
'free': _better_size_unit(disk_info.free),
'total': _better_size_unit(disk_info.total)
}
else:
return {
'exist': False,
'percent': 0,
'free': 0,
'total': 0
}
def _better_size_unit(bytes: int) -> str:
if bytes > 8*1024:
kbytes = bytes / 1024
if kbytes > 8*1024:
mbytes = kbytes / 1024
if mbytes > 4*1024:
gbytes = mbytes / 1024
if gbytes > 4*1024:
tbytes = gbytes / 1024
return "{:.2f}TB".format(tbytes)
else:
return "{:.2f}GB".format(gbytes)
else:
return "{:.2f}MB".format(mbytes)
else:
return "{:.1f}KB".format(kbytes)
else:
return "{}B".format(bytes)
@blueprint.get("/")
def collect_basic_status():
    """Return a JSON snapshot of the collector's basic status.

    The payload has five top-level keys:

    * ``disk``: ``_get_disk_info`` results for the video output directory,
      the current working directory, and the two recorder directories.
    * ``exec``: ``check_exec`` results for the configured ffmpeg and
      danmaku executables.
    * ``system``: the OS name reported by ``platform.system()``.
    * ``living`` / ``uploading``: ``is_set()`` of the ``IS_LIVING`` and
      ``IS_UPLOADING`` flags (presumably event objects; confirm in
      ``workflow.bilibili``).
    """
    watched_directories = (
        ('output', VIDEO_OUTPUT_DIR),
        ('work', os.path.abspath(".")),
        ('bili', BILILIVE_RECORDER_DIRECTORY),
        ('xigua', XIGUALIVE_RECORDER_DIRECTORY),
    )
    disk_section = {
        label: _get_disk_info(directory)
        for label, directory in watched_directories
    }

    exec_section = {
        'ffmpeg': check_exec(FFMPEG_EXEC),
        'danmaku': check_exec(DANMAKU_EXEC),
    }

    system_section = {
        'os': platform.system(),
    }

    payload = {
        'disk': disk_section,
        'exec': exec_section,
        'system': system_section,
        'living': IS_LIVING.is_set(),
        'uploading': IS_UPLOADING.is_set(),
    }
    return jsonify(payload)