2022-07-25 16:24:11 +08:00

647 lines
17 KiB
Python

import requests
from datetime import datetime
from .util import cipher as cipher
import os
import math
import hashlib
from .util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
# From PC ugc_assistant
# Alternative key pair, currently disabled:
# APPKEY = 'aae92bc66f3edfab'
# APPSECRET = 'af125a0d5279fd576c1b4418a3e8276d'
# NOTE(review): the app key and secret are hard-coded in source - confirm this is intended.
# APPKEY is sent as the 'appkey' request parameter; APPSECRET is handed to
# cipher.sign_dict to compute the 'sign' parameter.
APPKEY = '1d8b6e7d45233436'
APPSECRET = '560c52ccd288fed045859ed18bffd973'
# App key sent by get_key() when requesting the login hash and public key.
LOGIN_APPKEY = '783bbb7264451d82'
# upload chunk size = 2MB (number of bytes read from the video file per chunk)
CHUNK_SIZE = 2 * 1024 * 1024
class VideoPart:
    """
    A single part of a video post.

    Each object represents one part ("P") of a multi-part post.

    Attributes:
        path: file path in the local file system.
        title: title of this part.
        desc: description of this part.
        server_file_name: file name on the bilibili server, generated by the
            pre-upload API; None until it has been assigned.
    """

    def __init__(self, path, title='', desc='', server_file_name=None):
        self.path, self.title, self.desc = path, title, desc
        self.server_file_name = server_file_name

    def __repr__(self):
        # Render the attributes first, then wrap them with the class name.
        attributes = 'path: {}, title: {}, desc: {}, server_file_name:{}'.format(
            self.path,
            self.title,
            self.desc,
            self.server_file_name,
        )
        return '<{}, {}>'.format(self.__class__.__name__, attributes)
def get_key_old(sid=None, jsessionid=None):
    """
    Request the password hash salt and RSA public key used for login.

    The form is signed with cipher.sign_dict using APPSECRET. The raw
    response body is printed before it is parsed.

    Args:
        sid: session id. only for captcha login. Sent as the 'sid' cookie
            when given.
        jsessionid: j-session id. only for captcha login. Sent as the
            'JSESSIONID' cookie when given.
    Returns:
        hash: salt for password encryption.
        pubkey: rsa public key for password encryption.
        sid: the sid passed in, or the 'sid' cookie of the response when
            none was passed.
    """
    form = {
        'appkey': APPKEY,
        'platform': "pc",
        'ts': str(int(datetime.now().timestamp()))
    }
    form['sign'] = cipher.sign_dict(form, APPSECRET)

    # Only forward the session cookies the caller actually supplied.
    session_cookies = {}
    if sid:
        session_cookies['sid'] = sid
    if jsessionid:
        session_cookies['JSESSIONID'] = jsessionid

    response = requests.post(
        # "https://passport.bilibili.com/api/oauth2/getKey",
        "https://passport.bilibili.com/x/passport-login/web/key",
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': "application/json, text/javascript, */*; q=0.01"
        },
        data=form,
        cookies=session_cookies
    )
    print(response.content.decode())
    key_info = response.json()['data']
    # Prefer the caller's sid; otherwise use the one set by the server.
    return key_info['hash'], key_info['key'], sid or response.cookies['sid']
def get_key():
    """
    Fetch the login hash and public key from the passport web key endpoint.

    The query carries LOGIN_APPKEY and a signature computed by
    cipher.login_sign_dict_bin; no timestamp is sent.

    Returns:
        (hash, key, ''): the 'hash' and 'key' fields of the response's
        'data' object, followed by an empty string.
    """
    query = {
        'appkey': LOGIN_APPKEY,
    }
    query['sign'] = cipher.login_sign_dict_bin(query)

    response = requests.get(
        "https://passport.bilibili.com/x/passport-login/web/key",
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': "application/json, text/javascript, */*; q=0.01"
        },
        params=query
    )
    key_info = response.json()['data']
    return key_info['hash'], key_info['key'], ''
def get_capcha(sid):
    """
    Request a captcha for the given session.

    The query is signed with cipher.sign_dict using APPSECRET, and the
    HTTP status code of the response is printed.

    Args:
        sid: session id, sent as the 'sid' cookie.
    Returns:
        (jsessionid, capcha_data): the 'JSESSIONID' cookie of the response
        and the raw response body bytes.
    """
    query = {
        'appkey': APPKEY,
        'platform': 'pc',
        'ts': str(int(datetime.now().timestamp()))
    }
    query['sign'] = cipher.sign_dict(query, APPSECRET)

    response = requests.get(
        "https://passport.bilibili.com/captcha",
        headers={
            'User-Agent': '',
            'Accept-Encoding': 'gzip,deflate',
        },
        params=query,
        cookies={'sid': sid}
    )
    print(response.status_code)
    return response.cookies['JSESSIONID'], response.content
def login_by_access_token(access_token):
    """
    Log in with a bilibili access token.

    Args:
        access_token: Bilibili access token got by previous
            username/password login.
    Returns:
        sid: session id, read from the 'sid' cookie of the response.
        mid: member id, the 'mid' field of the response data.
        expires_in: access token expire time, the 'expires_in' field of the
            response data.
    """
    query = {
        'appkey': APPKEY,
        'access_token': access_token,
        'platform': "pc",
        'ts': str(int(datetime.now().timestamp())),
    }
    query['sign'] = cipher.sign_dict(query, APPSECRET)

    response = requests.get(
        url="https://passport.bilibili.com/api/oauth2/info",
        headers={
            'Connection': 'keep-alive',
            'Accept-Encoding': 'gzip,deflate',
            'Host': 'passport.bilibili.com',
            'User-Agent': '',
        },
        params=query
    )
    info = response.json()['data']
    return response.cookies['sid'], info['mid'], info["expires_in"]
def upload_cover(access_token, sid, cover_file_path):
    """
    Upload a local image file as a post cover.

    The whole file is read into memory and sent as the multipart field
    'file' under the fixed file name "cover.png".

    Args:
        access_token: access token, sent as the 'access_key' query parameter.
        sid: session id, sent as the 'sid' cookie.
        cover_file_path: path of the image file in the local file system.
    Returns:
        the 'url' field of the response's 'data' object.
    """
    with open(cover_file_path, "rb") as f:
        cover_pic = f.read()

    headers = {
        'Connection': 'keep-alive',
        'Host': 'member.bilibili.com',
        'Accept-Encoding': 'gzip,deflate',
        'User-Agent': '',
    }
    params = {
        "access_key": access_token,
    }
    params["sign"] = cipher.sign_dict(params, APPSECRET)

    files = {
        # The third tuple item is the bare MIME type; requests writes the
        # "Content-Type:" header name itself, so it must not be repeated here.
        'file': ("cover.png", cover_pic, "image/png"),
    }
    r = requests.post(
        "http://member.bilibili.com/x/vu/client/cover/up",
        headers=headers,
        params=params,
        files=files,
        cookies={
            'sid': sid
        },
        verify=False,
    )
    return r.json()["data"]["url"]
def upload_chunk(upload_url, server_file_name, local_file_name, chunk_data, chunk_size, chunk_id, chunk_total_num):
    """
    Upload one chunk of a video file as a multipart form.

    Progress, the HTTP status code and the raw response body are printed.

    Args:
        upload_url: upload url by pre_upload api.
        server_file_name: file name on server by pre_upload api; sent as the
            'PHPSESSID' cookie.
        local_file_name: video file name in local fs.
        chunk_data: binary data of video chunk.
        chunk_size: value sent in the 'filesize' field (default of
            ugc_assistant is 2M).
        chunk_id: chunk number.
        chunk_total_num: total chunk number.
    Returns:
        True: the response status is 200 and its JSON 'OK' field equals 1.
        False: otherwise.
    """
    print("chunk{}/{}".format(chunk_id, chunk_total_num))
    print("filename: {}".format(local_file_name))

    # Plain form fields use a None file name; only 'file' carries the payload.
    form_fields = {
        'version': (None, '2.0.0.1054'),
        'filesize': (None, chunk_size),
        'chunk': (None, chunk_id),
        'chunks': (None, chunk_total_num),
        'md5': (None, cipher.md5_bytes(chunk_data)),
        'file': (local_file_name, chunk_data, 'application/octet-stream')
    }
    response = requests.post(
        url=upload_url,
        files=form_fields,
        cookies={'PHPSESSID': server_file_name},
    )
    print(response.status_code)
    print(response.content)
    return response.status_code == 200 and response.json()['OK'] == 1
def upload_video_part(access_token, sid, mid, video_part: VideoPart, max_retry=5, cb=None):
    """
    Upload one video file chunk by chunk.

    The pre-upload API supplies the upload url, the completion url and the
    server file name. The file is then read in CHUNK_SIZE pieces, each sent
    through upload_chunk under Retry, and finally the completion url is
    called with the chunk count, file size and MD5 of the whole file.

    Args:
        access_token: access token generated by login api.
        sid: session id.
        mid: member id.
        video_part: local video file data.
        max_retry: max retry number for each chunk.
        cb: callback invoked as cb(video_part, chunk_id, chunk_total_num)
            before each chunk is uploaded; optional.
    Returns:
        False when video_part is not a VideoPart or a chunk upload fails.
        True when the part already has a server file name, or after the
        upload finished and video_part.server_file_name has been set.
    """
    def _no_progress(part, index, total):
        return None

    notify = _no_progress if cb is None else cb

    if not isinstance(video_part, VideoPart):
        return False
    # A part that already has a server file name is not uploaded again.
    if video_part.server_file_name is not None:
        return True

    headers = {
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'User-Agent': '',
        'Accept-Encoding': 'gzip,deflate',
    }
    preupload_url = "http://member.bilibili.com/preupload?access_key={}&mid={}&profile=ugcfr%2Fpc3".format(
        access_token, mid)
    preupload_response = requests.get(
        preupload_url,
        headers=headers,
        cookies={'sid': sid},
        verify=False,
    )
    preupload = preupload_response.json()
    upload_url = preupload['url']
    complete_upload_url = preupload['complete']
    server_file_name = preupload['filename']

    local_path = video_part.path
    base_name = os.path.basename(local_path)
    file_size = os.path.getsize(local_path)
    chunk_total_num = int(math.ceil(file_size / CHUNK_SIZE))

    # The MD5 of the whole file is accumulated while the chunks are sent.
    digest = hashlib.md5()
    with open(local_path, 'rb') as video_file:
        for chunk_id in range(chunk_total_num):
            chunk_data = video_file.read(CHUNK_SIZE)
            notify(video_part, chunk_id, chunk_total_num)
            uploader = Retry(max_retry=max_retry, success_return_value=True)
            chunk_ok = uploader.run(
                upload_chunk,
                upload_url,
                server_file_name,
                base_name,
                chunk_data,
                CHUNK_SIZE,
                chunk_id,
                chunk_total_num
            )
            if not chunk_ok:
                return False
            digest.update(chunk_data)
    print(digest.hexdigest())

    # complete upload
    completion = requests.post(
        url=complete_upload_url,
        data={
            'chunks': chunk_total_num,
            'filesize': file_size,
            'md5': digest.hexdigest(),
            'name': base_name,
            'version': '2.0.0.1054',
        },
        headers=headers,
    )
    print(completion.status_code)
    print(completion.content)

    video_part.server_file_name = server_file_name
    return True
def upload(access_token,
           sid,
           mid,
           parts,
           copyright: int,
           title: str,
           tid: int,
           tag: str,
           desc: str,
           source: str = '',
           cover: str = '',
           no_reprint: int = 0,
           open_elec: int = 1,
           max_retry: int = 5,
           thread_pool_workers: int = 1):
    """
    Upload video parts and submit them as a new post.

    Every part is uploaded through upload_video_part on a thread pool. If
    any part fails, nothing is submitted. The cover is uploaded only when
    `cover` names an existing local file; a failed cover upload is reported
    and the post is submitted with an empty cover.

    Args:
        access_token: oauth2 access token.
        sid: session id.
        mid: member id.
        parts: a VideoPart, or a list/tuple of VideoPart.
        copyright: original / reprint.
        title: post title.
        tid: category id.
        tag: tags.
        desc: post description.
        source: source address of a reprint.
        cover: path of the cover image file.
        no_reprint: whether reprinting is allowed.
        open_elec: "charging" (elec) switch.
        max_retry: max retry time for each chunk.
        thread_pool_workers: max upload threads.
    Returns:
        (aid, bvid)
        aid: av number
        bvid: bv number
        (None, None) when a part failed to upload.
    """
    # Accept a single part as well as a list or tuple of parts.
    if not isinstance(parts, (list, tuple)):
        parts = [parts]

    with ThreadPoolExecutor(max_workers=thread_pool_workers) as tpe:
        pending = {}
        for video_part in parts:
            print("upload {} added in pool".format(video_part.title))
            future = tpe.submit(upload_video_part, access_token, sid, mid, video_part, max_retry)
            pending[future] = video_part

        for future in as_completed(pending):
            part_ok = future.result()
            print("video part {} finished, status: {}".format(pending[future].title, part_ok))
            if not part_ok:
                print("upload failed")
                return None, None

    # cover
    if cover and os.path.isfile(cover):
        try:
            cover = upload_cover(access_token, sid, cover)
        except Exception as e:
            # Best effort: a broken cover must not abort the post, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            print("cover upload failed: {}".format(e))
            cover = ''
    else:
        cover = ''

    # submit
    headers = {
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'User-Agent': '',
    }
    post_data = {
        'build': 1054,
        'copyright': copyright,
        'cover': cover,
        'desc': desc,
        'no_reprint': no_reprint,
        'open_elec': open_elec,
        'source': source,
        'tag': tag,
        'tid': tid,
        'title': title,
        'videos': [
            {
                "desc": video_part.desc,
                "filename": video_part.server_file_name,
                "title": video_part.title
            }
            for video_part in parts
        ]
    }

    params = {
        'access_key': access_token,
    }
    params['sign'] = cipher.sign_dict(params, APPSECRET)
    r = requests.post(
        url="http://member.bilibili.com/x/vu/client/add",
        params=params,
        headers=headers,
        verify=False,
        cookies={
            'sid': sid
        },
        json=post_data,
    )
    print("submit")
    print(r.status_code)
    print(r.content.decode())

    data = r.json()["data"]
    return data["aid"], data["bvid"]
def get_post_data(access_token, sid, avid):
    """
    Fetch the stored data of an existing post.

    Args:
        access_token: access token, sent as the 'access_key' query parameter.
        sid: session id, sent as the 'sid' cookie.
        avid: av number of the post, sent as the 'aid' query parameter.
    Returns:
        the 'data' object of the JSON response.
    """
    query = {
        "access_key": access_token,
        "aid": avid,
        "build": "1054"
    }
    query["sign"] = cipher.sign_dict(query, APPSECRET)

    response = requests.get(
        url="http://member.bilibili.com/x/client/archive/view",
        headers={
            'Connection': 'keep-alive',
            'Host': 'member.bilibili.com',
            'Accept-Encoding': 'gzip,deflate',
            'User-Agent': '',
        },
        params=query,
        cookies={'sid': sid}
    )
    return response.json()["data"]
def edit_videos(
        access_token,
        sid,
        mid,
        avid=None,
        bvid=None,
        parts=None,
        insert_index=None,
        copyright=None,
        title=None,
        tid=None,
        tag=None,
        desc=None,
        source=None,
        cover=None,
        no_reprint=None,
        open_elec=None,
        max_retry: int = 5,
        thread_pool_workers: int = 1):
    """
    Edit an existing post and optionally insert new video parts into it.

    The current post data is fetched with get_post_data and used as the
    base of the submission; only the fields the caller supplies are
    overridden. New parts are uploaded through upload_video_part on a
    thread pool, and nothing is submitted if any of them fails.

    Args:
        access_token: oauth2 access token.
        sid: session id.
        mid: member id.
        avid: av number (int or numeric string).
        bvid: bv string, converted with cipher.bv2av when avid is missing.
        parts: a VideoPart, or a list/tuple of VideoPart; None adds no part.
        insert_index: position at which the new parts are inserted, kept
            together and in order; None appends them at the end.
        copyright: original / reprint.
        title: post title.
        tid: category id.
        tag: tags.
        desc: post description.
        source: source address of a reprint.
        cover: path of a local cover image file; the cover is replaced only
            when this names an existing file and its upload succeeds.
        no_reprint: whether reprinting is allowed; 0 is applied, None keeps
            the current value.
        open_elec: "charging" (elec) switch; 0 is applied, None keeps the
            current value.
        max_retry: max retry time for each chunk.
        thread_pool_workers: max upload threads.
    Returns:
        (aid, bvid)
        aid: av number
        bvid: bv number
        (None, None) when neither avid nor bvid is given, insert_index is
        not an int or None, or a part failed to upload.
    """
    if not avid and not bvid:
        print("please provide avid or bvid")
        return None, None
    if not avid:
        avid = cipher.bv2av(bvid)
    # No parts means "edit metadata only"; a single part becomes a list.
    if parts is None:
        parts = []
    elif not isinstance(parts, (list, tuple)):
        parts = [parts]
    if isinstance(avid, str):
        avid = int(avid)
    # Reject a bad index before any network traffic or upload happens.
    if insert_index is not None and type(insert_index) is not int:
        print("wrong insert index")
        return None, None

    post_video_data = get_post_data(access_token, sid, avid)

    with ThreadPoolExecutor(max_workers=thread_pool_workers) as tpe:
        pending = {}
        for video_part in parts:
            print("upload {} added in pool".format(video_part.title))
            future = tpe.submit(upload_video_part, access_token, sid, mid, video_part, max_retry)
            pending[future] = video_part

        for future in as_completed(pending):
            part_ok = future.result()
            print("video part {} finished, status: {}".format(pending[future].title, part_ok))
            if not part_ok:
                print("upload failed")
                return None, None

    headers = {
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'User-Agent': '',
    }
    archive = post_video_data["archive"]
    submit_data = {
        'aid': avid,
        'build': 1054,
        'copyright': archive["copyright"],
        'cover': archive["cover"],
        'desc': archive["desc"],
        'no_reprint': archive["no_reprint"],
        'open_elec': post_video_data["archive_elec"]["state"],  # open_elec not tested
        'source': archive["source"],
        'tag': archive["tag"],
        'tid': archive["tid"],
        'title': archive["title"],
        'videos': post_video_data["videos"]
    }

    # cover: the default None must not reach os.path.isfile.
    if cover and os.path.isfile(cover):
        try:
            cover = upload_cover(access_token, sid, cover)
        except Exception as e:
            # Best effort: keep the existing cover when the upload fails.
            print("cover upload failed: {}".format(e))
            cover = ''
    else:
        cover = ''

    # edit archive data
    if copyright:
        submit_data["copyright"] = copyright
    if title:
        submit_data["title"] = title
    if tid:
        submit_data["tid"] = tid
    if tag:
        submit_data["tag"] = tag
    if desc:
        submit_data["desc"] = desc
    if source:
        submit_data["source"] = source
    if cover:
        submit_data["cover"] = cover
    # 0 is a meaningful value for these switches, so test against None.
    if no_reprint is not None:
        submit_data["no_reprint"] = no_reprint
    if open_elec is not None:
        submit_data["open_elec"] = open_elec

    new_videos = [
        {
            "desc": video_part.desc,
            "filename": video_part.server_file_name,
            "title": video_part.title
        }
        for video_part in parts
    ]
    if insert_index is None:
        submit_data['videos'].extend(new_videos)
    else:
        # Slice assignment keeps the new parts contiguous and ordered, also
        # for a negative index.
        submit_data['videos'][insert_index:insert_index] = new_videos

    params = {
        'access_key': access_token,
    }
    params['sign'] = cipher.sign_dict(params, APPSECRET)
    r = requests.post(
        url="http://member.bilibili.com/x/vu/client/edit",
        params=params,
        headers=headers,
        verify=False,
        cookies={
            'sid': sid
        },
        json=submit_data,
    )
    print("edit submit")
    print(r.status_code)
    print(r.content.decode())

    data = r.json()["data"]
    return data["aid"], data["bvid"]