尝试大改,可读性有点差的版本,基本做到能实时处理弹幕

This commit is contained in:
Jerry Yan 2022-06-03 23:08:43 +08:00
parent 3f95b07009
commit 74a15849ab
16 changed files with 181 additions and 204 deletions

View File

@ -1,14 +1,18 @@
watchdog: mitm:
dir: '/Users/geng/douyin_live/' bin: 'C:\Program Files (x86)\mitmproxy\bin\mitmdump.exe'
host: 127.0.0.1
port: 8080
webdriver: webdriver:
bin: '/usr/local/bin/chromedriver' bin: 'msedgedriver.exe'
proxy: '127.0.0.1:8080'
rooms:
mongo: - 19829678666
uri : 'mongodb://localhost:27017/' - https://live.douyin.com/579197336883
dbname: 'tiktok'
enabled: 'on' http:
host: 127.0.0.1
port: 5000
api: api:
userinfo: 'https://live.douyin.com/webcast/user/?aid=6383&target_uid=' userinfo: 'https://live.douyin.com/webcast/user/?aid=6383&target_uid='

3
handler/common.py Normal file
View File

@ -0,0 +1,3 @@
from queue import SimpleQueue
MESSAGE_QUEUE = SimpleQueue()

15
handler/http_server.py Normal file
View File

@ -0,0 +1,15 @@
from flask import Flask, request, Response
from handler.common import MESSAGE_QUEUE
import logging
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False
@app.post("/message")
def message_from_mitmproxy():
payload = request.data
MESSAGE_QUEUE.put(payload)
return Response(status=204)

View File

@ -1,4 +1,6 @@
import os import os
from handler.common import MESSAGE_QUEUE
from protobuf import message_pb2 from protobuf import message_pb2
from protobuf import wss_pb2 from protobuf import wss_pb2
import gzip import gzip
@ -8,34 +10,23 @@ from messages.roomuserseq import RoomUserSeqMessage
from messages.gift import GiftMessage from messages.gift import GiftMessage
from messages.social import SocialMessage from messages.social import SocialMessage
from messages.chat import ChatMessage from messages.chat import ChatMessage
from output import OUTPUTER
from colorama import init, Fore def loop_queue():
# define colors while True:
RED = Fore.RED message = MESSAGE_QUEUE.get()
GREEN = Fore.GREEN if type(message) == str:
BLUE = Fore.BLUE message = message.encode("UTF-8")
CYAN = Fore.CYAN try:
MAGENTA = Fore.MAGENTA response = message_pb2.Response()
YELLOW = Fore.YELLOW wss = wss_pb2.WssResponse()
WHITE = Fore.WHITE wss.ParseFromString(message)
RESET = Fore.RESET
init()
def unpackMsgBin(filepath):
response = message_pb2.Response()
wss = wss_pb2.WssResponse()
try:
with open(filepath, 'rb') as f:
path_content = f.read()
wss.ParseFromString( path_content )
decompressed = gzip.decompress(wss.data) decompressed = gzip.decompress(wss.data)
response.ParseFromString(decompressed) response.ParseFromString(decompressed)
decodeMsg(response.messages) decodeMsg(response.messages)
except Exception as e: except Exception as e:
os.remove(filepath) # 发出去的信息无法解析
pass pass
finally:
os.remove(filepath)
def decodeMsg(messages): def decodeMsg(messages):
for message in messages: for message in messages:
@ -43,44 +34,34 @@ def decodeMsg(messages):
if message.method == 'WebcastMemberMessage': if message.method == 'WebcastMemberMessage':
member_message = MemberMessage() member_message = MemberMessage()
member_message.set_payload(message.payload) member_message.set_payload(message.payload)
member_message.persists() for output in OUTPUTER:
output.member_output(member_message)
print(f"\n{RED}[+] {member_message} {RESET}")
elif message.method == 'WebcastSocialMessage': elif message.method == 'WebcastSocialMessage':
social_message = SocialMessage() social_message = SocialMessage()
social_message.set_payload(message.payload) social_message.set_payload(message.payload)
social_message.persists() for output in OUTPUTER:
output.social_output(social_message)
print(f"\n{GREEN}[+] {social_message} {RESET}")
elif message.method == 'WebcastChatMessage': elif message.method == 'WebcastChatMessage':
chat_message = ChatMessage() chat_message = ChatMessage()
chat_message.set_payload(message.payload) chat_message.set_payload(message.payload)
chat_message.persists() for output in OUTPUTER:
output.chat_output(chat_message)
print(f"\n{BLUE}[+] {chat_message} {RESET}")
elif message.method == 'WebcastLikeMessage': elif message.method == 'WebcastLikeMessage':
like_message = LikeMessage() like_message = LikeMessage()
like_message.set_payload(message.payload) like_message.set_payload(message.payload)
like_message.persists() for output in OUTPUTER:
output.like_output(like_message)
print(f"\n{CYAN}[+] {like_message} {RESET}")
elif message.method == 'WebcastGiftMessage': elif message.method == 'WebcastGiftMessage':
gift_message = GiftMessage() gift_message = GiftMessage()
gift_message.set_payload(message.payload) gift_message.set_payload(message.payload)
gift_message.persists() for output in OUTPUTER:
output.gift_output(gift_message)
print(f"\n{MAGENTA}[+] {gift_message} {RESET}")
elif message.method == 'WebcastRoomUserSeqMessage': elif message.method == 'WebcastRoomUserSeqMessage':
room_user_seq_message = RoomUserSeqMessage() room_user_seq_message = RoomUserSeqMessage()
room_user_seq_message.set_payload(message.payload) room_user_seq_message.set_payload(message.payload)
room_user_seq_message.persists() for output in OUTPUTER:
output.userseq_output(room_user_seq_message)
print(f"\n{YELLOW}[+] {room_user_seq_message} {RESET}") else:
...
except Exception as e: except Exception as e:
print(e) print(e)

19
main.py
View File

@ -1,18 +1,27 @@
import sys import sys
import threading import threading
import subprocess
from urllib.parse import urlparse from urllib.parse import urlparse
from scripts import watcher, webdriver
from config.helper import config from config.helper import config
from handler.http_server import app
from handler.utils import loop_queue
from scripts import webdriver
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) == 1 or not urlparse(sys.argv[1]).scheme: if len(sys.argv) == 1 or not urlparse(sys.argv[1]).scheme:
print('Invalid url provided, please check...') print('Invalid url provided, please check...')
sys.exit(1) sys.exit(1)
api_thread = threading.Thread(target=app.run, args=(config()["http"]["host"], config()["http"]["port"],))
t = threading.Thread(target=webdriver.go, args=(sys.argv[1],)) api_thread.start()
mitmproxy_process = subprocess.Popen([
config()["mitm"]["bin"], "-s", "./scripts/mitmproxy.py", "-q",
"--listen-host", config()["mitm"]["host"], "--listen-port", str(config()["mitm"]["port"])
])
t = threading.Thread(target=webdriver.go)
t.start() t.start()
queue_thread = threading.Thread(target=loop_queue)
queue_thread.start()
queue_thread.join()
w = watcher.Watcher(directory=config()['watchdog']['dir'])
w.run()

View File

@ -1,6 +1,5 @@
import traceback import traceback
from datetime import datetime from datetime import datetime
from store.mongo import MongoStore
from config.helper import config from config.helper import config
@ -13,55 +12,13 @@ class Base:
def extra_info(self): def extra_info(self):
return dict() return dict()
def user(self): def user(self):
if(hasattr(self.instance, 'user')): if(hasattr(self.instance, 'user')):
return self.instance.user return self.instance.user
return None return None
def persists(self):
if config()['mongo']['enabled'] != 'on':
return
try:
store = MongoStore()
user = self.user()
if user is not None:
store.set_collection('user')
if not store.exists({'id': user.id}):
store.insert_one({
'id': user.id,
'shortId': user.shortId,
'nickname': user.nickname,
'gender': user.gender
})
store.set_collection(self.instance.common.method)
msg = {
"msgId": self.instance.common.msgId,
"roomId": self.instance.common.roomId,
'content': self.format_content(),
'created_at': datetime.today().replace(microsecond=0)
}
if user is not None:
msg.update({
'userId': user.id
})
if len(self.extra_info()):
msg.update(self.extra_info())
store.insert_one(msg)
except Exception as e:
print(self.instance.common.method + ' persists error')
print(traceback.format_exc())
finally:
store.close()
def __str__(self): def __str__(self):
pass pass

6
output/DebugWriter.py Normal file
View File

@ -0,0 +1,6 @@
from output import IOutput
class DebugWriter(IOutput):
def other_output(self, message_type: str, message_raw: bytes):
print(message_type)

36
output/IOutput.py Normal file
View File

@ -0,0 +1,36 @@
from messages.base import Base
from messages.chat import ChatMessage
from messages.gift import GiftMessage
from messages.like import LikeMessage
from messages.member import MemberMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.social import SocialMessage
class IOutput():
def output(self, message_type: str, message_obj: Base):
...
def chat_output(self, message: ChatMessage):
...
def like_output(self, message: LikeMessage):
...
def member_output(self, message: MemberMessage):
...
def social_output(self, message: SocialMessage):
...
def gift_output(self, message: GiftMessage):
...
def userseq_output(self, message: RoomUserSeqMessage):
...
def other_output(self, message_type: str, message_raw: bytes):
...
def error_output(self, exception: Exception):
...

3
output/README.md Normal file
View File

@ -0,0 +1,3 @@
# 输出模块
主要放置输出模块建议继承IOutput解析模块中所有解析的内容均基于IOutput进行适配

6
output/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from output.IOutput import IOutput
from output.print import Print
OUTPUTER = [
Print()
]

33
output/print.py Normal file
View File

@ -0,0 +1,33 @@
from colorama import init, Fore
from output import IOutput
RED = Fore.RED
GREEN = Fore.GREEN
BLUE = Fore.BLUE
CYAN = Fore.CYAN
MAGENTA = Fore.MAGENTA
YELLOW = Fore.YELLOW
WHITE = Fore.WHITE
RESET = Fore.RESET
init()
class Print(IOutput):
def chat_output(self, msg):
print(f"\n{BLUE}[+] {msg} {RESET}")
def like_output(self, msg):
print(f"\n{CYAN}[+] {msg} {RESET}")
def member_output(self, msg):
print(f"\n{RED}[+] {msg} {RESET}")
def social_output(self, msg):
print(f"\n{GREEN}[+] {msg} {RESET}")
def gift_output(self, msg):
print(f"\n{MAGENTA}[+] {msg} {RESET}")
def userseq_output(self, msg):
print(f"\n{YELLOW}[+] {msg} {RESET}")

View File

@ -1,16 +1,23 @@
# ! IMPORT ! make sure you ran mitmproxy with this script, # ! IMPORT ! make sure you ran mitmproxy with this script,
# eg: `/path/to/mitmproxy -s mitmproxy.py` # eg: `/path/to/mitmproxy -s mitmproxy.py`
import time
import uuid
from mitmproxy import http from mitmproxy import http
import re import re
import requests
import base64
session = requests.session()
class Writer: class Writer:
def websocket_message(self, flow: http.HTTPFlow) : def websocket_message(self, flow: http.HTTPFlow):
re_c = re.search('webcast3-ws-web-.*\.douyin\.com', flow.request.host) re_c = re.search('webcast\d-ws-web-.*\.douyin\.com', flow.request.host)
if re_c : if re_c:
with open('/Users/geng/douyin_live/' + uuid.uuid4().hex, 'wb') as f: message = flow.websocket.messages[-1].content
mess = flow.websocket.messages[-1].content session.post("http://127.0.0.1:5000/message", data=message, headers={
f.write(bytes(mess)) "X-MITM_TS": str(time.time()),
"X_REFERER": flow.request.host
}, timeout=(1, 1))
addons = [Writer()] addons = [Writer()]

View File

@ -1,43 +0,0 @@
import concurrent.futures
import queue
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from messages.utils import unpackMsgBin
q = queue.Queue()
class Watcher:
DIRECTORY_TO_WATCH = ""
def __init__(self, directory):
self.observer = Observer()
self.DIRECTORY_TO_WATCH = directory
def run(self):
event_handler = Handler()
self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
self.observer.start()
try:
while True:
with concurrent.futures.ThreadPoolExecutor() as executor:
time.sleep(0.2)
executor.submit(unpackMsgBin, q.get())
except:
self.observer.stop()
self.observer.join()
class Handler(FileSystemEventHandler):
@staticmethod
def on_any_event(event):
if event.is_directory:
return None
elif event.event_type == 'created':
q.put(event.src_path)

View File

@ -2,7 +2,7 @@ import requests
import json import json
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.chrome.options import Options from selenium.webdriver.edge.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy, ProxyType from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
@ -10,12 +10,10 @@ from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.expected_conditions import presence_of_element_located from selenium.webdriver.support.expected_conditions import presence_of_element_located
from config.helper import config from config.helper import config
from store.mongo import MongoStore
def go(url): def go(url):
chrome_options = Options() chrome_options = Options()
chrome_options.add_argument('--proxy-server=%s' % config()['webdriver']['proxy']) chrome_options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port']))
chrome_options.add_argument('--headless')
# 2022-04-09 添加一个忽略证书 # 2022-04-09 添加一个忽略证书
chrome_options.add_argument('-ignore-certificate-errors') chrome_options.add_argument('-ignore-certificate-errors')
@ -24,10 +22,10 @@ def go(url):
proxy = Proxy() proxy = Proxy()
proxy.proxy_type = ProxyType.MANUAL proxy.proxy_type = ProxyType.MANUAL
proxy.http_proxy = config()['webdriver']['proxy'] proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
proxy.ssl_proxy = config()['webdriver']['proxy'] proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
capabilities = DesiredCapabilities.CHROME capabilities = DesiredCapabilities.EDGE
proxy.add_to_capabilities(capabilities) proxy.add_to_capabilities(capabilities)
with webdriver.Chrome(options=chrome_options, with webdriver.Chrome(options=chrome_options,
@ -45,19 +43,7 @@ def go(url):
json_obj = json.loads(json_str) json_obj = json.loads(json_str)
roomInfo = json_obj['initialState']['roomStore']['roomInfo'] roomInfo = json_obj['initialState']['roomStore']['roomInfo']
print(roomInfo)
store = MongoStore()
store.set_collection('room')
store.insert_one({
'roomId': roomInfo['roomId'],
'web_rid': roomInfo['web_rid'],
'title': roomInfo['room']['title'],
'user_count_str': roomInfo['room']['user_count_str'],
'cover': roomInfo['room']['cover']['url_list'][0],
'admin_user_ids': roomInfo['room']['admin_user_ids'],
'owner': roomInfo['room']['owner']
})
store.close()
wait.until(presence_of_element_located((By.CLASS_NAME, "oSu9Aw19"))) wait.until(presence_of_element_located((By.CLASS_NAME, "oSu9Aw19")))

View File

@ -1,26 +0,0 @@
import pymongo
from config.helper import config
class MongoStore:
def __init__(self):
self.client = pymongo.MongoClient(config()['mongo']['uri'])
self.db = self.client[config()['mongo']['dbname']]
def close(self):
self.client.close()
def set_collection(self, collection):
self.collection = self.db[collection]
def replace_one(self, condition, data, upsert=True):
return self.collection.replace_one(condition, data, upsert=upsert)
def insert_one(self, data):
return self.collection.insert_one(data)
def insert_many(self, data):
return self.collection.insert_many(data)
def exists(self, condition):
return self.collection.count_documents(condition, limit = 1) != 0