from output.IOutput import IOutput
from typing import IO
import time
class XMLWriter(IOutput):
"""
可输出与B站弹幕姬兼容的xml弹幕格式,可用于转成ass字幕
"""
def __init__(self, config_manager):
super(XMLWriter, self).__init__(config_manager)
self._file_mappings: "dict[str, IO[str]]" = {}
self.time_mappings: "dict[str, float]" = {}
self._file_name_pattern: "str" = self._config_manager.config['output']['xml']['file_pattern']
def _get_fd_by_room_id(self, room_id: str) -> IO[str]:
if room_id in self._file_mappings:
return self._file_mappings[room_id]
cur_ts = time.time()
fd = open(self._file_name_pattern.format_map({
"room_id": room_id,
"ts": cur_ts
}), "w", encoding="UTF-8")
self._file_mappings[room_id] = fd
self.time_mappings[room_id] = cur_ts
return fd
def _close_fd_by_room_id(self, room_id: str):
if room_id in self._file_mappings:
fd = self._file_mappings[room_id]
if not fd.closed:
fd.close()
del self._file_mappings[room_id]
if room_id in self.time_mappings:
del self.time_mappings[room_id]
def control_output(self, message):
# 下播了
self._close_fd_by_room_id(message.room_id)
def _get_bias_ts_by_room_id(self, room_id: str, cur_ts: float = 0):
if cur_ts == 0:
cur_ts = time.time()
if room_id not in self.time_mappings:
return 0
return cur_ts - self.time_mappings[room_id]
def chat_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """{}\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time), message.room_id,
cur_time * 1000, message.user().id, message.user().nickname, message.content
)
fd.write(_c)
fd.flush()
def gift_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time),
message.user().nickname, message.gift.name, message.instance.repeatCount
)
fd.write(_c)
fd.flush()
def terminate(self):
print("保存所有弹幕文件中...")
# copy
_rooms = [i for i in self._file_mappings.keys()]
for _room_id in _rooms:
self._close_fd_by_room_id(_room_id)
print("保存完毕")