diff --git a/config/settings.yml b/config/settings.yml index abc9090..c9b90ac 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -1,14 +1,18 @@ -watchdog: - dir: '/Users/geng/douyin_live/' +mitm: + bin: 'C:\Program Files (x86)\mitmproxy\bin\mitmdump.exe' + host: 127.0.0.1 + port: 8080 webdriver: - bin: '/usr/local/bin/chromedriver' - proxy: '127.0.0.1:8080' - -mongo: - uri : 'mongodb://localhost:27017/' - dbname: 'tiktok' - enabled: 'on' + bin: 'msedgedriver.exe' + +rooms: + - 19829678666 + - https://live.douyin.com/579197336883 + +http: + host: 127.0.0.1 + port: 5000 api: userinfo: 'https://live.douyin.com/webcast/user/?aid=6383&target_uid=' \ No newline at end of file diff --git a/store/__init__.py b/handler/__init__.py similarity index 100% rename from store/__init__.py rename to handler/__init__.py diff --git a/handler/common.py b/handler/common.py new file mode 100644 index 0000000..e946253 --- /dev/null +++ b/handler/common.py @@ -0,0 +1,3 @@ +from queue import SimpleQueue + +MESSAGE_QUEUE = SimpleQueue() diff --git a/handler/http_server.py b/handler/http_server.py new file mode 100644 index 0000000..9dc42e1 --- /dev/null +++ b/handler/http_server.py @@ -0,0 +1,15 @@ +from flask import Flask, request, Response +from handler.common import MESSAGE_QUEUE +import logging +log = logging.getLogger('werkzeug') +log.setLevel(logging.ERROR) + +app = Flask(__name__) +app.config['JSON_AS_ASCII'] = False + + +@app.post("/message") +def message_from_mitmproxy(): + payload = request.data + MESSAGE_QUEUE.put(payload) + return Response(status=204) \ No newline at end of file diff --git a/messages/utils.py b/handler/utils.py similarity index 57% rename from messages/utils.py rename to handler/utils.py index 386fe52..2f2ad36 100644 --- a/messages/utils.py +++ b/handler/utils.py @@ -1,4 +1,6 @@ import os + +from handler.common import MESSAGE_QUEUE from protobuf import message_pb2 from protobuf import wss_pb2 import gzip @@ -8,34 +10,23 @@ from messages.roomuserseq import RoomUserSeqMessage from messages.gift import GiftMessage from messages.social import SocialMessage from messages.chat import ChatMessage +from output import OUTPUTER -from colorama import init, Fore -# define colors -RED = Fore.RED -GREEN = Fore.GREEN -BLUE = Fore.BLUE -CYAN = Fore.CYAN -MAGENTA = Fore.MAGENTA -YELLOW = Fore.YELLOW -WHITE = Fore.WHITE -RESET = Fore.RESET -init() - -def unpackMsgBin(filepath): - response = message_pb2.Response() - wss = wss_pb2.WssResponse() - try: - with open(filepath, 'rb') as f: - path_content = f.read() - wss.ParseFromString( path_content ) +def loop_queue(): + while True: + message = MESSAGE_QUEUE.get() + if type(message) == str: + message = message.encode("UTF-8") + try: + response = message_pb2.Response() + wss = wss_pb2.WssResponse() + wss.ParseFromString(message) decompressed = gzip.decompress(wss.data) response.ParseFromString(decompressed) decodeMsg(response.messages) - except Exception as e: - os.remove(filepath) - pass - finally: - os.remove(filepath) + except Exception as e: + # 发出去的信息无法解析 + pass def decodeMsg(messages): for message in messages: @@ -43,44 +34,34 @@ def decodeMsg(messages): if message.method == 'WebcastMemberMessage': member_message = MemberMessage() member_message.set_payload(message.payload) - member_message.persists() - - print(f"\n{RED}[+] {member_message} {RESET}") - + for output in OUTPUTER: + output.member_output(member_message) elif message.method == 'WebcastSocialMessage': social_message = SocialMessage() social_message.set_payload(message.payload) - social_message.persists() - - print(f"\n{GREEN}[+] {social_message} {RESET}") - + for output in OUTPUTER: + output.social_output(social_message) elif message.method == 'WebcastChatMessage': chat_message = ChatMessage() chat_message.set_payload(message.payload) - chat_message.persists() - - print(f"\n{BLUE}[+] {chat_message} {RESET}") - + for output in OUTPUTER: + output.chat_output(chat_message) elif message.method == 'WebcastLikeMessage': like_message = LikeMessage() like_message.set_payload(message.payload) - like_message.persists() - - print(f"\n{CYAN}[+] {like_message} {RESET}") - + for output in OUTPUTER: + output.like_output(like_message) elif message.method == 'WebcastGiftMessage': gift_message = GiftMessage() gift_message.set_payload(message.payload) - gift_message.persists() - - print(f"\n{MAGENTA}[+] {gift_message} {RESET}") - + for output in OUTPUTER: + output.gift_output(gift_message) elif message.method == 'WebcastRoomUserSeqMessage': room_user_seq_message = RoomUserSeqMessage() room_user_seq_message.set_payload(message.payload) - room_user_seq_message.persists() - - print(f"\n{YELLOW}[+] {room_user_seq_message} {RESET}") - + for output in OUTPUTER: + output.userseq_output(room_user_seq_message) + else: + ... except Exception as e: print(e) diff --git a/main.py b/main.py index a88ee28..74db8d9 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,27 @@ import sys import threading +import subprocess from urllib.parse import urlparse -from scripts import watcher, webdriver from config.helper import config +from handler.http_server import app +from handler.utils import loop_queue +from scripts import webdriver if __name__ == '__main__': if len(sys.argv) == 1 or not urlparse(sys.argv[1]).scheme: print('Invalid url provided, please check...') sys.exit(1) - - t = threading.Thread(target=webdriver.go, args=(sys.argv[1],)) + api_thread = threading.Thread(target=app.run, args=(config()["http"]["host"], config()["http"]["port"],)) + api_thread.start() + mitmproxy_process = subprocess.Popen([ + config()["mitm"]["bin"], "-s", "./scripts/mitmproxy.py", "-q", + "--listen-host", config()["mitm"]["host"], "--listen-port", str(config()["mitm"]["port"]) + ]) + t = threading.Thread(target=webdriver.go) t.start() + queue_thread = threading.Thread(target=loop_queue) + queue_thread.start() + queue_thread.join() - w = watcher.Watcher(directory=config()['watchdog']['dir']) - w.run() diff --git a/messages/base.py b/messages/base.py index f881283..f990078 100644 --- a/messages/base.py +++ b/messages/base.py @@ -1,6 +1,5 @@ import traceback from datetime import datetime -from store.mongo import MongoStore from config.helper import config @@ -13,55 +12,13 @@ class Base: def extra_info(self): return dict() - + def user(self): if(hasattr(self.instance, 'user')): return self.instance.user - + return None - def persists(self): - if config()['mongo']['enabled'] != 'on': - return - - try: - store = MongoStore() - - user = self.user() - if user is not None: - store.set_collection('user') - - if not store.exists({'id': user.id}): - store.insert_one({ - 'id': user.id, - 'shortId': user.shortId, - 'nickname': user.nickname, - 'gender': user.gender - }) - - store.set_collection(self.instance.common.method) - msg = { - "msgId": self.instance.common.msgId, - "roomId": self.instance.common.roomId, - 'content': self.format_content(), - 'created_at': datetime.today().replace(microsecond=0) - } - - if user is not None: - msg.update({ - 'userId': user.id - }) - - if len(self.extra_info()): - msg.update(self.extra_info()) - - store.insert_one(msg) - except Exception as e: - print(self.instance.common.method + ' persists error') - print(traceback.format_exc()) - finally: - store.close() - def __str__(self): pass diff --git a/output/DebugWriter.py b/output/DebugWriter.py new file mode 100644 index 0000000..11ff3c4 --- /dev/null +++ b/output/DebugWriter.py @@ -0,0 +1,6 @@ +from output import IOutput + + +class DebugWriter(IOutput): + def other_output(self, message_type: str, message_raw: bytes): + print(message_type) diff --git a/output/IOutput.py b/output/IOutput.py new file mode 100644 index 0000000..1491288 --- /dev/null +++ b/output/IOutput.py @@ -0,0 +1,36 @@ +from messages.base import Base +from messages.chat import ChatMessage +from messages.gift import GiftMessage +from messages.like import LikeMessage +from messages.member import MemberMessage +from messages.roomuserseq import RoomUserSeqMessage +from messages.social import SocialMessage + + +class IOutput(): + def output(self, message_type: str, message_obj: Base): + ... + + def chat_output(self, message: ChatMessage): + ... + + def like_output(self, message: LikeMessage): + ... + + def member_output(self, message: MemberMessage): + ... + + def social_output(self, message: SocialMessage): + ... + + def gift_output(self, message: GiftMessage): + ... + + def userseq_output(self, message: RoomUserSeqMessage): + ... + + def other_output(self, message_type: str, message_raw: bytes): + ... + + def error_output(self, exception: Exception): + ... \ No newline at end of file diff --git a/output/README.md b/output/README.md new file mode 100644 index 0000000..1230bd5 --- /dev/null +++ b/output/README.md @@ -0,0 +1,3 @@ +# 输出模块 + +主要放置输出模块,建议继承IOutput,解析模块中所有解析的内容均基于IOutput进行适配 \ No newline at end of file diff --git a/output/__init__.py b/output/__init__.py new file mode 100644 index 0000000..468ecb6 --- /dev/null +++ b/output/__init__.py @@ -0,0 +1,6 @@ +from output.IOutput import IOutput +from output.print import Print + +OUTPUTER = [ + Print() +] \ No newline at end of file diff --git a/output/print.py b/output/print.py new file mode 100644 index 0000000..23fcf55 --- /dev/null +++ b/output/print.py @@ -0,0 +1,33 @@ +from colorama import init, Fore + +from output import IOutput + +RED = Fore.RED +GREEN = Fore.GREEN +BLUE = Fore.BLUE +CYAN = Fore.CYAN +MAGENTA = Fore.MAGENTA +YELLOW = Fore.YELLOW +WHITE = Fore.WHITE +RESET = Fore.RESET +init() + + +class Print(IOutput): + def chat_output(self, msg): + print(f"\n{BLUE}[+] {msg} {RESET}") + + def like_output(self, msg): + print(f"\n{CYAN}[+] {msg} {RESET}") + + def member_output(self, msg): + print(f"\n{RED}[+] {msg} {RESET}") + + def social_output(self, msg): + print(f"\n{GREEN}[+] {msg} {RESET}") + + def gift_output(self, msg): + print(f"\n{MAGENTA}[+] {msg} {RESET}") + + def userseq_output(self, msg): + print(f"\n{YELLOW}[+] {msg} {RESET}") diff --git a/scripts/mitmproxy.py b/scripts/mitmproxy.py index ae87cf7..e11b25c 100644 --- a/scripts/mitmproxy.py +++ b/scripts/mitmproxy.py @@ -1,16 +1,23 @@ # ! IMPORT ! make sure you ran mitmproxy with this script, # eg: `/path/to/mitmproxy -s mitmproxy.py` - -import uuid +import time from mitmproxy import http import re +import requests +import base64 + +session = requests.session() + class Writer: - def websocket_message(self, flow: http.HTTPFlow) : - re_c = re.search('webcast3-ws-web-.*\.douyin\.com', flow.request.host) - if re_c : - with open('/Users/geng/douyin_live/' + uuid.uuid4().hex, 'wb') as f: - mess = flow.websocket.messages[-1].content - f.write(bytes(mess)) + def websocket_message(self, flow: http.HTTPFlow): + re_c = re.search('webcast\d-ws-web-.*\.douyin\.com', flow.request.host) + if re_c: + message = flow.websocket.messages[-1].content + session.post("http://127.0.0.1:5000/message", data=message, headers={ + "X-MITM_TS": str(time.time()), + "X_REFERER": flow.request.host + }, timeout=(1, 1)) + addons = [Writer()] diff --git a/scripts/watcher.py b/scripts/watcher.py deleted file mode 100644 index e4a1c8e..0000000 --- a/scripts/watcher.py +++ /dev/null @@ -1,43 +0,0 @@ -import concurrent.futures -import queue -import time -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler - -from messages.utils import unpackMsgBin - -q = queue.Queue() - -class Watcher: - DIRECTORY_TO_WATCH = "" - - def __init__(self, directory): - self.observer = Observer() - self.DIRECTORY_TO_WATCH = directory - - def run(self): - event_handler = Handler() - self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True) - self.observer.start() - - try: - while True: - with concurrent.futures.ThreadPoolExecutor() as executor: - time.sleep(0.2) - executor.submit(unpackMsgBin, q.get()) - except: - self.observer.stop() - - self.observer.join() - - -class Handler(FileSystemEventHandler): - - @staticmethod - def on_any_event(event): - if event.is_directory: - return None - - elif event.event_type == 'created': - q.put(event.src_path) - diff --git a/scripts/webdriver.py b/scripts/webdriver.py index 01e0f73..3db669a 100644 --- a/scripts/webdriver.py +++ b/scripts/webdriver.py @@ -2,7 +2,7 @@ import requests import json from selenium import webdriver -from selenium.webdriver.chrome.options import Options +from selenium.webdriver.edge.options import Options from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.proxy import Proxy, ProxyType from selenium.webdriver.common.by import By @@ -10,12 +10,10 @@ from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.expected_conditions import presence_of_element_located from config.helper import config -from store.mongo import MongoStore def go(url): chrome_options = Options() - chrome_options.add_argument('--proxy-server=%s' % config()['webdriver']['proxy']) - chrome_options.add_argument('--headless') + chrome_options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port'])) # 2022-04-09 添加一个忽略证书 chrome_options.add_argument('-ignore-certificate-errors') @@ -24,10 +22,10 @@ def go(url): proxy = Proxy() proxy.proxy_type = ProxyType.MANUAL - proxy.http_proxy = config()['webdriver']['proxy'] - proxy.ssl_proxy = config()['webdriver']['proxy'] + proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port']) + proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port']) - capabilities = DesiredCapabilities.CHROME + capabilities = DesiredCapabilities.EDGE proxy.add_to_capabilities(capabilities) with webdriver.Chrome(options=chrome_options, @@ -45,19 +43,7 @@ def go(url): json_obj = json.loads(json_str) roomInfo = json_obj['initialState']['roomStore']['roomInfo'] - - store = MongoStore() - store.set_collection('room') - store.insert_one({ - 'roomId': roomInfo['roomId'], - 'web_rid': roomInfo['web_rid'], - 'title': roomInfo['room']['title'], - 'user_count_str': roomInfo['room']['user_count_str'], - 'cover': roomInfo['room']['cover']['url_list'][0], - 'admin_user_ids': roomInfo['room']['admin_user_ids'], - 'owner': roomInfo['room']['owner'] - }) - store.close() + print(roomInfo) wait.until(presence_of_element_located((By.CLASS_NAME, "oSu9Aw19"))) \ No newline at end of file diff --git a/store/mongo.py b/store/mongo.py deleted file mode 100644 index 52116ed..0000000 --- a/store/mongo.py +++ /dev/null @@ -1,26 +0,0 @@ -import pymongo - -from config.helper import config - -class MongoStore: - def __init__(self): - self.client = pymongo.MongoClient(config()['mongo']['uri']) - self.db = self.client[config()['mongo']['dbname']] - - def close(self): - self.client.close() - - def set_collection(self, collection): - self.collection = self.db[collection] - - def replace_one(self, condition, data, upsert=True): - return self.collection.replace_one(condition, data, upsert=upsert) - - def insert_one(self, data): - return self.collection.insert_one(data) - - def insert_many(self, data): - return self.collection.insert_many(data) - - def exists(self, condition): - return self.collection.count_documents(condition, limit = 1) != 0