You've already forked Douyin_Web_Live
架构修改,删除Edge浏览器支持,配置可以自己保存
This commit is contained in:
@ -1,15 +1,21 @@
|
||||
import contextlib
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from config import ConfigManager
|
||||
|
||||
_log = logging.getLogger("IDriver")
|
||||
_log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class IDriver():
|
||||
browser: "WebDriver"
|
||||
_config_manager: "ConfigManager"
|
||||
|
||||
def __del__(self):
|
||||
self.terminate()
|
||||
def __init__(self, config_manager):
|
||||
self._config_manager = config_manager
|
||||
|
||||
def terminate(self):
|
||||
self.browser.quit()
|
||||
@ -20,19 +26,25 @@ class IDriver():
|
||||
def change_tab(self, tab_handler: str):
|
||||
...
|
||||
|
||||
def close_tab(self, tab_handler: str):
|
||||
...
|
||||
|
||||
def open_url(self, url: str, tab_handler: str = ""):
|
||||
...
|
||||
|
||||
@contextlib.contextmanager
|
||||
def op_tab(self, tab_handler: str):
|
||||
cur_handle = self.browser.current_window_handle
|
||||
_log.debug("切换Tab:旧Tab:%s,新Tab:%s", cur_handle, tab_handler)
|
||||
if tab_handler == "":
|
||||
tab_handler = cur_handle
|
||||
try:
|
||||
self.change_tab(tab_handler)
|
||||
_log.debug("切换至新Tab:%s", tab_handler)
|
||||
yield self
|
||||
finally:
|
||||
self.change_tab(cur_handle)
|
||||
_log.debug("切换至旧Tab:%s", cur_handle)
|
||||
|
||||
def refresh(self, tab_handler: str = ""):
|
||||
...
|
||||
|
@ -1,37 +1,41 @@
|
||||
import logging
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver import Proxy, DesiredCapabilities
|
||||
from selenium.webdriver.common.proxy import ProxyType
|
||||
|
||||
from config.helper import config
|
||||
from browser.IDriver import IDriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
|
||||
_log = logging.getLogger("ChromeDriver")
|
||||
_log.setLevel(logging.DEBUG)
|
||||
|
||||
class ChromeDriver(IDriver):
|
||||
def __init__(self):
|
||||
super(ChromeDriver, self).__init__()
|
||||
def __init__(self, config_manager):
|
||||
super(ChromeDriver, self).__init__(config_manager)
|
||||
options = Options()
|
||||
if config()['webdriver']['headless']:
|
||||
if self._config_manager.config['webdriver']['headless']:
|
||||
options.add_argument("--headless")
|
||||
options.add_argument("--window-size=1920,1080")
|
||||
options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port']))
|
||||
options.add_argument('--proxy-server=%s:%s' % (self._config_manager.config['mitm']['host'], self._config_manager.config['mitm']['port']))
|
||||
options.add_argument('--ignore-certificate-errors')
|
||||
options.add_argument('--ignore-ssl-errors')
|
||||
options.add_argument('--incognito')
|
||||
options.add_experimental_option('excludeSwitches', ['ignore-certificate-errors'])
|
||||
if config()['webdriver']['chrome']['no_sandbox']:
|
||||
if self._config_manager.config['webdriver']['chrome']['no_sandbox']:
|
||||
_log.debug("添加启动参数NoSandbox")
|
||||
options.add_argument('--no-sandbox')
|
||||
proxy = Proxy()
|
||||
proxy.proxy_type = ProxyType.MANUAL
|
||||
proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
|
||||
proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
|
||||
proxy.http_proxy = "%s:%s" % (self._config_manager.config['mitm']['host'], self._config_manager.config['mitm']['port'])
|
||||
proxy.ssl_proxy = "%s:%s" % (self._config_manager.config['mitm']['host'], self._config_manager.config['mitm']['port'])
|
||||
capabilities = DesiredCapabilities.CHROME
|
||||
proxy.add_to_capabilities(capabilities)
|
||||
|
||||
self.browser = webdriver.Chrome(options=options,
|
||||
desired_capabilities=capabilities,
|
||||
executable_path=config()['webdriver']['chrome']['bin']
|
||||
executable_path=self._config_manager.config['webdriver']['chrome']['bin']
|
||||
)
|
||||
_log.info("浏览器启动完毕")
|
||||
|
||||
def new_tab(self) -> str:
|
||||
current_window_handles = self.browser.window_handles
|
||||
@ -39,16 +43,20 @@ class ChromeDriver(IDriver):
|
||||
new_window_handles = self.browser.window_handles
|
||||
for _handle in new_window_handles:
|
||||
if _handle not in current_window_handles:
|
||||
_log.debug("新窗口句柄:%s", _handle)
|
||||
return _handle
|
||||
_log.warning("打开新窗口,未发现新句柄")
|
||||
return ""
|
||||
|
||||
def change_tab(self, tab_handler: str):
|
||||
if tab_handler not in self.browser.window_handles:
|
||||
return
|
||||
if tab_handler == self.browser.current_window_handle:
|
||||
return
|
||||
self.browser.switch_to.window(tab_handler)
|
||||
|
||||
def close_tab(self, tab_handler: str):
|
||||
with self.op_tab(tab_handler):
|
||||
self.browser.close()
|
||||
|
||||
def open_url(self, url: str, tab_handler: str = ""):
|
||||
with self.op_tab(tab_handler):
|
||||
self.browser.get(url)
|
||||
|
@ -1,59 +0,0 @@
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver import Proxy, DesiredCapabilities
|
||||
from selenium.webdriver.common.proxy import ProxyType
|
||||
|
||||
from config.helper import config
|
||||
from browser.IDriver import IDriver
|
||||
from selenium.webdriver.edge.options import Options
|
||||
|
||||
|
||||
class EdgeDriver(IDriver):
|
||||
def __init__(self):
|
||||
super(EdgeDriver, self).__init__()
|
||||
options = Options()
|
||||
if config()['webdriver']['headless']:
|
||||
options.add_argument("--headless")
|
||||
options.add_argument("--window-size=1920,1080")
|
||||
options.add_argument('--proxy-server=%s:%s' % (config()['mitm']['host'], config()['mitm']['port']))
|
||||
options.add_argument('--ignore-certificate-errors')
|
||||
options.add_argument('--ignore-ssl-errors')
|
||||
options.add_argument('--incognito')
|
||||
proxy = Proxy()
|
||||
proxy.proxy_type = ProxyType.MANUAL
|
||||
proxy.http_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
|
||||
proxy.ssl_proxy = "%s:%s" % (config()['mitm']['host'], config()['mitm']['port'])
|
||||
capabilities = DesiredCapabilities.EDGE
|
||||
proxy.add_to_capabilities(capabilities)
|
||||
|
||||
self.browser = webdriver.Chrome(options=options,
|
||||
desired_capabilities=capabilities,
|
||||
executable_path=config()['webdriver']['edge']['bin']
|
||||
)
|
||||
|
||||
def new_tab(self) -> str:
|
||||
current_window_handles = self.browser.window_handles
|
||||
self.browser.execute_script("window.open('')")
|
||||
new_window_handles = self.browser.window_handles
|
||||
for _handle in new_window_handles:
|
||||
if _handle not in current_window_handles:
|
||||
return _handle
|
||||
return ""
|
||||
|
||||
def change_tab(self, tab_handler: str):
|
||||
if tab_handler not in self.browser.window_handles:
|
||||
return
|
||||
if tab_handler == self.browser.current_window_handle:
|
||||
return
|
||||
self.browser.switch_to.window(tab_handler)
|
||||
|
||||
def open_url(self, url: str, tab_handler: str = ""):
|
||||
with self.op_tab(tab_handler):
|
||||
self.browser.get(url)
|
||||
|
||||
def refresh(self, tab_handler: str = ""):
|
||||
with self.op_tab(tab_handler):
|
||||
self.browser.refresh()
|
||||
|
||||
def screenshot(self, tab_handler: str = "") -> str:
|
||||
with self.op_tab(tab_handler):
|
||||
return self.browser.get_screenshot_as_base64()
|
@ -1,102 +1,59 @@
|
||||
import threading
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from browser.chrome import ChromeDriver
|
||||
from browser.edge import EdgeDriver
|
||||
from config.helper import config
|
||||
from common import Singleton
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Type, Optional, List
|
||||
from typing import Type, List
|
||||
from browser.IDriver import IDriver
|
||||
from config import ConfigManager
|
||||
from common.items import TabInfo
|
||||
|
||||
_manager: "Optional[BrowserManager]" = None
|
||||
_log = logging.getLogger("BrowserManager")
|
||||
|
||||
|
||||
class BrowserManager():
|
||||
class BrowserManager(metaclass=Singleton):
|
||||
_config_manager: "ConfigManager"
|
||||
_mapping: "dict[str, Type[IDriver]]" = {
|
||||
"chrome": ChromeDriver,
|
||||
"edge": EdgeDriver
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
_config = config()["webdriver"]["use"]
|
||||
def __init__(self, config_manager: "ConfigManager"):
|
||||
self._config_manager = config_manager
|
||||
_config = self._config_manager.config["webdriver"]["use"]
|
||||
if _config not in self._mapping:
|
||||
_log.error("不支持的浏览器:%s", _config)
|
||||
raise Exception("不支持的浏览器")
|
||||
self._driver: IDriver = self._mapping[_config]()
|
||||
self._driver: IDriver = self._mapping[_config](self._config_manager)
|
||||
self._tabs: "List[TabInfo]" = []
|
||||
|
||||
def init_browser(self):
|
||||
_live_config = config().get("live", {})
|
||||
_users = _live_config.get("users", [])
|
||||
if type(_users) is not list:
|
||||
_users = [_users]
|
||||
_rooms = _live_config.get("rooms", [])
|
||||
if type(_rooms) is not list:
|
||||
_rooms = [_rooms]
|
||||
for _user in _users:
|
||||
self.open_user_page(str(_user))
|
||||
for _room in _rooms:
|
||||
self.open_live_page(str(_room))
|
||||
_log.debug("初始化完毕")
|
||||
|
||||
@property
|
||||
def driver(self):
|
||||
return self._driver
|
||||
|
||||
def open_user_page(self, sec_user_id: str):
|
||||
tab = TabInfo()
|
||||
tab.tab_type = TabInfo.TAB_TYPE_USER
|
||||
tab.user_id = sec_user_id
|
||||
if urlparse(sec_user_id).scheme:
|
||||
tab.url = sec_user_id
|
||||
else:
|
||||
# 单独的用户id
|
||||
tab.url = "https://www.douyin.com/user/" + sec_user_id
|
||||
self.open_tab(tab)
|
||||
|
||||
def open_live_page(self, live_url: str):
|
||||
tab = TabInfo()
|
||||
tab.tab_type = TabInfo.TAB_TYPE_LIVE
|
||||
if not urlparse(live_url).scheme:
|
||||
# 单独的房间号
|
||||
live_url = "https://live.douyin.com/" + live_url
|
||||
tab.url = live_url
|
||||
self.open_tab(tab)
|
||||
|
||||
def open_tab(self, tab_info: "TabInfo"):
|
||||
tab_handler = self._driver.new_tab()
|
||||
tab_info.tab_handler = tab_handler
|
||||
if not tab_info.tab_handler:
|
||||
tab_handler = self._driver.new_tab()
|
||||
tab_info.tab_handler = tab_handler
|
||||
if not tab_info.tab_type:
|
||||
tab_info.tab_type = TabInfo.TAB_TYPE_OTHER
|
||||
self.driver.open_url(tab_info.url, tab_handler)
|
||||
_log.debug("打开URL:【%s】@%s", tab_info.url, tab_info.tab_handler)
|
||||
self.driver.open_url(tab_info.url, tab_info.tab_handler)
|
||||
_log.info("打开URL完毕:【%s】@%s", tab_info.url, tab_info.tab_handler)
|
||||
if tab_info not in self._tabs:
|
||||
self._tabs.append(tab_info)
|
||||
|
||||
def close_tab(self, tab_info: "TabInfo"):
|
||||
if tab_info not in self._tabs:
|
||||
_log.warning("提供的标签不在标签组中,不予执行")
|
||||
return
|
||||
_log.debug("关闭标签:%s", tab_info.tab_handler)
|
||||
self._driver.close_tab(tab_info.tab_handler)
|
||||
_log.info("关闭标签完毕:%s", tab_info.tab_handler)
|
||||
self._tabs.remove(tab_info)
|
||||
|
||||
def terminate(self):
|
||||
if self._driver:
|
||||
self._driver.terminate()
|
||||
|
||||
|
||||
class TabInfo(object):
|
||||
TAB_TYPE_OTHER = "other"
|
||||
TAB_TYPE_USER = "user"
|
||||
TAB_TYPE_LIVE = "live"
|
||||
|
||||
def __init__(self):
|
||||
self.tab_handler: str = ""
|
||||
self.user_id: str = ""
|
||||
self.url: str = ""
|
||||
self.tab_type: str = self.TAB_TYPE_OTHER
|
||||
|
||||
|
||||
def init_manager():
|
||||
global _manager
|
||||
_manager = BrowserManager()
|
||||
threading.Thread(target=_manager.init_browser).start()
|
||||
return _manager
|
||||
|
||||
|
||||
def get_manager():
|
||||
if _manager is None:
|
||||
return init_manager()
|
||||
return _manager
|
||||
|
Reference in New Issue
Block a user