房间信息获取接口

This commit is contained in:
2020-01-01 12:34:06 +08:00
parent d3cce09a7a
commit 560baf1b01
4 changed files with 51 additions and 237 deletions

105
api.py
View File

@ -26,7 +26,7 @@ SEARCH_USER_API = (
"https://security.snssdk.com/video/app/search/live/?format=json&search_sug=0&forum=0&m_tab=live&is_native_req=0"
"&offset=0&from=live&en_qc=1&pd=xigua_live&ssmix=a{COMMON}&keyword={keyword}")
USER_INFO_API = "https://is.snssdk.com/video/app/user/home/v7/?to_user_id={userId}{COMMON}"
PING_ROOM_API = "https://webcast3.ixigua.com/webcast/room/ping/audience/?room_id={roomId}{COMMON}"
ROOM_INFO_API = "https://webcast3.ixigua.com/webcast/room/enter/?room_id={roomId}&pack_level=4{COMMON}"
COMMON_HEADERS = {
"sdk-version": '1',
"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 9) VideoArticle/8.1.6 cronet/TTNetVersion:b97574c0 2019-09-24",
@ -44,17 +44,17 @@ class XiGuaLiveApi:
"""
if name is None:
name = "永恒de草薙"
self.roomLiver = None
self.broadcaster = None
if type(name) == User:
self.roomLiver = name
self.broadcaster = name
self.name = name.name
elif str(name).isdigit():
self.roomLiver = User()
self.roomLiver.ID = int(name)
self.broadcaster = User()
self.broadcaster.ID = int(name)
else:
self.name = str(name)
self.isLive = False
self.isValidRoom = False
self.isValidUser = False
self._rawRoomInfo = {}
self.roomID = 0
self.roomPopularity = 0
@ -220,46 +220,62 @@ class XiGuaLiveApi:
"""
print("中奖消息 :", i)
def _checkUsernameIsMatched(self):
def _checkUsernameIsMatched(self, compare=None):
"""
验证主播名字是自己想要的那个
Check name matched
:return: bool: 是否匹配
"""
return True
if self.name is None or self.roomLiver is None:
if compare is None:
compare = self.broadcaster
if self.name is None or compare is None:
return False
return self.name == self.roomLiver.__str__() or self.roomLiver.__str__() in self.name or self.name in self.roomLiver.__str__()
return self.name == compare.__str__() or compare.__str__() in self.name or self.name in compare.__str__()
def _forceSearchUser(self):
"""
搜索主播名
:return:
"""
_results = self.searchUser(self.name)
if len(_results) > 0:
self.isValidRoom = True
self.roomLiver = _results[0]
return self._updateUserOnly()
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "keyword": self.name}
_url = SEARCH_USER_API.format_map(_formatData).format_map(_formatData)
d = self.getJson(_url)
if d is None:
print("搜索接口请求失败")
return False
if "data" in d and d["data"] is not None:
for i in d["data"]:
if i["block_type"] != 0:
continue
if "cells" not in i or len(i["cells"]) == 0:
break
for _j in i["cells"]:
_user = User(_j)
if self._checkUsernameIsMatched(_user):
self.isValidUser = True
self.broadcaster = _user
break
return self._updateUserInfo()
def _updateUserOnly(self):
def _updateUserInfo(self):
"""
获取用户信息
:return:
"""
if self.roomLiver is None:
if self.broadcaster is None:
return False
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "userId": self.roomLiver.ID}
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "userId": self.broadcaster.ID}
_url = USER_INFO_API.format_map(_formatData).format_map(_formatData)
d = self.getJson(_url)
if d is None:
print("获取用户信息失败")
return False
self.isValidRoom = d["status"] == 0
self.isValidUser = d["status"] == 0
if "user_info" not in d and d["user_info"] is None:
self.apiChangedError("Api发生改变请及时联系我", d)
return False
self.roomLiver = User(d)
self.broadcaster = User(d)
if not self._checkUsernameIsMatched():
self.isLive = False
return False
@ -274,55 +290,30 @@ class XiGuaLiveApi:
self.lottery = l
return True
def _checkRoom(self):
if self.roomID == 0:
return False
def _getRoomInfo(self):
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "roomId": self.roomID}
_url = PING_ROOM_API.format_map(_formatData).format_map(_formatData)
_url = ROOM_INFO_API.format_map(_formatData).format_map(_formatData)
d = self.getJson(_url)
if d is None:
print("检查房间失败")
print("获取房间信息接口请求失败")
return False
self.isLive = d["data"]["room_status"] == 0
return self.isLive
if d["status_code"] != 0:
print("接口提示:【{}".format(d["data"]["message"]))
return False
self._rawRoomInfo = d["data"]
return True
def updRoomInfo(self):
"""
更新房间信息
:return:
"""
if not self.isValidRoom:
if not self.isValidUser:
return self._forceSearchUser()
elif not self.isLive:
return self._updateUserOnly()
return self._updateUserInfo()
else:
return self._checkRoom()
@staticmethod
def searchUser(keyword):
"""
通过关键词搜索主播
:param keyword: 关键词
:return: array: 搜索结果
"""
ret = []
_formatData = {"COMMON": COMMON_GET_PARAM, "TIMESTAMP": time.time() * 1000, "keyword": keyword}
_url = SEARCH_USER_API.format_map(_formatData).format_map(_formatData)
try:
p = requests.get(_url)
d = p.json()
except json.decoder.JSONDecodeError as e:
XiGuaLiveApi.apiChangedError("搜索接口错误", e.__str__())
return ret
if "data" in d and d["data"] is not None:
for i in d["data"]:
if i["block_type"] != 0:
continue
if "cells" not in i or len(i["cells"]) == 0:
break
for _j in i["cells"]:
ret.append(User(_j))
return ret
return self._getRoomInfo()
def getDanmaku(self):
"""
@ -390,10 +381,10 @@ if __name__ == "__main__":
name = sys.argv[1]
print("西瓜直播弹幕助手 by JerryYan")
api = XiGuaLiveApi(name)
if not api.isValidRoom:
if not api.isValidUser:
input("房间不存在")
sys.exit()
print("进入", api.roomLiver, "的直播间")
print("进入", api.broadcaster, "的直播间")
print("=" * 30)
while True:
if api.isLive: