Merge pull request #25 from q792602257/main

尝试大改,可读性有点差的版本,基本做到能实时处理弹幕
This commit is contained in:
gll19920817
2022-06-08 21:58:49 +08:00
committed by GitHub
78 changed files with 846 additions and 374 deletions

10
.gitignore vendored
View File

@ -1,2 +1,10 @@
evn/* evn/*
.DS_Store .DS_Store
.idea
.vscode
venv
debug
error
__pycache__
*.py[cod]
*.xml

13
.vscode/launch.json vendored
View File

@ -1,13 +0,0 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: 当前文件",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"args": ["https://live.douyin.com/162574996168"]
}
]
}

View File

@ -1,6 +0,0 @@
{
"cSpell.words": [
"douyin",
"mitmproxy"
]
}

View File

@ -1,4 +1,4 @@
抖音web直播间([live.douyin.com](https://live.douyin.com))弹幕抓取 抖音web直播间([live.douyin.com](https://live.douyin.com))弹幕抓取
-- --
**屏幕效果截图** **屏幕效果截图**

Binary file not shown.

41
browser/IDriver.py Normal file
View File

@ -0,0 +1,41 @@
import contextlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from selenium.webdriver.remote.webdriver import WebDriver
class IDriver():
browser: "WebDriver"
def __del__(self):
self.terminate()
def terminate(self):
self.browser.quit()
def new_tab(self) -> str:
...
def change_tab(self, tab_handler: str):
...
def open_url(self, url: str, tab_handler: str = ""):
...
@contextlib.contextmanager
def op_tab(self, tab_handler: str):
cur_handle = self.browser.current_window_handle
if tab_handler == "":
tab_handler = cur_handle
try:
self.change_tab(tab_handler)
yield self
finally:
self.change_tab(cur_handle)
def refresh(self, tab_handler: str = ""):
...
def screenshot(self, tab_handler: str = "") -> str:
...

1
browser/__init__.py Normal file
View File

@ -0,0 +1 @@
from browser.manager import BrowserManager

62
browser/chrome.py Normal file
View File

@ -0,0 +1,62 @@
from selenium import webdriver
from selenium.webdriver import Proxy, DesiredCapabilities
from selenium.webdriver.common.proxy import ProxyType
from config.helper import config
from browser.IDriver import IDriver
from selenium.webdriver.chrome.options import Options
class ChromeDriver(IDriver):
def __init__(self):
super(ChromeDriver, self).__init__()
options = Options()
if config()['webdriver']['headless']:
options.add_argument("--headless")
options.add_argument("--window-size=1920,1080")
options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port']))
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors')
options.add_argument('--incognito')
options.add_experimental_option('excludeSwitches', ['ignore-certificate-errors'])
if config()['webdriver']['chrome']['no_sandbox']:
options.add_argument('--no-sandbox')
proxy = Proxy()
proxy.proxy_type = ProxyType.MANUAL
proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
capabilities = DesiredCapabilities.CHROME
proxy.add_to_capabilities(capabilities)
self.browser = webdriver.Chrome(options=options,
desired_capabilities=capabilities,
executable_path=config()['webdriver']['chrome']['bin']
)
def new_tab(self) -> str:
current_window_handles = self.browser.window_handles
self.browser.execute_script("window.open('')")
new_window_handles = self.browser.window_handles
for _handle in new_window_handles:
if _handle not in current_window_handles:
return _handle
return ""
def change_tab(self, tab_handler: str):
if tab_handler not in self.browser.window_handles:
return
if tab_handler == self.browser.current_window_handle:
return
self.browser.switch_to.window(tab_handler)
def open_url(self, url: str, tab_handler: str = ""):
with self.op_tab(tab_handler):
self.browser.get(url)
def refresh(self, tab_handler: str = ""):
with self.op_tab(tab_handler):
self.browser.refresh()
def screenshot(self, tab_handler: str = "") -> str:
with self.op_tab(tab_handler):
return self.browser.get_screenshot_as_base64()

59
browser/edge.py Normal file
View File

@ -0,0 +1,59 @@
from selenium import webdriver
from selenium.webdriver import Proxy, DesiredCapabilities
from selenium.webdriver.common.proxy import ProxyType
from config.helper import config
from browser.IDriver import IDriver
from selenium.webdriver.edge.options import Options
class EdgeDriver(IDriver):
def __init__(self):
super(EdgeDriver, self).__init__()
options = Options()
if config()['webdriver']['headless']:
options.add_argument("--headless")
options.add_argument("--window-size=1920,1080")
options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port']))
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ignore-ssl-errors')
options.add_argument('--incognito')
proxy = Proxy()
proxy.proxy_type = ProxyType.MANUAL
proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
capabilities = DesiredCapabilities.EDGE
proxy.add_to_capabilities(capabilities)
self.browser = webdriver.Chrome(options=options,
desired_capabilities=capabilities,
executable_path=config()['webdriver']['edge']['bin']
)
def new_tab(self) -> str:
current_window_handles = self.browser.window_handles
self.browser.execute_script("window.open('')")
new_window_handles = self.browser.window_handles
for _handle in new_window_handles:
if _handle not in current_window_handles:
return _handle
return ""
def change_tab(self, tab_handler: str):
if tab_handler not in self.browser.window_handles:
return
if tab_handler == self.browser.current_window_handle:
return
self.browser.switch_to.window(tab_handler)
def open_url(self, url: str, tab_handler: str = ""):
with self.op_tab(tab_handler):
self.browser.get(url)
def refresh(self, tab_handler: str = ""):
with self.op_tab(tab_handler):
self.browser.refresh()
def screenshot(self, tab_handler: str = "") -> str:
with self.op_tab(tab_handler):
return self.browser.get_screenshot_as_base64()

101
browser/manager.py Normal file
View File

@ -0,0 +1,101 @@
import threading
from urllib.parse import urlparse
from config.helper import config
from browser.edge import EdgeDriver
from browser.chrome import ChromeDriver
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Type, Optional, List
from browser.IDriver import IDriver
_manager: "Optional[BrowserManager]" = None
class BrowserManager():
_mapping: "dict[str, Type[IDriver]]" = {
"chrome": ChromeDriver,
"edge": EdgeDriver
}
def __init__(self):
_config = config()["webdriver"]["use"]
if _config not in self._mapping:
raise Exception("不支持的浏览器")
self._driver: IDriver = self._mapping[_config]()
self._tabs: "List[TabInfo]" = []
def init_browser(self):
_users = config()['live']['users']
if type(_users) is not list:
_users = [_users]
_rooms = config()['live']['rooms']
if type(_rooms) is not list:
_rooms = [_rooms]
for _user in _users:
self.open_user_page(str(_user))
for _room in _rooms:
self.open_live_page(str(_room))
@property
def driver(self):
return self._driver
def open_user_page(self, sec_user_id: str):
tab = TabInfo()
tab.tab_type = TabInfo.TAB_TYPE_USER
tab.user_id = sec_user_id
if urlparse(sec_user_id).scheme:
tab.url = sec_user_id
else:
# 单独的用户id
tab.url = "https://www.douyin.com/user/" + sec_user_id
self.open_tab(tab)
def open_live_page(self, live_url: str):
tab = TabInfo()
tab.tab_type = TabInfo.TAB_TYPE_LIVE
if not urlparse(live_url).scheme:
# 单独的房间号
live_url = "https://live.douyin.com/" + live_url
tab.url = live_url
self.open_tab(tab)
def open_tab(self, tab_info: "TabInfo"):
tab_handler = self._driver.new_tab()
tab_info.tab_handler = tab_handler
if not tab_info.tab_type:
tab_info.tab_type = TabInfo.TAB_TYPE_OTHER
self.driver.open_url(tab_info.url, tab_handler)
if tab_info not in self._tabs:
self._tabs.append(tab_info)
def terminate(self):
if self._driver:
self._driver.terminate()
class TabInfo(object):
TAB_TYPE_OTHER = "other"
TAB_TYPE_USER = "user"
TAB_TYPE_LIVE = "live"
def __init__(self):
self.tab_handler: str = ""
self.user_id: str = ""
self.url: str = ""
self.tab_type: str = self.TAB_TYPE_OTHER
def init_manager():
global _manager
_manager = BrowserManager()
threading.Thread(target=_manager.init_browser).start()
return _manager
def get_manager():
if _manager is None:
return init_manager()
return _manager

View File

@ -1,9 +1,9 @@
from pathlib import Path from pathlib import Path
import yaml from ruamel import yaml
def config(): def config():
settings_file = str(Path(__file__).parent.absolute()) + '/settings.yml' settings_file = str(Path(__file__).parent.absolute()) + '/settings.yml'
with open(settings_file, 'r') as f: with open(settings_file, 'r') as f:
return yaml.load(f, Loader=yaml.FullLoader) return yaml.load(f, Loader=yaml.UnsafeLoader)

View File

@ -1,14 +1,40 @@
watchdog: mitm:
dir: '/Users/geng/douyin_live/' host: 127.0.0.1
port: 8080
webdriver: webdriver:
bin: '/usr/local/bin/chromedriver' headless: False
proxy: '127.0.0.1:8080' use: edge
edge:
mongo: bin: msedgedriver.exe
uri : 'mongodb://localhost:27017/' chrome:
dbname: 'tiktok' bin: chromedriver
enabled: 'on' no_sandbox: True
output:
use:
- print
- xml
- debug
xml:
save_path: "./"
file_pattern: "{room_id}_{ts}.xml"
debug:
save_path:
error: "./error"
unknown: "./debug"
known: False
live:
rooms:
- "585723119943"
- "583853809376"
users:
- MS4wLjABAAAAzBItqEvCjPryxn_Y6w6LtRBFDOVNfjvYSJg8VVZFwlw
http:
host: 127.0.0.1
port: 5000
api: api:
userinfo: 'https://live.douyin.com/webcast/user/?aid=6383&target_uid=' userinfo: 'https://live.douyin.com/webcast/user/?aid=6383&target_uid='

34
main.py
View File

@ -1,18 +1,26 @@
import sys import atexit
import threading import signal
from urllib.parse import urlparse
from scripts import watcher, webdriver from browser.manager import init_manager as init_browser_manager
from config.helper import config from output.manager import OutputManager
from proxy.manager import init_manager as init_proxy_manager
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) == 1 or not urlparse(sys.argv[1]).scheme: proxy_manager = init_proxy_manager()
print('Invalid url provided, please check...') proxy_manager.start_loop()
sys.exit(1) browser_manager = init_browser_manager()
output_manager = OutputManager()
t = threading.Thread(target=webdriver.go, args=(sys.argv[1],))
t.start()
w = watcher.Watcher(directory=config()['watchdog']['dir']) def terminate(*_):
w.run() print("terminate")
browser_manager.terminate()
output_manager.terminate()
proxy_manager.terminate()
atexit.register(terminate)
signal.signal(signal.SIGTERM, terminate)
signal.signal(signal.SIGINT, terminate)
output_manager.start_loop()
proxy_manager.join()

View File

@ -1,6 +1,5 @@
import traceback import traceback
from datetime import datetime from datetime import datetime
from store.mongo import MongoStore
from config.helper import config from config.helper import config
@ -13,55 +12,19 @@ class Base:
def extra_info(self): def extra_info(self):
return dict() return dict()
@property
def room_id(self):
if hasattr(self.instance, 'common'):
return self.instance.common.roomId
return None
def user(self): def user(self):
if(hasattr(self.instance, 'user')): if(hasattr(self.instance, 'user')):
return self.instance.user return self.instance.user
return None return None
def persists(self):
if config()['mongo']['enabled'] != 'on':
return
try:
store = MongoStore()
user = self.user()
if user is not None:
store.set_collection('user')
if not store.exists({'id': user.id}):
store.insert_one({
'id': user.id,
'shortId': user.shortId,
'nickname': user.nickname,
'gender': user.gender
})
store.set_collection(self.instance.common.method)
msg = {
"msgId": self.instance.common.msgId,
"roomId": self.instance.common.roomId,
'content': self.format_content(),
'created_at': datetime.today().replace(microsecond=0)
}
if user is not None:
msg.update({
'userId': user.id
})
if len(self.extra_info()):
msg.update(self.extra_info())
store.insert_one(msg)
except Exception as e:
print(self.instance.common.method + ' persists error')
print(traceback.format_exc())
finally:
store.close()
def __str__(self): def __str__(self):
pass pass

View File

@ -7,6 +7,10 @@ class ChatMessage(Base):
def __init__(self): def __init__(self):
self.instance = message_pb2.ChatMessage() self.instance = message_pb2.ChatMessage()
@property
def content(self):
return self.instance.content
def format_content(self): def format_content(self):
return self.user().nickname + ': ' + self.instance.content return self.user().nickname + ': ' + self.instance.content

View File

@ -11,9 +11,14 @@ class GiftMessage(Base):
return { return {
'giftId': self.instance.gift.id, 'giftId': self.instance.gift.id,
'giftName': self.instance.gift.name, 'giftName': self.instance.gift.name,
'giftCount': self.instance.gift.diamondCount, 'giftCount': self.instance.repeatCount,
'diamondCount': self.instance.gift.diamondCount,
} }
@property
def gift(self):
return self.instance.gift
def format_content(self): def format_content(self):
return self.instance.common.describe return self.instance.common.describe

View File

@ -1,103 +0,0 @@
import os
from messages.control import ControlMessage
from messages.fansclub import FansclubMessage
from protobuf import message_pb2
from protobuf import wss_pb2
import gzip
from messages.member import MemberMessage
from messages.like import LikeMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.gift import GiftMessage
from messages.social import SocialMessage
from messages.chat import ChatMessage
from colorama import init, Fore
# define colors
RED = Fore.RED
GREEN = Fore.GREEN
BLUE = Fore.BLUE
CYAN = Fore.CYAN
MAGENTA = Fore.MAGENTA
YELLOW = Fore.YELLOW
WHITE = Fore.WHITE
RESET = Fore.RESET
init()
def unpackMsgBin(filepath):
response = message_pb2.Response()
wss = wss_pb2.WssResponse()
try:
with open(filepath, 'rb') as f:
path_content = f.read()
wss.ParseFromString( path_content )
decompressed = gzip.decompress(wss.data)
response.ParseFromString(decompressed)
decodeMsg(response.messages)
except Exception as e:
os.remove(filepath)
pass
finally:
os.remove(filepath)
def decodeMsg(messages):
for message in messages:
try:
if message.method == 'WebcastMemberMessage':
member_message = MemberMessage()
member_message.set_payload(message.payload)
member_message.persists()
print(f"\n{RED}[+] {member_message} {RESET}")
elif message.method == 'WebcastSocialMessage':
social_message = SocialMessage()
social_message.set_payload(message.payload)
social_message.persists()
print(f"\n{GREEN}[+] {social_message} {RESET}")
elif message.method == 'WebcastChatMessage':
chat_message = ChatMessage()
chat_message.set_payload(message.payload)
chat_message.persists()
print(f"\n{BLUE}[+] {chat_message} {RESET}")
elif message.method == 'WebcastLikeMessage':
like_message = LikeMessage()
like_message.set_payload(message.payload)
like_message.persists()
print(f"\n{CYAN}[+] {like_message} {RESET}")
elif message.method == 'WebcastGiftMessage':
gift_message = GiftMessage()
gift_message.set_payload(message.payload)
gift_message.persists()
print(f"\n{MAGENTA}[+] {gift_message} {RESET}")
elif message.method == 'WebcastRoomUserSeqMessage':
room_user_seq_message = RoomUserSeqMessage()
room_user_seq_message.set_payload(message.payload)
room_user_seq_message.persists()
print(f"\n{YELLOW}[+] {room_user_seq_message} {RESET}")
elif message.method == 'WebcastFansclubMessage':
fansclub_message = FansclubMessage()
fansclub_message.set_payload(message.payload)
fansclub_message.persists()
print(f"\n{RED}[+] {fansclub_message} {RESET}")
elif message.method == 'WebcastControlMessage':
control_message = ControlMessage()
control_message.set_payload(message.payload)
control_message.persists()
print(f"\n{CYAN}[+] {control_message} {RESET}")
except Exception as e:
print(e)

53
output/IOutput.py Normal file
View File

@ -0,0 +1,53 @@
from messages.base import Base
from messages.chat import ChatMessage
from messages.control import ControlMessage
from messages.fansclub import FansclubMessage
from messages.gift import GiftMessage
from messages.like import LikeMessage
from messages.member import MemberMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.social import SocialMessage
class IOutput():
def __del__(self):
self.terminate()
def output(self, message_type: str, message_obj: Base):
...
def chat_output(self, message: ChatMessage):
...
def like_output(self, message: LikeMessage):
...
def member_output(self, message: MemberMessage):
...
def social_output(self, message: SocialMessage):
...
def gift_output(self, message: GiftMessage):
...
def userseq_output(self, message: RoomUserSeqMessage):
...
def control_output(self, message: ControlMessage):
...
def fansclub_output(self, message: FansclubMessage):
...
def other_output(self, message_type: str, message_raw: bytes):
...
def debug_output(self, message_type: str, message_raw: str):
...
def error_output(self, message_type: str, message_raw: bytes, exception: Exception):
...
def terminate(self):
...

3
output/README.md Normal file
View File

@ -0,0 +1,3 @@
# 输出模块
主要放置输出模块,建议继承IOutput,解析模块中所有解析的内容均基于IOutput进行适配

33
output/debug.py Normal file
View File

@ -0,0 +1,33 @@
import os
import time
import traceback
from config.helper import config
from output.IOutput import IOutput
class DebugWriter(IOutput):
def __init__(self):
# 获取对应配置文件
self.unknown_output_dir = config()['output']['debug']['save_path']['unknown']
if not os.path.isdir(self.unknown_output_dir):
os.makedirs(self.unknown_output_dir)
self.error_output_dir = config()['output']['debug']['save_path']['error']
if not os.path.isdir(self.error_output_dir):
os.makedirs(self.error_output_dir)
def other_output(self, message_type: str, message_raw: bytes):
if not os.path.isdir(os.path.join(self.unknown_output_dir, message_type)):
os.makedirs(os.path.join(self.unknown_output_dir, message_type))
with open(os.path.join(self.unknown_output_dir, message_type, str(time.time())), "wb") as f:
f.write(message_raw)
def error_output(self, message_type: str, message_raw: bytes, exception: Exception):
if not os.path.isdir(os.path.join(self.error_output_dir, message_type)):
os.makedirs(os.path.join(self.error_output_dir, message_type))
ts = time.time()
with open(os.path.join(self.error_output_dir, message_type, str(ts)), "wb") as f:
f.write(message_raw)
traceback.print_exc(file=open(os.path.join(self.error_output_dir, message_type, str(ts)) + ".exc", "w", encoding="UTF-8"))

130
output/manager.py Normal file
View File

@ -0,0 +1,130 @@
import gzip
import threading
from typing import TYPE_CHECKING
from config.helper import config
from messages.fansclub import FansclubMessage
from proxy.queues import MESSAGE_QUEUE
from messages.chat import ChatMessage
from messages.control import ControlMessage
from messages.gift import GiftMessage
from messages.like import LikeMessage
from messages.member import MemberMessage
from messages.roomuserseq import RoomUserSeqMessage
from messages.social import SocialMessage
from output.print import Print
from output.xml import XMLWriter
from output.debug import DebugWriter
from protobuf import message_pb2, wss_pb2
if TYPE_CHECKING:
from typing import Type, Optional, List
from output.IOutput import IOutput
from proxy.common import MessagePayload
class OutputManager():
_mapping: "dict[str, Type[IOutput]]" = {
"print": Print,
"xml": XMLWriter,
"debug": DebugWriter,
}
_writer: "List[IOutput]" = []
_thread: "Optional[threading.Thread]"= None
_should_exit = threading.Event()
def __init__(self):
_config = config()['output']['use']
if type(_config) != list:
_config = [_config]
for _c in _config:
if _c not in self._mapping:
raise Exception("不支持的输出方式")
self._writer.append(self._mapping[_c]())
def __del__(self):
self.terminate()
def decode_payload(self, message: "MessagePayload"):
try:
response = message_pb2.Response()
wss = wss_pb2.WssResponse()
wss.ParseFromString(message.body)
decompressed = gzip.decompress(wss.data)
response.ParseFromString(decompressed)
self.decode_message(response.messages)
except Exception as e:
for writer in self._writer:
writer.error_output("ParseError", message.body, e)
def decode_message(self, message_list: "List[message_pb2.Message]"):
for message in message_list:
try:
if message.method == 'WebcastMemberMessage':
member_message = MemberMessage()
member_message.set_payload(message.payload)
for writer in self._writer:
writer.member_output(member_message)
elif message.method == 'WebcastSocialMessage':
social_message = SocialMessage()
social_message.set_payload(message.payload)
for writer in self._writer:
writer.social_output(social_message)
elif message.method == 'WebcastChatMessage':
chat_message = ChatMessage()
chat_message.set_payload(message.payload)
for writer in self._writer:
writer.chat_output(chat_message)
elif message.method == 'WebcastLikeMessage':
like_message = LikeMessage()
like_message.set_payload(message.payload)
for writer in self._writer:
writer.like_output(like_message)
elif message.method == 'WebcastGiftMessage':
gift_message = GiftMessage()
gift_message.set_payload(message.payload)
for writer in self._writer:
writer.gift_output(gift_message)
elif message.method == 'WebcastRoomUserSeqMessage':
room_user_seq_message = RoomUserSeqMessage()
room_user_seq_message.set_payload(message.payload)
for writer in self._writer:
writer.userseq_output(room_user_seq_message)
elif message.method == 'WebcastControlMessage':
control_message = ControlMessage()
control_message.set_payload(message.payload)
for writer in self._writer:
writer.control_output(control_message)
elif message.method == 'WebcastFansclubMessage':
fansclub_message = FansclubMessage()
fansclub_message.set_payload(message.payload)
for writer in self._writer:
writer.fansclub_output(fansclub_message)
else:
for writer in self._writer:
writer.other_output(message.method, message.payload)
except Exception as e:
for writer in self._writer:
writer.error_output(message.method, message.payload, e)
def start_loop(self):
self._should_exit.clear()
self._thread = threading.Thread(target=self._handle)
self._thread.start()
def _handle(self):
while True:
message = MESSAGE_QUEUE.get()
if self._should_exit.is_set():
break
if message is None:
continue
self.decode_payload(message)
def terminate(self):
if self._should_exit:
self._should_exit.set()
MESSAGE_QUEUE.put(None)
for writer in self._writer:
writer.terminate()

39
output/print.py Normal file
View File

@ -0,0 +1,39 @@
from colorama import init, Fore
from output.IOutput import IOutput
RED = Fore.RED
GREEN = Fore.GREEN
BLUE = Fore.BLUE
CYAN = Fore.CYAN
MAGENTA = Fore.MAGENTA
YELLOW = Fore.YELLOW
WHITE = Fore.WHITE
RESET = Fore.RESET
init()
class Print(IOutput):
def chat_output(self, msg):
print(f"\n{BLUE}[+] {msg} {RESET}")
def like_output(self, msg):
print(f"\n{CYAN}[+] {msg} {RESET}")
def member_output(self, msg):
print(f"\n{RED}[+] {msg} {RESET}")
def social_output(self, msg):
print(f"\n{GREEN}[+] {msg} {RESET}")
def gift_output(self, msg):
print(f"\n{MAGENTA}[+] {msg} {RESET}")
def userseq_output(self, msg):
print(f"\n{YELLOW}[+] {msg} {RESET}")
def control_output(self, msg):
print(f"\n{CYAN}[+] {msg} {RESET}")
def fansclub_output(self, msg):
print(f"\n{GREEN}[+] {msg} {RESET}")

79
output/xml.py Normal file
View File

@ -0,0 +1,79 @@
from config.helper import config
from output.IOutput import IOutput
from typing import IO
import time
class XMLWriter(IOutput):
"""
可输出与B站弹幕姬兼容的xml弹幕格式,可用于转成ass字幕
"""
def __init__(self):
self._file_mappings: "dict[str, IO[str]]" = {}
self.time_mappings: "dict[str, float]" = {}
self._file_name_pattern: "str" = config()['output']['xml']['file_pattern']
def _get_fd_by_room_id(self, room_id: str) -> IO[str]:
if room_id in self._file_mappings:
return self._file_mappings[room_id]
cur_ts = time.time()
fd = open(self._file_name_pattern.format_map({
"room_id": room_id,
"ts": cur_ts
}), "w", encoding="UTF-8")
self._file_mappings[room_id] = fd
self.time_mappings[room_id] = cur_ts
return fd
def _close_fd_by_room_id(self, room_id: str):
if room_id in self._file_mappings:
fd = self._file_mappings[room_id]
if not fd.closed:
fd.close()
del self._file_mappings[room_id]
if room_id in self.time_mappings:
del self.time_mappings[room_id]
def control_output(self, message):
# 下播了
self._close_fd_by_room_id(message.room_id)
def _get_bias_ts_by_room_id(self, room_id: str, cur_ts: float = 0):
if cur_ts == 0:
cur_ts = time.time()
if room_id not in self.time_mappings:
return 0
return cur_ts - self.time_mappings[room_id]
def chat_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """<d p="{:.2f},1,24,{},{:.0f},0,{},0" user="{}">{}</d>\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time), message.room_id,
cur_time * 1000, message.user().id, message.user().nickname, message.content
)
fd.write(_c)
fd.flush()
def gift_output(self, message):
fd = self._get_fd_by_room_id(message.room_id)
if fd is None:
return
cur_time = time.time()
_c = """<gift ts="{:.2f}" user="{}" giftname="{}" giftcount="{}"></gift>\r\n""".format(
self._get_bias_ts_by_room_id(message.room_id, cur_time),
message.user().nickname, message.gift.name, message.instance.repeatCount
)
fd.write(_c)
fd.flush()
def terminate(self):
print("保存所有弹幕文件中...")
# copy
_rooms = [i for i in self._file_mappings.keys()]
for _room_id in _rooms:
self._close_fd_by_room_id(_room_id)
print("保存完毕")

0
proxy/addon/__init__.py Normal file
View File

24
proxy/addon/danmaku_ws.py Normal file
View File

@ -0,0 +1,24 @@
import re
from proxy.common import MessagePayload
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mitmproxy import http
from queue import SimpleQueue
class DanmakuWebsocketAddon:
def __init__(self, queue: "SimpleQueue[MessagePayload]"):
self._queue = queue
def websocket_message(self, flow: "http.HTTPFlow"):
re_c = re.search('webcast\d-ws-web-.*\.douyin\.com', flow.request.host)
if re_c:
message = flow.websocket.messages[-1]
if message.from_client:
return
payload = MessagePayload(message.content)
payload.request_url = flow.request.url
payload.request_query = flow.request.query
self._queue.put(payload)

View File

@ -0,0 +1,11 @@
from mitmproxy import http
class UserInfoAddon:
def __init__(self):
...
def response(self, flow: http.HTTPFlow):
# /aweme/v1/web/user/profile/other/ 他人主页获取他人信息
if '/aweme/v1/web/user/profile/other' in flow.request.path:
content = flow.response.content

9
proxy/common.py Normal file
View File

@ -0,0 +1,9 @@
import time
class MessagePayload(object):
def __init__(self, body: bytes):
self.body = body
self.timestamp: float = time.time()
self.request_url: str = ""
self.request_query: dict[str, str] = {}

71
proxy/manager.py Normal file
View File

@ -0,0 +1,71 @@
import asyncio
import threading
from typing import TYPE_CHECKING
from mitmproxy.options import Options
from mitmproxy.tools.dump import DumpMaster
from config.helper import config
from proxy.addon.danmaku_ws import DanmakuWebsocketAddon
from proxy.queues import MESSAGE_QUEUE
if TYPE_CHECKING:
from typing import Optional
_manager: "Optional[ProxyManager]" = None
class ProxyManager:
def __init__(self):
self._mitm_instance = None
self._loop: "Optional[asyncio.AbstractEventLoop]" = None
opts = Options(
listen_host=config()['mitm']['host'],
listen_port=config()['mitm']['port'],
)
self._mitm_instance = DumpMaster(options=opts)
self._load_addon()
opts.update_defer(
flow_detail=0,
termlog_verbosity="error",
)
self._thread = None
def __del__(self):
self.terminate()
def terminate(self):
if self._loop:
if self._loop.is_running():
self._loop.stop()
if self._mitm_instance:
self._mitm_instance.shutdown()
def _load_addon(self):
self._mitm_instance.addons.add(DanmakuWebsocketAddon(MESSAGE_QUEUE))
def _start(self):
loop = asyncio.new_event_loop()
self._loop = loop
asyncio.set_event_loop(loop)
self._mitm_instance.run()
def start_loop(self):
self._thread = threading.Thread(target=self._start)
self._thread.start()
def join(self):
if self._thread:
self._thread.join()
def init_manager():
global _manager
_manager = ProxyManager()
return _manager
def get_manager():
if _manager is None:
return init_manager()
return _manager

8
proxy/queues.py Normal file
View File

@ -0,0 +1,8 @@
from queue import SimpleQueue
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional
from proxy.common import MessagePayload
MESSAGE_QUEUE: "SimpleQueue[Optional[MessagePayload]]" = SimpleQueue()

34
proxy_script.py Normal file
View File

@ -0,0 +1,34 @@
# ! IMPORT ! make sure you ran mitmproxy with this script,
# eg: `/path/to/mitmproxy -s mitmproxy.py`
import time
from mitmproxy import http
import re
import requests
session = requests.session()
class Writer:
def response(self, flow: http.HTTPFlow):
# /aweme/v1/web/user/profile/other/ 他人主页获取他人信息
if '/aweme/v1/web/user/profile/other' in flow.request.path:
response_json_content = flow.response.content
session.post("http://127.0.0.1:5000/user_info", headers={
"X-MITM-TS": str(time.time()),
"X_REFERER": flow.request.url
}, data=response_json_content, timeout=(1, 1))
def websocket_message(self, flow: http.HTTPFlow):
re_c = re.search('webcast\d-ws-web-.*\.douyin\.com', flow.request.host)
if re_c:
message = flow.websocket.messages[-1]
if message.from_client:
return
content = message.content
session.post("http://127.0.0.1:5000/message", headers={
"X-MITM-TS": str(time.time()),
"X_REFERER": flow.request.url
}, data=content, timeout=(1, 1))
addons = [Writer()]

View File

@ -1,26 +1,6 @@
async-generator==1.10
attrs==21.2.0
certifi==2021.10.8
cffi==1.15.0
colorama==0.4.4 colorama==0.4.4
cryptography==36.0.0
h11==0.12.0
idna==3.3
outcome==1.1.0
protobuf==3.19.1
pycparser==2.21
pymongo==3.12.1
pyOpenSSL==21.0.0
PyYAML==6.0
selenium==4.1.0 selenium==4.1.0
six==1.16.0
sniffio==1.2.0
sortedcontainers==2.4.0
trio==0.19.0
trio-websocket==0.9.2
urllib3==1.26.7
watchdog==2.1.6
wsproto==1.0.0
requests==2.27.1 requests==2.27.1
scripts==2.0
mitmproxy~=7.0.4
protobuf<3.19

View File

@ -1,16 +0,0 @@
# ! IMPORT ! make sure you ran mitmproxy with this script,
# eg: `/path/to/mitmproxy -s mitmproxy.py`
import uuid
from mitmproxy import http
import re
class Writer:
def websocket_message(self, flow: http.HTTPFlow) :
re_c = re.search('webcast3-ws-web-.*\.douyin\.com', flow.request.host)
if re_c :
with open('/Users/geng/douyin_live/' + uuid.uuid4().hex, 'wb') as f:
mess = flow.websocket.messages[-1].content
f.write(bytes(mess))
addons = [Writer()]

View File

@ -1,10 +0,0 @@
import requests
from config.helper import config
def getUserinfo(uid):
try:
r = requests.get(config()['api']['userinfo'] + str(uid))
return r.json()
except:
pass

View File

@ -1,43 +0,0 @@
import concurrent.futures
import queue
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from messages.utils import unpackMsgBin
q = queue.Queue()
class Watcher:
DIRECTORY_TO_WATCH = ""
def __init__(self, directory):
self.observer = Observer()
self.DIRECTORY_TO_WATCH = directory
def run(self):
event_handler = Handler()
self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
self.observer.start()
try:
while True:
with concurrent.futures.ThreadPoolExecutor() as executor:
time.sleep(0.2)
executor.submit(unpackMsgBin, q.get())
except:
self.observer.stop()
self.observer.join()
class Handler(FileSystemEventHandler):
@staticmethod
def on_any_event(event):
if event.is_directory:
return None
elif event.event_type == 'created':
q.put(event.src_path)

View File

@ -1,63 +0,0 @@
import requests
import json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.expected_conditions import presence_of_element_located
from config.helper import config
from store.mongo import MongoStore
def go(url):
chrome_options = Options()
chrome_options.add_argument('--proxy-server=%s' % config()['webdriver']['proxy'])
chrome_options.add_argument('--headless')
# 2022-04-09 添加一个忽略证书
chrome_options.add_argument('-ignore-certificate-errors')
chrome_options.add_argument('-ignore -ssl-errors')
chrome_options.add_argument('--incognito')
proxy = Proxy()
proxy.proxy_type = ProxyType.MANUAL
proxy.http_proxy = config()['webdriver']['proxy']
proxy.ssl_proxy = config()['webdriver']['proxy']
capabilities = DesiredCapabilities.CHROME
proxy.add_to_capabilities(capabilities)
with webdriver.Chrome(options=chrome_options,
desired_capabilities=capabilities,
executable_path=config()['webdriver']['bin']
) as driver:
wait = WebDriverWait(driver, 10)
driver.implicitly_wait(24 * 60 * 60)
driver.get(url)
first_result = wait.until(presence_of_element_located((By.ID, "RENDER_DATA")))
json_str = requests.utils.unquote(first_result.get_attribute("textContent"))
json_obj = json.loads(json_str)
roomInfo = json_obj['initialState']['roomStore']['roomInfo']
store = MongoStore()
store.set_collection('room')
store.insert_one({
'roomId': roomInfo['roomId'],
'web_rid': roomInfo['web_rid'],
'title': roomInfo['room']['title'],
'user_count_str': roomInfo['room']['user_count_str'],
'cover': roomInfo['room']['cover']['url_list'][0],
'admin_user_ids': roomInfo['room']['admin_user_ids'],
'owner': roomInfo['room']['owner']
})
store.close()
wait.until(presence_of_element_located((By.CLASS_NAME, "oSu9Aw19")))

View File

@ -1,26 +0,0 @@
import pymongo
from config.helper import config
class MongoStore:
def __init__(self):
self.client = pymongo.MongoClient(config()['mongo']['uri'])
self.db = self.client[config()['mongo']['dbname']]
def close(self):
self.client.close()
def set_collection(self, collection):
self.collection = self.db[collection]
def replace_one(self, condition, data, upsert=True):
return self.collection.replace_one(condition, data, upsert=upsert)
def insert_one(self, data):
return self.collection.insert_one(data)
def insert_many(self, data):
return self.collection.insert_many(data)
def exists(self, condition):
return self.collection.count_documents(condition, limit = 1) != 0