Skip to content

Commit

Permalink
feat: 微博支持评论 & 指定帖子
Browse files Browse the repository at this point in the history
  • Loading branch information
NanmiCoder committed Dec 24, 2023
1 parent b1441ab commit eee8162
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 29 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
| 小红书 ||||||||||
| 抖音 ||||||||||
| 快手 ||||||||||
| B 站 ||||| |||||
| 微博 ||||| |||||
| B 站 ||||||||||
| 微博 ||||| |||||


## 使用方法
Expand Down
8 changes: 8 additions & 0 deletions base/base_crawler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from abc import ABC, abstractmethod
from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType


class AbstractCrawler(ABC):
Expand All @@ -14,6 +17,11 @@ async def start(self):
async def search(self):
pass

@abstractmethod
async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str],
headless: bool = True) -> BrowserContext:
pass


class AbstractLogin(ABC):
@abstractmethod
Expand Down
10 changes: 8 additions & 2 deletions config/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
CRAWLER_MAX_NOTES_COUNT = 20

# 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 10
MAX_CONCURRENCY_NUM = 4

# 每个视频/帖子抓取评论最大条数 (为0则不限制)
MAX_COMMENTS_PER_POST = 10
MAX_COMMENTS_PER_POST = 0

# 评论关键词筛选(只会留下包含关键词的评论,为空不限制)
COMMENT_KEYWORDS = [
Expand Down Expand Up @@ -63,3 +63,9 @@
"BV14Q4y1n7jz",
# ........................
]

# 指定微博平台需要爬取的帖子列表
WEIBO_SPECIFIED_ID_LIST = [
"4982041758140155",
# ........................
]
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.xhs import XiaoHongShuCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler


class CrawlerFactory:
Expand Down
2 changes: 1 addition & 1 deletion media_platform/weibo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
# @Author : [email protected]
# @Time : 2023/12/23 15:40
# @Desc :
from .client import WeiboClient
from .core import WeiboCrawler
from .login import WeiboLogin
from .client import WeiboClient
84 changes: 82 additions & 2 deletions media_platform/weibo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
# @Desc : 微博爬虫 API 请求 client

import asyncio
import copy
import json
import re
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlencode

Expand Down Expand Up @@ -47,12 +49,15 @@ async def request(self, method, url, **kwargs) -> Any:
else:
return data.get("data", {})

async def get(self, uri: str, params=None) -> Dict:
async def get(self, uri: str, params=None, headers=None) -> Dict:
final_uri = uri
if isinstance(params, dict):
final_uri = (f"{uri}?"
f"{urlencode(params)}")
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=self.headers)

if headers is None:
headers = self.headers
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)

async def post(self, uri: str, data: dict) -> Dict:
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
Expand Down Expand Up @@ -96,3 +101,78 @@ async def get_note_by_keyword(
"page": page,
}
return await self.get(uri, params)

async def get_note_comments(self, mid_id: str, max_id: int) -> Dict:
"""get notes comments
:param mid_id: 微博ID
:param max_id: 分页参数ID
:return:
"""
uri = "/comments/hotflow"
params = {
"id": mid_id,
"mid": mid_id,
"max_id_type": 0,
}
if max_id > 0:
params.update({"max_id": max_id})

referer_url = f"https://m.weibo.cn/detail/{mid_id}"
headers = copy.copy(self.headers)
headers["Referer"] = referer_url

return await self.get(uri, params, headers=headers)

async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False,
callback: Optional[Callable] = None, ):
"""
get note all comments include sub comments
:param note_id:
:param crawl_interval:
:param is_fetch_sub_comments:
:param callback:
:return:
"""

result = []
is_end = False
max_id = -1
while not is_end:
comments_res = await self.get_note_comments(note_id, max_id)
max_id: int = comments_res.get("max_id")
comment_list: List[Dict] = comments_res.get("data", [])
is_end = max_id == 0
if callback: # 如果有回调函数,就执行回调函数
await callback(note_id, comment_list)
await asyncio.sleep(crawl_interval)
if not is_fetch_sub_comments:
result.extend(comment_list)
continue
# todo handle get sub comments
return result

async def get_note_info_by_id(self, note_id: str) -> Dict:
"""
根据帖子ID获取详情
:param note_id:
:return:
"""
url = f"{self._host}/detail/{note_id}"
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request(
"GET", url, timeout=self.timeout, headers=self.headers
)
if response.status_code != 200:
raise DataFetchError(f"get weibo detail err: {response.text}")
match = re.search(r'var \$render_data = (\[.*?\])\[0\]', response.text, re.DOTALL)
if match:
render_data_json = match.group(1)
render_data_dict = json.loads(render_data_json)
note_detail = render_data_dict[0].get("status")
note_item = {
"mblog": note_detail
}
return note_item
else:
utils.logger.info(f"[WeiboClient.get_note_info_by_id] 未找到$render_data的值")
return dict()
100 changes: 96 additions & 4 deletions media_platform/weibo/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

from .client import WeiboClient
from .exception import DataFetchError
from .login import WeiboLogin
from .field import SearchType
from .help import filter_search_result_card
from .login import WeiboLogin


class WeiboCrawler(AbstractCrawler):
Expand All @@ -38,7 +38,7 @@ class WeiboCrawler(AbstractCrawler):

def __init__(self):
self.index_url = "https://m.weibo.cn"
self.user_agent = utils.get_user_agent()
self.user_agent = utils.get_mobile_user_agent()

def init_config(self, platform: str, login_type: str, crawler_type: str):
self.platform = platform
Expand Down Expand Up @@ -85,7 +85,7 @@ async def start(self):
await self.search()
elif self.crawler_type == "detail":
# Get the information and comments of the specified post
pass
await self.get_specified_notes()
else:
pass
utils.logger.info("[WeiboCrawler.start] Bilibili Crawler finished ...")
Expand All @@ -109,12 +109,104 @@ async def search(self):
note_id_list: List[str] = []
note_list = filter_search_result_card(search_res.get("cards"))
for note_item in note_list:
if note_item :
if note_item:
mblog: Dict = note_item.get("mblog")
note_id_list.append(mblog.get("id"))
await weibo.update_weibo_note(note_item)

page += 1
await self.batch_get_notes_comments(note_id_list)

async def get_specified_notes(self):
"""
get specified notes info
:return:
"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_note_info_task(note_id=note_id, semaphore=semaphore) for note_id in
config.WEIBO_SPECIFIED_ID_LIST
]
video_details = await asyncio.gather(*task_list)
for note_item in video_details:
if note_item:
await weibo.update_weibo_note(note_item)
await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)

async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
"""
Get note detail task
:param note_id:
:param semaphore:
:return:
"""
async with semaphore:
try:
result = await self.wb_client.get_note_info_by_id(note_id)
return result
except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}")
return None
except KeyError as ex:
utils.logger.error(
f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}")
return None

async def batch_get_notes_comments(self, note_id_list: List[str]):
"""
batch get notes comments
:param note_id_list:
:return:
"""
utils.logger.info(f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
for note_id in note_id_list:
task = asyncio.create_task(self.get_note_comments(note_id, semaphore), name=note_id)
task_list.append(task)
await asyncio.gather(*task_list)

async def get_note_comments(self, note_id: str, semaphore: asyncio.Semaphore):
"""
get comment for note id
:param note_id:
:param semaphore:
:return:
"""
async with semaphore:
try:
utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...")

# Read keyword and quantity from config
keywords = config.COMMENT_KEYWORDS
max_comments = config.MAX_COMMENTS_PER_POST

# Download comments
all_comments = await self.wb_client.get_note_all_comments(
note_id=note_id,
crawl_interval=random.random(),
)

# Filter comments by keyword
if keywords:
filtered_comments = [
comment for comment in all_comments if
any(keyword in comment["content"]["message"] for keyword in keywords)
]
else:
filtered_comments = all_comments

# Limit the number of comments
if max_comments > 0:
filtered_comments = filtered_comments[:max_comments]

# Update weibo note comments
await weibo.batch_update_weibo_note_comments(note_id, filtered_comments)

except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}")
except Exception as e:
utils.logger.error(f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}")

async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient:
"""Create xhs client"""
Expand Down
2 changes: 1 addition & 1 deletion media_platform/weibo/help.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# @Time : 2023/12/24 17:37
# @Desc :

from typing import List, Dict
from typing import Dict, List


def filter_search_result_card(card_list: List[Dict]) -> List[Dict]:
Expand Down
2 changes: 1 addition & 1 deletion models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .bilibili import *
from .douyin import *
from .kuaishou import *
from .weibo import *
from .xiaohongshu import *
from .weibo import *
27 changes: 13 additions & 14 deletions models/weibo.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ async def update_weibo_note(note_item: Dict):
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_video] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...")
f"[models.weibo.update_weibo_note] weibo note id:{note_id}, title:{local_db_item.get('content')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboNote.filter(note_id=note_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_video_pydantic(**local_db_item)
weibo_video_pydantic.model_validate(weibo_data)
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_note_pydantic(**local_db_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.create(**weibo_data.model_dump())
else:
weibo_video_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
exclude=('id', 'add_ts'))
weibo_data = weibo_video_pydantic(**local_db_item)
weibo_video_pydantic.model_validate(weibo_data)
weibo_data = weibo_note_pydantic(**local_db_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
else:
# Below is a simple way to save it in CSV format.
Expand All @@ -116,23 +116,22 @@ async def update_weibo_note(note_item: Dict):
writer.writerow(local_db_item.values())


async def batch_update_weibo_video_comments(video_id: str, comments: List[Dict]):
async def batch_update_weibo_note_comments(note_id: str, comments: List[Dict]):
if not comments:
return
for comment_item in comments:
await update_weibo_video_comment(video_id, comment_item)
await update_weibo_note_comment(note_id, comment_item)


async def update_weibo_video_comment(note_id: str, comment_item: Dict):
async def update_weibo_note_comment(note_id: str, comment_item: Dict):
comment_id = str(comment_item.get("id"))
content: Dict = comment_item.get("text")
user_info: Dict = comment_item.get("member")
user_info: Dict = comment_item.get("user")
local_db_item = {
"comment_id": comment_id,
"create_time": utils.rfc2822_to_timestamp(comment_item.get("created_at")),
"create_date_time": str(utils.rfc2822_to_china_datetime(comment_item.get("created_at"))),
"note_id": note_id,
"content": content.get("message"),
"content": comment_item.get("text"),
"sub_comment_count": str(comment_item.get("total_number", 0)),
"comment_like_count": str(comment_item.get("like_count", 0)),
"last_modify_ts": utils.get_current_timestamp(),
Expand All @@ -146,7 +145,7 @@ async def update_weibo_video_comment(note_id: str, comment_item: Dict):
"avatar": user_info.get("profile_image_url", ""),
}
utils.logger.info(
f"[models.weibo.update_weibo_video_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
f"[models.weibo.update_weibo_note_comment] Weibo note comment: {comment_id}, content: {local_db_item.get('content','')[:24]} ...")
if config.IS_SAVED_DATABASED:
if not await WeiboComment.filter(comment_id=comment_id).exists():
local_db_item["add_ts"] = utils.get_current_timestamp()
Expand Down
Loading

0 comments on commit eee8162

Please sign in to comment.