提取请求接口,添加ping房间

This commit is contained in:
Jerry Yan 2020-01-01 11:55:12 +08:00
parent a3fec20b52
commit d3cce09a7a

112
api.py
View File

@ -26,6 +26,7 @@ SEARCH_USER_API = (
"https://security.snssdk.com/video/app/search/live/?format=json&search_sug=0&forum=0&m_tab=live&is_native_req=0"
"&offset=0&from=live&en_qc=1&pd=xigua_live&ssmix=a{COMMON}&keyword={keyword}")
USER_INFO_API = "https://is.snssdk.com/video/app/user/home/v7/?to_user_id={userId}{COMMON}"
PING_ROOM_API = "https://webcast3.ixigua.com/webcast/room/ping/audience/?room_id={roomId}{COMMON}"
COMMON_HEADERS = {
"sdk-version": '1',
"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 9) VideoArticle/8.1.6 cronet/TTNetVersion:b97574c0 2019-09-24",
@ -43,23 +44,26 @@ class XiGuaLiveApi:
"""
if name is None:
name = "永恒de草薙"
self.roomLiver = None
if type(name) == User:
self.roomLiver = name
self.name = name.name
elif str(name).isdigit():
self.roomLiver = User()
self.roomLiver.ID = int(name)
else:
self.name = str(name)
self.isLive = False
self.isValidRoom = False
self._rawRoomInfo = {}
self.roomID = 0
self.roomLiver = None
self.roomPopularity = 0
self._cursor = "0"
self.lottery = None
self.s = requests.session()
self.s.headers.update(COMMON_HEADERS)
self._updRoomAt = datetime.now()
self.updRoomInfo(True)
self.updRoomInfo()
def _updateRoomPopularity(self, _data):
"""
@ -74,6 +78,48 @@ class XiGuaLiveApi:
if "popularity" in _data["data"]:
self.roomPopularity = _data["data"]["popularity"]
def getJson(self, url, **kwargs):
try:
p = self.s.get(url, **kwargs)
except Exception as e:
print("网络请求失败")
if DEBUG:
print("GET")
print("URL", url)
print("ERR ", e.__str__())
return None
try:
return p.json()
except Exception as e:
print("解析请求失败")
if DEBUG:
print("GET JSON")
print("URL", url)
print("CNT", p.text)
print("ERR ", e.__str__())
return None
def postJson(self, url, data, **kwargs):
try:
p = self.s.post(url, data, **kwargs)
except Exception as e:
print("网络请求失败")
if DEBUG:
print("POST")
print("URL", url)
print("ERR ", e.__str__())
return None
try:
return p.json()
except Exception as e:
print("解析请求失败")
if DEBUG:
print("GET JSON")
print("URL", url)
print("CNT", p.text)
print("ERR ", e.__str__())
return None
@staticmethod
def apiChangedError(msg: str, *args):
"""
@ -205,15 +251,9 @@ class XiGuaLiveApi:
return False
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "userId": self.roomLiver.ID}
_url = USER_INFO_API.format_map(_formatData).format_map(_formatData)
try:
p = self.s.get(_url)
except Exception as e:
self.apiChangedError("更新用户信息接口请求失败", e.__str__())
return False
try:
d = p.json()
except Exception as e:
self.apiChangedError("更新房间接口错误", e.__str__())
d = self.getJson(_url)
if d is None:
print("获取用户信息失败")
return False
self.isValidRoom = d["status"] == 0
if "user_info" not in d and d["user_info"] is None:
@ -234,39 +274,29 @@ class XiGuaLiveApi:
self.lottery = l
return True
def updRoomInfo(self, force=False):
def _checkRoom(self):
if self.roomID == 0:
return False
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "roomId": self.roomID}
_url = PING_ROOM_API.format_map(_formatData).format_map(_formatData)
d = self.getJson(_url)
if d is None:
print("检查房间失败")
return False
self.isLive = d["data"]["room_status"] == 0
return self.isLive
def updRoomInfo(self):
"""
更新房间信息
:return:
"""
if not force and self._updRoomAt > (datetime.now() - timedelta(minutes=3)):
return self.isLive
self._updRoomAt = datetime.now()
if self.isValidRoom:
if not self.isValidRoom:
return self._forceSearchUser()
elif not self.isLive:
return self._updateUserOnly()
else:
return self._forceSearchUser()
@staticmethod
def getUserInfoByUserId(userId):
"""
通过UserId查找用户的房间号
:param userId: 用户ID
:return: XiGuaLiveApi
"""
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "userId": userId}
_url = USER_INFO_API.format_map(_formatData).format_map(_formatData)
try:
p = requests.get(_url, headers=COMMON_HEADERS)
except Exception as e:
XiGuaLiveApi.apiChangedError("更新用户信息接口请求失败", e.__str__())
return None
try:
d = p.json()
except Exception as e:
XiGuaLiveApi.apiChangedError("更新房间接口错误", e.__str__())
return None
return XiGuaLiveApi(User(d))
return self._checkRoom()
@staticmethod
def searchUser(keyword):
@ -298,9 +328,7 @@ class XiGuaLiveApi:
"""
获取弹幕
"""
if not self.isValidRoom:
self.updRoomInfo()
return
self.updRoomInfo()
p = self.s.get("https://i.snssdk.com/videolive/im/get_msg?cursor={cursor}&room_id={roomID}"
"&version_code=800&device_platform=android".format(
roomID=self.roomID,
@ -351,7 +379,7 @@ class XiGuaLiveApi:
self.onLottery(self.lottery)
self.lottery = None
# 2分钟自动更新下房间信息
self.updRoomInfo(len(d['data']) == 0)
self.updRoomInfo()
if __name__ == "__main__":