This repository has been archived on 2024-09-10. You can view files and clone it, but cannot push or open issues or pull requests.

80 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from output.IOutput import IOutput
from typing import IO
import time
class XMLWriter(IOutput):
"""
可输出与B站弹幕姬兼容的xml弹幕格式可用于转成ass字幕
"""
def __init__(self, config_manager):
super(XMLWriter, self).__init__(config_manager)
self._file_mappings: "dict[str, IO[str]]" = {}
self.time_mappings: "dict[str, float]" = {}
self._file_name_pattern: "str" = self._config_manager.config['output']['xml']['file_pattern']
def _get_fd_by_room_id(self, room_id: str) -> IO[str]:
if room_id in self._file_mappings:
return self._file_mappings[room_id]
cur_ts = time.time()
fd = open(self._file_name_pattern.format_map({
"room_id": room_id,
"ts": cur_ts
}), "w", encoding="UTF-8")
self._file_mappings[room_id] = fd
self.time_mappings[room_id] = cur_ts
return fd
def _close_fd_by_room_id(self, room_id: str):
if room_id in self._file_mappings:
fd = self._file_mappings[room_id]
if not fd.closed:
fd.close()
del self._file_mappings[room_id]
if room_id in self.time_mappings:
del self.time_mappings[room_id]
def control_output(self, message):
# 下播了
self._close_fd_by_room_id(message.room_id)
def _get_bias_ts_by_room_id(self, room_id: str, cur_ts: float = 0):
if cur_ts == 0:
cur_ts = time.time()
if room_id not in self.time_mappings:
return 0
return cur_ts - self.time_mappings[room_id]
def chat_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """<d p="{:.2f},1,24,{},{:.0f},0,{},0" user="{}">{}</d>\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time), message.room_id,
cur_time * 1000, message.user().id, message.user().nickname, message.content
)
fd.write(_c)
fd.flush()
def gift_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """<gift ts="{:.2f}" user="{}" giftname="{}" giftcount="{}"></gift>\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time),
message.user().nickname, message.gift.name, message.instance.repeatCount
)
fd.write(_c)
fd.flush()
def terminate(self):
print("保存所有弹幕文件中...")
# copy
_rooms = [i for i in self._file_mappings.keys()]
for _room_id in _rooms:
self._close_fd_by_room_id(_room_id)
print("保存完毕")