# coding=utf-8 import os import re import json as JSON import Common import rsa import math import base64 import hashlib import requests from urllib import parse from requests.adapters import HTTPAdapter from urllib3 import Retry class VideoPart: def __init__(self, path, title='', desc=''): self.path = path self.title = title self.desc = desc class Bilibili: def __init__(self, cookie=None): self.files = [] self.videos = [] self.session = requests.session() self.session.keep_alive = False if cookie: self.session.headers["cookie"] = cookie self.csrf = re.search('bili_jct=(.*?);', cookie).group(1) self.mid = re.search('DedeUserID=(.*?);', cookie).group(1) self.session.headers['Accept'] = 'application/json, text/javascript, */*; q=0.01' self.session.headers['Referer'] = 'https://space.bilibili.com/{mid}/#!/'.format(mid=self.mid) # session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36' # session.headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' def login(self, user, pwd): """ :param user: username :type user: str :param pwd: password :type pwd: str :return: if success return True else raise Exception """ APPKEY = '4409e2ce8ffd12b8' ACTIONKEY = 'appkey' BUILD = 101800 DEVICE = 'android_tv_yst' MOBI_APP = 'android_tv_yst' PLATFORM = 'android' APPSECRET = '59b43e04ad6965f34319062b478f83dd' def md5(s): h = hashlib.md5() h.update(s.encode('utf-8')) return h.hexdigest() def sign(s): """ :return: return sign """ return md5(s + APPSECRET) def signed_body(body): """ :return: body which be added sign """ if isinstance(body, str): return body + '&sign=' + sign(body) elif isinstance(body, dict): ls = [] for k, v in body.items(): ls.append(k + '=' + v) body['sign'] = sign('&'.join(ls)) return body def getkey(): """ :return: hash, key """ r = self.session.post( 'https://passport.bilibili.com/api/oauth2/getKey', signed_body({'appkey': APPKEY}), ) # {"ts":1544152439,"code":0,"data":{"hash":"99c7573759582e0b","key":"-----BEGIN PUBLIC----- -----END PUBLIC KEY-----\n"}} json = r.json() data = json['data'] return data['hash'], data['key'] def access_token_2_cookie(access_token): r = self.session.get( 'https://passport.bilibili.com/api/login/sso?' + \ signed_body( 'access_key={access_token}&appkey={appkey}&gourl=https%3A%2F%2Faccount.bilibili.com%2Faccount%2Fhome' .format(access_token=access_token, appkey=APPKEY), ), allow_redirects=False, ) return r.cookies.get_dict(domain=".bilibili.com") self.session.headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' h, k = getkey() pwd = base64.b64encode( rsa.encrypt( (h + pwd).encode('utf-8'), rsa.PublicKey.load_pkcs1_openssl_pem(k.encode()) ) ) user = parse.quote_plus(user) pwd = parse.quote_plus(pwd) r = self.session.post( 'https://passport.snm0516.aisee.tv/api/tv/login', signed_body( 'appkey={appkey}&build={build}&captcha=&channel=master&' 'guid=XYEBAA3E54D502E37BD606F0589A356902FCF&mobi_app={mobi_app}&' 'password={password}&platform={platform}&token=5598158bcd8511e2&ts=0&username={username}' .format(appkey=APPKEY, build=BUILD, platform=PLATFORM, mobi_app=MOBI_APP, username=user, password=pwd)), ) json = r.json() if json['code'] == -105: # need captcha raise Exception('TODO: login with captcha') if json['code'] != 0: raise Exception(r.text) access_token = json['data']['token_info']['access_token'] cookie_dict = access_token_2_cookie(access_token) cookie = '; '.join( '%s=%s' % (k, v) for k, v in cookie_dict.items() ) self.session.headers["cookie"] = cookie self.csrf = re.search('bili_jct=(.*?)(;|$)', cookie).group(1) self.mid = re.search('DedeUserID=(.*?)(;|$)', cookie).group(1) self.session.headers['Accept'] = 'application/json, text/javascript, */*; q=0.01' self.session.headers['Referer'] = 'https://space.bilibili.com/{mid}/#!/'.format(mid=self.mid) return True def upload(self, parts, title, tid, tag, desc, source='', cover='', no_reprint=1, ): """ :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: VideoPart or list :param title: video's title :type title: str :param tid: video type, see: https://member.bilibili.com/x/web/archive/pre or https://github.com/uupers/BiliSpider/wiki/%E8%A7%86%E9%A2%91%E5%88%86%E5%8C%BA%E5%AF%B9%E5%BA%94%E8%A1%A8 :type tid: int :param tag: video's tag :type tag: list :param desc: video's description :type desc: str :param source: (optional) 转载地址 :type source: str :param cover: (optional) cover's URL, use method *cover_up* to get :type cover: str :param no_reprint: (optional) 0=可以转载, 1=禁止转载(default) :type no_reprint: int """ self.preUpload(parts) self.finishUpload(title, tid, tag, desc, source, cover, no_reprint) self.clear() def preUpload(self, parts, max_retry=5): """ :param max_retry: :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: VideoPart or list """ self.session.headers['Content-Type'] = 'application/json; charset=utf-8' if not isinstance(parts, list): parts = [parts] # retry by status retries = Retry( total=max_retry, backoff_factor=1, status_forcelist=(504, ), ) self.session.mount('https://', HTTPAdapter(max_retries=retries)) self.session.mount('http://', HTTPAdapter(max_retries=retries)) # for part in parts: filepath = part.path filename = os.path.basename(filepath) filesize = os.path.getsize(filepath) Common.appendUploadStatus("Upload >{}< Started".format(filepath)) self.files.append(part) r = self.session.get('https://member.bilibili.com/preupload?' 'os=upos&upcdn=ws&name={name}&size={size}&r=upos&profile=ugcupos%2Fyb&ssl=0' .format(name=parse.quote_plus(filename), size=filesize)) """return example { "upos_uri": "upos://ugc/i181012ws18x52mti3gg0h33chn3tyhp.mp4", "biz_id": 58993125, "endpoint": "//upos-hz-upcdnws.acgvideo.com", "endpoints": [ "//upos-hz-upcdnws.acgvideo.com", "//upos-hz-upcdntx.acgvideo.com" ], "chunk_retry_delay": 3, "chunk_retry": 200, "chunk_size": 4194304, "threads": 2, "timeout": 900, "auth": "os=upos&cdn=upcdnws&uid=&net_state=4&device=&build=&os_version=&ak=×tamp=&sign=", "OK": 1 } """ json = r.json() upos_uri = json['upos_uri'] endpoint = json['endpoint'] auth = json['auth'] biz_id = json['biz_id'] chunk_size = json['chunk_size'] self.session.headers['X-Upos-Auth'] = auth # add auth header r = self.session.post('https:{}/{}?uploads&output=json'.format(endpoint, upos_uri.replace('upos://', ''))) # {"upload_id":"72eb747b9650b8c7995fdb0efbdc2bb6","key":"\/i181012ws2wg1tb7tjzswk2voxrwlk1u.mp4","OK":1,"bucket":"ugc"} json = r.json() upload_id = json['upload_id'] with open(filepath, 'rb') as f: chunks_num = math.ceil(filesize / chunk_size) chunks_index = 0 chunks_data = f.read(chunk_size) Common.modifyLastUploadStatus( "Uploading >{}< @ {:.2f}%".format(filepath, 100.0 * chunks_index / chunks_num)) while True: if not chunks_data: break def upload_chunk(): r = self.session.put('https:{endpoint}/{upos_uri}?' 'partNumber={part_number}&uploadId={upload_id}&chunk={chunk}&chunks={chunks}&size={size}&start={start}&end={end}&total={total}' .format(endpoint=endpoint, upos_uri=upos_uri.replace('upos://', ''), part_number=chunks_index + 1, # starts with 1 upload_id=upload_id, chunk=chunks_index, chunks=chunks_num, size=len(chunks_data), start=chunks_index * chunk_size, end=chunks_index * chunk_size + len(chunks_data), total=filesize, ), chunks_data, ) return r def retry_upload_chunk(): """return :class:`Response` if upload success, else return None.""" for i in range(max_retry): r = upload_chunk() if r.status_code == 200: return r Common.modifyLastUploadStatus( "Uploading >{}< @ {:.2f}% RETRY[{}]".format(filepath, 100.0 * chunks_index / chunks_num, max_retry)) return None r = retry_upload_chunk() if r: Common.modifyLastUploadStatus( "Uploading >{}< @ {:.2f}%".format(filepath, 100.0 * chunks_index / chunks_num)) else: Common.modifyLastUploadStatus( "Uploading >{}< FAILED @ {:.2f}%".format(filepath, 100.0 * chunks_index / chunks_num)) continue chunks_data = f.read(chunk_size) chunks_index += 1 # start with 0 # NOT DELETE! Refer to https://github.com/comwrg/bilibiliupload/issues/15#issuecomment-424379769 self.session.post('https:{endpoint}/{upos_uri}?' 'output=json&name={name}&profile=ugcupos%2Fyb&uploadId={upload_id}&biz_id={biz_id}' .format(endpoint=endpoint, upos_uri=upos_uri.replace('upos://', ''), name=filename, upload_id=upload_id, biz_id=biz_id, ), {"parts": [{"partNumber": i, "eTag": "etag"} for i in range(1, chunks_num + 1)]}, ) self.videos.append({'filename': upos_uri.replace('upos://ugc/', '').split('.')[0], 'title': part.title, 'desc': part.desc}) Common.modifyLastUploadStatus("Upload >{}< Finished".format(filepath)) __f = open("uploaded.json", "w") JSON.dump(self.videos, __f) __f.close() def finishUpload(self, title, tid, tag, desc, source='', cover='', no_reprint=1, ): """ :param title: video's title :type title: str :param tid: video type, see: https://member.bilibili.com/x/web/archive/pre or https://github.com/uupers/BiliSpider/wiki/%E8%A7%86%E9%A2%91%E5%88%86%E5%8C%BA%E5%AF%B9%E5%BA%94%E8%A1%A8 :type tid: int :param tag: video's tag :type tag: list :param desc: video's description :type desc: str :param source: (optional) 转载地址 :type source: str :param cover: (optional) cover's URL, use method *cover_up* to get :type cover: str :param no_reprint: (optional) 0=可以转载, 1=禁止转载(default) :type no_reprint: int """ if len(self.videos) == 0: return Common.appendUploadStatus("[{}]投稿中,请稍后".format(title)) self.session.headers['Content-Type'] = 'application/json; charset=utf-8' copyright = 2 if source else 1 r = self.session.post('https://member.bilibili.com/x/vu/web/add?csrf=' + self.csrf, json={ "copyright": copyright, "source": source, "title": title, "tid": tid, "tag": ','.join(tag), "no_reprint": no_reprint, "desc": desc, "cover": cover, "mission_id": 0, "order_id": 0, "videos": self.videos} ) Common.modifyLastUploadStatus("[{}] Published | Result : {}".format(title, r.text)) def reloadFromPrevious(self): if os.path.exists("uploaded.json"): __f = open("uploaded.json", "r") try: self.videos = JSON.load(__f) Common.appendUploadStatus("RELOAD SUCCESS") except: Common.appendUploadStatus("RELOAD Failed") self.videos = [] __f.close() os.remove("uploaded.json") else: Common.appendUploadStatus("RELOAD Failed") self.videos = [] def clear(self): self.files.clear() self.videos.clear() if (os.path.exists("uploaded.json")): os.remove("uploaded.json") def appendUpload(self, aid, parts, title="", tid="", tag="", desc="", source='', cover='', no_reprint=1, ): """ :param aid: just aid :type aid: int :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: VideoPart or list :param title: video's title :type title: str :param tid: video type, see: https://member.bilibili.com/x/web/archive/pre or https://github.com/uupers/BiliSpider/wiki/%E8%A7%86%E9%A2%91%E5%88%86%E5%8C%BA%E5%AF%B9%E5%BA%94%E8%A1%A8 :type tid: int :param tag: video's tag :type tag: list :param desc: video's description :type desc: str :param source: (optional) 转载地址 :type source: str :param cover: (optional) cover's URL, use method *cover_up* to get :type cover: str :param no_reprint: (optional) 0=可以转载, 1=禁止转载(default) :type no_reprint: int """ self.session.headers['Content-Type'] = 'application/json; charset=utf-8' p = self.session.get("https://member.bilibili.com/x/web/archive/view?aid={}&history=".format(aid)) j = p.json() if len(self.videos) == 0: for i in j['data']['videos']: self.videos.append({'filename': i['filename'], 'title': i["title"], 'desc': i["desc"]}) if (title == ""): title = j["data"]["archive"]['title'] if (tag == ""): tag = j["data"]["archive"]['tag'] if (no_reprint == ""): no_reprint = j["data"]["archive"]['no_reprint'] if (desc == ""): desc = j["data"]["archive"]['desc'] if (source == ""): source = j["data"]["archive"]['source'] if (tid == ""): tid = j["data"]["archive"]['tid'] self.preUpload(parts) self.editUpload(aid, title, tid, tag, desc, source, cover, no_reprint) def editUpload(self, aid, title, tid, tag, desc, source='', cover='', no_reprint=1, ): """ :param aid: just aid :type aid: int :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: VideoPart or list :param title: video's title :type title: str :param tid: video type, see: https://member.bilibili.com/x/web/archive/pre or https://github.com/uupers/BiliSpider/wiki/%E8%A7%86%E9%A2%91%E5%88%86%E5%8C%BA%E5%AF%B9%E5%BA%94%E8%A1%A8 :type tid: int :param tag: video's tag :type tag: list :param desc: video's description :type desc: str :param source: (optional) 转载地址 :type source: str :param cover: (optional) cover's URL, use method *cover_up* to get :type cover: str :param no_reprint: (optional) 0=可以转载, 1=禁止转载(default) :type no_reprint: int """ copyright = 2 if source else 1 r = self.session.post('https://member.bilibili.com/x/vu/web/edit?csrf=' + self.csrf, json={ "aid": aid, "copyright": copyright, "source": source, "title": title, "tid": tid, "tag": ','.join(tag), "no_reprint": no_reprint, "desc": desc, "cover": cover, "mission_id": 0, "order_id": 0, "videos": self.videos} ) print(r.text) def addChannel(self, name, intro=''): """ :param name: channel's name :type name: str :param intro: channel's introduction :type intro: str """ r = self.session.post( url='https://space.bilibili.com/ajax/channel/addChannel', data={ 'name': name, 'intro': intro, 'aids': '', 'csrf': self.csrf, }, # name=123&intro=123&aids=&csrf=565d7ed17cef2cc8ad054210c4e64324&_=1497077610768 ) # return # {"status":true,"data":{"cid":"15812"}} print(r.json()) def channel_addVideo(self, cid, aids): """ :param cid: channel's id :type cid: int :param aids: videos' id :type aids: list """ r = self.session.post( url='https://space.bilibili.com/ajax/channel/addVideo', data={ 'aids': '%2C'.join(aids), 'cid': cid, 'csrf': self.csrf } # aids=9953555%2C9872953&cid=15814&csrf=565d7ed17cef2cc8ad054210c4e64324&_=1497079332679 ) print(r.json()) def cover_up(self, img): """ :param img: img path or stream :type img: str or BufferedReader :return: img URL """ if isinstance(img, str): f = open(img, 'rb') else: f = img r = self.session.post( url='https://member.bilibili.com/x/vu/web/cover/up', data={ 'cover': b'data:image/jpeg;base64,' + (base64.b64encode(f.read())), 'csrf': self.csrf, } ) # print(r.text) # {"code":0,"data":{"url":"http://i0.hdslb.com/bfs/archive/67db4a6eae398c309244e74f6e85ae8d813bd7c9.jpg"},"message":"","ttl":1} return r.json()['data']['url']