Skip to content

Commit

Permalink
feat: 代理IP缓存到redis中
Browse files Browse the repository at this point in the history
  • Loading branch information
NanmiCoder committed Dec 6, 2023
1 parent f71d086 commit c530bd4
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,4 @@ cython_debug/
/temp_image/
/browser_data/
/data/
/cache
6 changes: 3 additions & 3 deletions media_platform/bilibili/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
# @Desc : bilibili 请求客户端
import asyncio
import json
from typing import Any, Callable, Dict, Optional, List, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import urlencode

import httpx
from playwright.async_api import BrowserContext, Page

from tools import utils

from .help import BilibiliSign
from .exception import DataFetchError
from .field import SearchOrderType, CommentOrderType
from .field import CommentOrderType, SearchOrderType
from .help import BilibiliSign


class BilibiliClient:
Expand Down
1 change: 0 additions & 1 deletion media_platform/bilibili/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from tools import utils
from var import comment_tasks_var, crawler_type_var


from .client import BilibiliClient
from .field import SearchOrderType
from .login import BilibiliLogin
Expand Down
1 change: 0 additions & 1 deletion media_platform/bilibili/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import config
from base.base_crawler import AbstractLogin
from tools import utils
from base.base_crawler import AbstractLogin


class BilibiliLogin(AbstractLogin):
Expand Down
2 changes: 1 addition & 1 deletion media_platform/douyin/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import copy
import urllib.parse
from typing import Any, Callable, Dict, Optional, List
from typing import Any, Callable, Dict, List, Optional

import execjs
import httpx
Expand Down
6 changes: 4 additions & 2 deletions proxy/proxy_ip_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# @Author : [email protected]
# @Time : 2023/12/2 13:45
# @Desc : ip代理池实现
import json
import pathlib
import random
from typing import List
from typing import Dict, List

import httpx
from tenacity import retry, stop_after_attempt, wait_fixed
Expand All @@ -22,7 +24,7 @@ def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None:

async def load_proxies(self) -> None:
"""
从 HTTP 代理商获取 IP 列表
解析
:return:
"""
self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)
Expand Down
118 changes: 79 additions & 39 deletions proxy/proxy_ip_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
# @Url : 现在实现了极速HTTP的接口,官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang

import asyncio
import json
import os
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from urllib.parse import urlencode

import httpx
import redis
from pydantic import BaseModel, Field

import config
from tools import utils


Expand Down Expand Up @@ -41,71 +44,108 @@ async def get_proxies(self, num: int) -> List[Dict]:
pass


class RedisDbIpCache:
def __init__(self):
self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)

def set_ip(self, ip_key: str, ip_value_info: str, ex: int):
"""
设置IP并带有过期时间,到期之后由 redis 负责删除
:param ip_key:
:param ip_value_info:
:param ex:
:return:
"""
self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex)

def load_all_ip(self, proxy_brand_name: str) -> List[IpInfoModel]:
"""
从 redis 中加载所有还未过期的 IP 信息
:param proxy_brand_name: 代理商名称
:return:
"""
all_ip_list: List[IpInfoModel] = []
all_ip_keys: List[str] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*")
try:
for ip_key in all_ip_keys:
ip_value = self.redis_client.get(ip_key)
if not ip_value:
continue
all_ip_list.append(IpInfoModel(**json.loads(ip_value)))
except Exception as e:
utils.logger.error("[RedisDbIpCache.load_all_ip] get ip err from redis db", e)
return all_ip_list


class JiSuHttpProxy(ProxyProvider):
def __init__(self, exract_type: str, key: str, crypto: str, res_type: str, protocol: int, time: int):
def __init__(self, key: str, crypto: str, time_validity_period: int):
"""
极速HTTP 代理IP实现
官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
:param exract_type: 提取方式
:param key: 提取key值 (到上面链接的官网去注册后获取)
:param crypto: 加密签名 (到上面链接的官网去注册后获取)
:param res_type: 返回的数据格式:TXT、JSON
:param protocol: IP协议:1:HTTP、2:HTTPS、3:SOCKS5
:param time: IP使用时长,支持3、5、10、15、30分钟时效
:param key: 提取key值 (去官网注册后获取)
:param crypto: 加密签名 (去官网注册后获取)
"""
self.exract_type = exract_type
self.proxy_brand_name = "JISUHTTP"
self.api_path = "https://api.jisuhttp.com"
self.params = {
"key": key,
"crypto": crypto,
"type": res_type,
"port": protocol,
"time": time,
"time": time_validity_period, # IP使用时长,支持3、5、10、15、30分钟时效
"type": "json", # 数据结果为json
"port": "2", # IP协议:1:HTTP、2:HTTPS、3:SOCKS5
"pw": "1", # 是否使用账密验证, 1:是,0:否,否表示白名单验证;默认为0
"se": "1", # 返回JSON格式时是否显示IP过期时间, 1:显示,0:不显示;默认为0
}
self.ip_cache = RedisDbIpCache()

async def get_proxies(self, num: int) -> List[IpInfoModel]:
"""
:param num:
:return:
"""
if self.exract_type == "API":
uri = "/fetchips"
self.params.update({"num": num})
ip_infos = []
async with httpx.AsyncClient() as client:
url = self.api_path + uri + '?' + urlencode(self.params)
utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}")
response = await client.get(url, headers={"User-Agent": "MediaCrawler"})
res_dict: Dict = response.json()
if res_dict.get("code") == 0:
data: List[Dict] = res_dict.get("data")
for ip_item in data:
ip_info_model = IpInfoModel(
ip=ip_item.get("ip"),
port=ip_item.get("port"),
user=ip_item.get("user"),
password=ip_item.get("pass"),
expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire"))
)
ip_infos.append(ip_info_model)
else:
raise IpGetError(res_dict.get("msg", "unkown err"))
return ip_infos
else:
pass

# 优先从缓存中拿 IP
ip_cache_list = self.ip_cache.load_all_ip(proxy_brand_name=self.proxy_brand_name)
if len(ip_cache_list) >= num:
return ip_cache_list[:num]

# 如果缓存中的数量不够,从IP代理商获取补上,再存入缓存中
need_get_count = num - len(ip_cache_list)
self.params.update({"num": need_get_count})
ip_infos = []
async with httpx.AsyncClient() as client:
url = self.api_path + "/fetchips" + '?' + urlencode(self.params)
utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}")
response = await client.get(url, headers={
"User-Agent": "MediaCrawler https://github.com/NanmiCoder/MediaCrawler"})
res_dict: Dict = response.json()
if res_dict.get("code") == 0:
data: List[Dict] = res_dict.get("data")
current_ts = utils.get_unix_timestamp()
for ip_item in data:
ip_info_model = IpInfoModel(
ip=ip_item.get("ip"),
port=ip_item.get("port"),
user=ip_item.get("user"),
password=ip_item.get("pass"),
expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire"))
)
ip_key = f"JISUHTTP_{ip_info_model.ip}_{ip_info_model.port}_{ip_info_model.user}_{ip_info_model.password}"
ip_value = ip_info_model.model_dump_json()
ip_infos.append(ip_info_model)
self.ip_cache.set_ip(ip_key, ip_value, ex=ip_info_model.expired_time_ts - current_ts)
else:
raise IpGetError(res_dict.get("msg", "unkown err"))
return ip_cache_list + ip_infos


IpProxy = JiSuHttpProxy(
key=os.getenv("jisu_key", ""), # 通过环境变量的方式获取极速HTTPIP提取key值
crypto=os.getenv("jisu_crypto", ""), # 通过环境变量的方式获取极速HTTPIP提取加密签名
res_type="json",
protocol=2,
time=30
time_validity_period=30 # 30分钟(最长时效)
)

if __name__ == '__main__':
# 每一次提取都要消耗 IP 数量,谨慎使用
_ip_infos = asyncio.run(IpProxy.get_proxies(1))
print(_ip_infos)
7 changes: 4 additions & 3 deletions test/test_proxy_ip_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

class TestIpPool(IsolatedAsyncioTestCase):
async def test_ip_pool(self):
pool = await create_ip_pool(ip_pool_count=30, enable_validate_ip=False)
for i in range(30):
pool = await create_ip_pool(ip_pool_count=3, enable_validate_ip=True)
for i in range(3):
ip_proxy_info: IpInfoModel = await pool.get_proxy()
self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")
print(ip_proxy_info)
self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")

0 comments on commit c530bd4

Please sign in to comment.