由outputManager管理整个输出类及解析消息、分流消息功能,debug部分配置对接、xml导出部分配置对接

This commit is contained in:
Jerry Yan 2022-06-05 14:48:59 +08:00
parent 9fe1384b5d
commit 49a0715191
7 changed files with 153 additions and 107 deletions

View File

@ -9,9 +9,19 @@ webdriver:
edge:
bin: msedgedriver.exe
rooms:
- 19829678666
- https://live.douyin.com/579197336883
output:
use:
- print
- xml
- debug
xml:
save_path: "./"
file_pattern: "{room_id}_{ts}.xml"
debug:
save_path:
error: "./error"
unknown: "./debug"
known: False
http:
host: 127.0.0.1

View File

@ -1,73 +0,0 @@
import os
from handler.common import MESSAGE_QUEUE
from messages.control import ControlMessage
from protobuf import message_pb2
from protobuf import wss_pb2
import gzip
from messages.member import MemberMessage
from messages.like import LikeMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.gift import GiftMessage
from messages.social import SocialMessage
from messages.chat import ChatMessage
from output import OUTPUTER
def loop_queue():
while True:
message = MESSAGE_QUEUE.get()
try:
response = message_pb2.Response()
wss = wss_pb2.WssResponse()
wss.ParseFromString(message.body)
decompressed = gzip.decompress(wss.data)
response.ParseFromString(decompressed)
decodeMsg(response.messages)
except Exception as e:
for output in OUTPUTER:
output.error_output("ParseError", message.body, e)
def decodeMsg(messages):
for message in messages:
try:
if message.method == 'WebcastMemberMessage':
member_message = MemberMessage()
member_message.set_payload(message.payload)
for output in OUTPUTER:
output.member_output(member_message)
elif message.method == 'WebcastSocialMessage':
social_message = SocialMessage()
social_message.set_payload(message.payload)
for output in OUTPUTER:
output.social_output(social_message)
elif message.method == 'WebcastChatMessage':
chat_message = ChatMessage()
chat_message.set_payload(message.payload)
for output in OUTPUTER:
output.chat_output(chat_message)
elif message.method == 'WebcastLikeMessage':
like_message = LikeMessage()
like_message.set_payload(message.payload)
for output in OUTPUTER:
output.like_output(like_message)
elif message.method == 'WebcastGiftMessage':
gift_message = GiftMessage()
gift_message.set_payload(message.payload)
for output in OUTPUTER:
output.gift_output(gift_message)
elif message.method == 'WebcastRoomUserSeqMessage':
room_user_seq_message = RoomUserSeqMessage()
room_user_seq_message.set_payload(message.payload)
for output in OUTPUTER:
output.userseq_output(room_user_seq_message)
elif message.method == 'WebcastControlMessage':
control_message = ControlMessage()
control_message.set_payload(message.payload)
for output in OUTPUTER:
output.control_output(control_message)
else:
for output in OUTPUTER:
output.other_output(message.method, message.payload)
except Exception as e:
for output in OUTPUTER:
output.error_output(message.method, message.payload, e)

10
main.py
View File

@ -3,8 +3,8 @@ import subprocess
from config.helper import config
from handler.http_server import app
from handler.utils import loop_queue
from browser.manager import BrowserManager
from output.manager import OutputManager
if __name__ == '__main__':
mitmproxy_process = subprocess.Popen([
@ -13,9 +13,9 @@ if __name__ == '__main__':
])
api_thread = threading.Thread(target=app.run, args=(config()["http"]["host"], config()["http"]["port"],))
api_thread.start()
manager = BrowserManager()
queue_thread = threading.Thread(target=loop_queue)
queue_thread.start()
queue_thread.join()
browser_manager = BrowserManager()
output_manager = OutputManager()
output_manager.start_loop()
api_thread.join()

View File

@ -1,13 +0,0 @@
from output.print import Print
from output.debug import DebugWriter
from output.xml import XMLWriter
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from output.IOutput import IOutput
OUTPUTER: "list[IOutput]" = [
Print(),
XMLWriter(),
DebugWriter()
]

View File

@ -1,26 +1,33 @@
import os
import time
import traceback
from config.helper import config
from output.IOutput import IOutput
class DebugWriter(IOutput):
def __init__(self):
# 获取对应配置文件
self.unknown_output_dir = config()['output']['debug']['save_path']['unknown']
if not os.path.isdir(self.unknown_output_dir):
os.makedirs(self.unknown_output_dir)
self.error_output_dir = config()['output']['debug']['save_path']['error']
if not os.path.isdir(self.error_output_dir):
os.makedirs(self.error_output_dir)
def other_output(self, message_type: str, message_raw: bytes):
if not os.path.isdir(os.path.join("", "debug")):
os.makedirs(os.path.join("", "debug"))
if not os.path.isdir(os.path.join("", "debug", message_type)):
os.makedirs(os.path.join("", "debug", message_type))
with open(os.path.join("", "debug", message_type, str(time.time())), "wb") as f:
if not os.path.isdir(os.path.join(self.unknown_output_dir, message_type)):
os.makedirs(os.path.join(self.unknown_output_dir, message_type))
with open(os.path.join(self.unknown_output_dir, message_type, str(time.time())), "wb") as f:
f.write(message_raw)
def error_output(self, message_type: str, message_raw: bytes, exception: Exception):
if not os.path.isdir(os.path.join("", "error")):
os.makedirs(os.path.join("", "error"))
if not os.path.isdir(os.path.join("", "error", message_type)):
os.makedirs(os.path.join("", "error", message_type))
if not os.path.isdir(os.path.join(self.error_output_dir, message_type)):
os.makedirs(os.path.join(self.error_output_dir, message_type))
ts = time.time()
with open(os.path.join("", "error", message_type, str(ts)), "wb") as f:
with open(os.path.join(self.error_output_dir, message_type, str(ts)), "wb") as f:
f.write(message_raw)
traceback.print_exc(file=open(os.path.join("", "error", message_type, str(ts)) + ".exc", "w", encoding="UTF-8"))
traceback.print_exc(file=open(os.path.join(self.error_output_dir, message_type, str(ts)) + ".exc", "w", encoding="UTF-8"))

109
output/manager.py Normal file
View File

@ -0,0 +1,109 @@
import gzip
import threading
from typing import TYPE_CHECKING
from config.helper import config
from handler.common import MessagePayload, MESSAGE_QUEUE
from messages.chat import ChatMessage
from messages.control import ControlMessage
from messages.gift import GiftMessage
from messages.like import LikeMessage
from messages.member import MemberMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.social import SocialMessage
from output.print import Print
from output.xml import XMLWriter
from output.debug import DebugWriter
from protobuf import message_pb2, wss_pb2
if TYPE_CHECKING:
from typing import Type, Optional
from output.IOutput import IOutput
class OutputManager():
_mapping: "dict[str, Type[IOutput]]" = {
"print": Print,
"xml": XMLWriter,
"debug": DebugWriter,
}
_writer: "list[IOutput]" = []
_thread: "Optional[threading.Thread]"= None
def __init__(self):
_config = config()['output']['use']
if type(_config) != list:
_config = [_config]
for _c in _config:
if _c not in self._mapping:
raise Exception("不支持的输出方式")
self._writer.append(self._mapping[_c]())
def __del__(self):
...
def decode_payload(self, message: MessagePayload):
try:
response = message_pb2.Response()
wss = wss_pb2.WssResponse()
wss.ParseFromString(message.body)
decompressed = gzip.decompress(wss.data)
response.ParseFromString(decompressed)
self.decode_message(response.messages)
except Exception as e:
for writer in self._writer:
writer.error_output("ParseError", message.body, e)
def decode_message(self, message_list: list[message_pb2.Message]):
for message in message_list:
try:
if message.method == 'WebcastMemberMessage':
member_message = MemberMessage()
member_message.set_payload(message.payload)
for writer in self._writer:
writer.member_output(member_message)
elif message.method == 'WebcastSocialMessage':
social_message = SocialMessage()
social_message.set_payload(message.payload)
for writer in self._writer:
writer.social_output(social_message)
elif message.method == 'WebcastChatMessage':
chat_message = ChatMessage()
chat_message.set_payload(message.payload)
for writer in self._writer:
writer.chat_output(chat_message)
elif message.method == 'WebcastLikeMessage':
like_message = LikeMessage()
like_message.set_payload(message.payload)
for writer in self._writer:
writer.like_output(like_message)
elif message.method == 'WebcastGiftMessage':
gift_message = GiftMessage()
gift_message.set_payload(message.payload)
for writer in self._writer:
writer.gift_output(gift_message)
elif message.method == 'WebcastRoomUserSeqMessage':
room_user_seq_message = RoomUserSeqMessage()
room_user_seq_message.set_payload(message.payload)
for writer in self._writer:
writer.userseq_output(room_user_seq_message)
elif message.method == 'WebcastControlMessage':
control_message = ControlMessage()
control_message.set_payload(message.payload)
for writer in self._writer:
writer.control_output(control_message)
else:
for writer in self._writer:
writer.other_output(message.method, message.payload)
except Exception as e:
for writer in self._writer:
writer.error_output(message.method, message.payload, e)
def start_loop(self):
self._thread = threading.Thread(target=self._handle)
self._thread.start()
def _handle(self):
while True:
message = MESSAGE_QUEUE.get()
self.decode_payload(message)

View File

@ -1,3 +1,4 @@
from config.helper import config
from output.IOutput import IOutput
from typing import IO
import time
@ -10,13 +11,18 @@ class XMLWriter(IOutput):
def __init__(self):
self.file_mappings: "dict[str, IO[str]]" = {}
self.time_mappings: "dict[str, float]" = {}
self._file_name_pattern: "str" = config()['output']['xml']['file_pattern']
def _get_fd_by_room_id(self, room_id: str) -> IO[str]:
if room_id in self.file_mappings:
return self.file_mappings[room_id]
fd = open(f"{room_id}_{time.time()}.xml", "w", encoding="UTF-8")
cur_ts = time.time()
fd = open(self._file_name_pattern.format_map({
"room_id": room_id,
"ts": cur_ts
}), "w", encoding="UTF-8")
self.file_mappings[room_id] = fd
self.time_mappings[room_id] = time.time()
self.time_mappings[room_id] = cur_ts
return fd
def _close_fd_by_room_id(self, room_id: str):