feat: Bilibili crawler request signing (WBI) implementation
NanmiCoder committed Dec 2, 2023

1 parent 5c920da commit 5aeee93
Showing 8 changed files with 345 additions and 12 deletions.
27 changes: 16 additions & 11 deletions main.py
@@ -4,30 +4,35 @@

import config
import db
+from base.base_crawler import AbstractCrawler
+from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.xhs import XiaoHongShuCrawler
from proxy import proxy_account_pool


class CrawlerFactory:
+CRAWLERS = {
+"xhs": XiaoHongShuCrawler,
+"dy": DouYinCrawler,
+"ks": KuaishouCrawler,
+"bili": BilibiliCrawler
+}

@staticmethod
-def create_crawler(platform: str):
-if platform == "xhs":
-return XiaoHongShuCrawler()
-elif platform == "dy":
-return DouYinCrawler()
-elif platform == "ks":
-return KuaishouCrawler()
-else:
-raise ValueError("Invalid Media Platform Currently only supported xhs or dy ...")
+def create_crawler(platform: str) -> AbstractCrawler:
+crawler_class = CrawlerFactory.CRAWLERS.get(platform)
+if not crawler_class:
+raise ValueError("Invalid media platform. Currently only xhs, dy, ks and bili are supported ...")
+return crawler_class()


async def main():
# define command line params ...
parser = argparse.ArgumentParser(description='Media crawler program.')
-parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks)',
-choices=["xhs", "dy", "ks"], default=config.PLATFORM)
+parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili)',
+choices=["xhs", "dy", "ks", "bili"], default=config.PLATFORM)
parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
parser.add_argument('--type', type=str, help='crawler type (search | detail)',
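The refactor above replaces the if/elif chain with a registry-dict lookup, so adding a platform only needs one new dict entry. A minimal, self-contained sketch of that pattern (the stub crawler classes here are illustrative stand-ins, not the repo's real ones):

import asyncio


class XhsCrawlerStub:
    async def start(self):
        print("crawling xhs ...")


class BiliCrawlerStub:
    async def start(self):
        print("crawling bilibili ...")


CRAWLERS = {"xhs": XhsCrawlerStub, "bili": BiliCrawlerStub}


def create_crawler(platform: str):
    # dict lookup replaces the old if/elif chain
    crawler_class = CRAWLERS.get(platform)
    if not crawler_class:
        raise ValueError(f"Invalid media platform: {platform!r} (expected one of {list(CRAWLERS)})")
    return crawler_class()


asyncio.run(create_crawler("bili").start())  # -> crawling bilibili ...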
6 changes: 6 additions & 0 deletions media_platform/bilibili/__init__.py
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 18:36
# @Desc :

from .core import *
163 changes: 163 additions & 0 deletions media_platform/bilibili/client.py
@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 18:44
# @Desc : bilibili request client
import asyncio
import json
from typing import Any, Callable, Dict, Optional
from urllib.parse import urlencode

import httpx
from playwright.async_api import BrowserContext, Page

from tools import utils

from .help import BilibiliSign
from .exception import DataFetchError


class BilibiliClient:
def __init__(
self,
timeout=10,
proxies=None,
*,
headers: Dict[str, str],
playwright_page: Page,
cookie_dict: Dict[str, str],
):
self.proxies = proxies
self.timeout = timeout
self.headers = headers
self._host = "https://api.bilibili.com"
self.playwright_page = playwright_page
self.cookie_dict = cookie_dict

async def request(self, method, url, **kwargs) -> Any:
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
data: Dict = response.json()
if data.get("code") != 0:
raise DataFetchError(data.get("message", "unknown error"))
else:
return data.get("data", {})

async def pre_request_data(self, req_data: Dict) -> Dict:
"""
Sign the request parameters before sending the request.
Requires the wbi_img_urls value from localStorage, which looks like:
https://i0.hdslb.com/bfs/wbi/7cd084941338484aae1ad9425b84077c.png-https://i0.hdslb.com/bfs/wbi/4932caff0ff746eab6f01bf08b70ac45.png
:param req_data:
:return:
"""
img_key, sub_key = await self.get_wbi_keys()  # get_wbi_keys is async and must be awaited
return BilibiliSign(img_key, sub_key).sign(req_data)

async def get_wbi_keys(self) -> tuple[str, str]:
"""
Fetch the latest img_key and sub_key
:return:
"""
local_storage = await self.playwright_page.evaluate("() => window.localStorage")
wbi_img_urls = local_storage.get("wbi_img_urls", "")
# guard the unpack: an empty wbi_img_urls would make split("-") return a single element
img_url, sub_url = "", ""
if "-" in wbi_img_urls:
img_url, sub_url = wbi_img_urls.split("-", 1)
if not img_url or not sub_url:
resp = await self.request(method="GET", url=self._host + "/x/web-interface/nav")
img_url: str = resp['wbi_img']['img_url']
sub_url: str = resp['wbi_img']['sub_url']
img_key = img_url.rsplit('/', 1)[1].split('.')[0]
sub_key = sub_url.rsplit('/', 1)[1].split('.')[0]
return img_key, sub_key

async def get(self, uri: str, params=None) -> Dict:
final_uri = uri
if isinstance(params, dict):
params = await self.pre_request_data(params)  # sign the query params (async call)
final_uri = (f"{uri}?"
f"{urlencode(params)}")
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers)

async def post(self, uri: str, data: dict) -> Dict:
data = await self.pre_request_data(data)  # pre_request_data is async and must be awaited
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}",
data=json_str, headers=self.headers)

async def pong(self) -> bool:
"""get a note to check if login state is ok"""
utils.logger.info("Begin pong kuaishou...")
ping_flag = False
try:
pass
except Exception as e:
utils.logger.error(f"Pong kuaishou failed: {e}, and try to login again...")
ping_flag = False
return ping_flag

async def update_cookies(self, browser_context: BrowserContext):
cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.headers["Cookie"] = cookie_str
self.cookie_dict = cookie_dict

async def search_info_by_keyword(self, keyword: str, pcursor: str):
"""
bilibili web search api (placeholder, not implemented yet)
:param keyword: search keyword
:param pcursor: cursor of the last fetched page
:return:
"""
post_data = {
}
return await self.post("", post_data)

async def get_video_info(self, photo_id: str) -> Dict:
"""
bilibili video detail api (placeholder, not implemented yet)
:param photo_id:
:return:
"""
post_data = {
}
return await self.post("", post_data)

async def get_video_comments(self, photo_id: str, pcursor: str = "") -> Dict:
"""get video comments
:param photo_id: photo id you want to fetch
:param pcursor: last you get pcursor, defaults to ""
:return:
"""
post_data = {
}
return await self.post("", post_data)

async def get_video_all_comments(self, photo_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False,
callback: Optional[Callable] = None, ):
"""
get video all comments include sub comments
:param photo_id:
:param crawl_interval:
:param is_fetch_sub_comments:
:param callback:
:return:
"""

result = []
pcursor = ""
while pcursor != "no_more":
comments_res = await self.get_video_comments(photo_id, pcursor)
vision_comment_list = comments_res.get("visionCommentList", {})
pcursor = vision_comment_list.get("pcursor", "")
comments = vision_comment_list.get("rootComments", [])

if callback:  # invoke the callback if one was provided
await callback(photo_id, comments)

await asyncio.sleep(crawl_interval)
if not is_fetch_sub_comments:
result.extend(comments)
continue
# todo handle get sub comments
return result
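To make the key-derivation step concrete, here is a small standalone sketch of how get_wbi_keys() reduces the two wbi_img URLs to img_key / sub_key, using the example URLs from the pre_request_data docstring above:

img_url = "https://i0.hdslb.com/bfs/wbi/7cd084941338484aae1ad9425b84077c.png"
sub_url = "https://i0.hdslb.com/bfs/wbi/4932caff0ff746eab6f01bf08b70ac45.png"

# basename of the URL path, with the ".png" extension stripped
img_key = img_url.rsplit('/', 1)[1].split('.')[0]
sub_key = sub_url.rsplit('/', 1)[1].split('.')[0]

print(img_key)  # 7cd084941338484aae1ad9425b84077c
print(sub_key)  # 4932caff0ff746eab6f01bf08b70ac45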
51 changes: 51 additions & 0 deletions media_platform/bilibili/core.py
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 18:44
# @Desc : bilibili crawler

import asyncio
import os
import random
import time
from asyncio import Task
from typing import Dict, List, Optional, Tuple

from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)

import config
from base.base_crawler import AbstractCrawler
from proxy.proxy_account_pool import AccountPool
from tools import utils
from var import comment_tasks_var, crawler_type_var

from .client import BilibiliClient
from .exception import DataFetchError
from .login import BilibiliLogin


class BilibiliCrawler(AbstractCrawler):
platform: str
login_type: str
crawler_type: str
context_page: Page
bili_client: BilibiliClient
account_pool: AccountPool
browser_context: BrowserContext

def __init__(self):
self.index_url = "https://www.bilibili.com"
self.user_agent = utils.get_user_agent()

def init_config(self, platform: str, login_type: str, account_pool: AccountPool, crawler_type: str):
self.platform = platform
self.login_type = login_type
self.account_pool = account_pool
self.crawler_type = crawler_type

async def start(self):
pass

async def search(self):
pass
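start() and search() are still empty in this commit. A hedged sketch of how the crawler is presumably meant to be driven once they are filled in, mirroring how main.py wires up the other platforms (treat the create_account_pool call as an assumption about the proxy helper):

import asyncio

from media_platform.bilibili import BilibiliCrawler
from proxy import proxy_account_pool


async def demo():
    pool = proxy_account_pool.create_account_pool()  # assumption: same helper main.py uses
    crawler = BilibiliCrawler()
    crawler.init_config(platform="bili", login_type="qrcode", account_pool=pool, crawler_type="search")
    await crawler.start()  # a no-op in this commit; the browser/login/search flow comes later


asyncio.run(demo())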
14 changes: 14 additions & 0 deletions media_platform/bilibili/exception.py
@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 18:44
# @Desc :

from httpx import RequestError


class DataFetchError(RequestError):
"""something error when fetch"""


class IPBlockError(RequestError):
"""fetch so fast that the server block us ip"""
71 changes: 71 additions & 0 deletions media_platform/bilibili/help.py
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 23:26
# @Desc : bilibili request parameter signing (WBI)
# Reverse-engineering reference: https://socialsisteryi.github.io/bilibili-API-collect/docs/misc/sign/wbi.html#wbi%E7%AD%BE%E5%90%8D%E7%AE%97%E6%B3%95
import urllib.parse
from hashlib import md5
from typing import Dict

from tools import utils


class BilibiliSign:
def __init__(self, img_key: str, sub_key: str):
self.img_key = img_key
self.sub_key = sub_key
self.map_table = [
46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49,
33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40,
61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11,
36, 20, 34, 44, 52
]

def get_salt(self) -> str:
"""
Build the salted (mixin) key from img_key + sub_key
:return:
"""
salt = ""
mixin_key = self.img_key + self.sub_key
for mt in self.map_table:
salt += mixin_key[mt]
return salt[:32]

def sign(self, req_data: Dict) -> Dict:
"""
Add the current timestamp (wts) to the request params and sort the params by key,
then url-encode them, append the salt, and md5 the result to produce the w_rid signature
:param req_data:
:return:
"""
current_ts = utils.get_unix_timestamp()
req_data.update({"wts": current_ts})
req_data = dict(sorted(req_data.items()))
req_data = {
# filter the characters "!'()*" out of each value
k: ''.join(filter(lambda ch: ch not in "!'()*", str(v)))
for k, v
in req_data.items()
}
query = urllib.parse.urlencode(req_data)
salt = self.get_salt()
wbi_sign = md5((query + salt).encode()).hexdigest()  # compute w_rid
req_data['w_rid'] = wbi_sign
# print(urllib.parse.urlencode(req_data))
return req_data


if __name__ == '__main__':
_img_key = "7cd084941338484aae1ad9425b84077c"
_sub_key = "4932caff0ff746eab6f01bf08b70ac45"
_search_url = "category_id=&search_type=video&ad_resource=5654&__refresh__=true&_extra=&context=&page=1&page_size=42&order=click&from_source=&from_spmid=333.337&platform=pc&highlight=1&single_column=0&keyword=python&qv_id=OQ8f2qtgYdBV1UoEnqXUNUl8LEDAdzsD&source_tag=3&gaia_vtoken=&dynamic_offset=0&web_location=1430654"
_req_data = dict()
for params in _search_url.split("&"):
key, value = params.split("=", 1)  # split on the first "=" only, in case a value contains one
_req_data[key] = value
print("pre req_data", _req_data)
_req_data = BilibiliSign(img_key=_img_key, sub_key=_sub_key).sign(req_data=_req_data)
print(_req_data)
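A quick self-check of the signing flow, built on the same demo keys as the __main__ block above (only the shape of the output is asserted, not a specific w_rid value):

from media_platform.bilibili.help import BilibiliSign

_img_key = "7cd084941338484aae1ad9425b84077c"
_sub_key = "4932caff0ff746eab6f01bf08b70ac45"

sign = BilibiliSign(_img_key, _sub_key)
salt = sign.get_salt()
assert len(salt) == 32  # the 64-char permuted key is truncated to its first 32 chars

signed = sign.sign({"keyword": "python", "page": 1})
assert "wts" in signed and "w_rid" in signed  # timestamp and signature were added
assert len(signed["w_rid"]) == 32  # md5 hexdigest
print(signed)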
19 changes: 19 additions & 0 deletions media_platform/bilibili/login.py
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Author : [email protected]
# @Time : 2023/12/2 18:44
# @Desc :
from base.base_crawler import AbstractLogin


class BilibiliLogin(AbstractLogin):
async def begin(self):
pass

async def login_by_qrcode(self):
pass

async def login_by_mobile(self):
pass

async def login_by_cookies(self):
pass
6 changes: 5 additions & 1 deletion tools/time_util.py
@@ -8,7 +8,7 @@

def get_current_timestamp() -> int:
"""
-Get the current timestamp: 1701493264496
+Get the current timestamp in milliseconds (13 digits): 1701493264496
:return:
"""
return int(time.time() * 1000)
@@ -65,3 +65,7 @@ def get_unix_time_from_time_str(time_str):
except Exception as e:
return 0
pass


+def get_unix_timestamp():
+"""Get the current unix timestamp in seconds (10 digits)."""
+return int(time.time())
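The two helpers in this file differ only in resolution; a small sketch contrasting them (the length assertions hold for current dates):

import time


def get_current_timestamp() -> int:
    return int(time.time() * 1000)  # 13-digit, milliseconds


def get_unix_timestamp() -> int:
    return int(time.time())  # 10-digit, seconds


ms = get_current_timestamp()
s = get_unix_timestamp()
assert len(str(ms)) == 13 and len(str(s)) == 10
assert s - ms // 1000 in (0, 1)  # same instant, up to a possible second-boundary crossing
print(ms, s)  # e.g. 1701493264496 1701493264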
