Skip to content

Commit

Permalink
✨ feat: 异步百万并发校验 GETAPI 中的接口
Browse files Browse the repository at this point in the history
  • Loading branch information
WhaleFell committed May 12, 2022
1 parent b52dd61 commit 0f736b3
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 558 deletions.
Empty file added debug/__init__.py
Empty file.
Binary file modified debug/api.db
Binary file not shown.
451 changes: 0 additions & 451 deletions debug/test-1.py

This file was deleted.

104 changes: 104 additions & 0 deletions handle_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# encoding=utf8
# 维护 api.提供去重等功能
from pathlib import Path
import json
from threading import Lock
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import httpx
from httpx import Limits
import asyncio

from utils.sql import Sql
from utils.req import reqFunc, default_header
from utils.log import logger

path = Path(__file__).parent.absolute().joinpath("debug", "api.db")

sql = Sql(db_path=path)
sql.newTable()
lock = Lock()
q = Queue()


def read_url() -> str:
global q
with open("GETAPI.json", "r", encoding="utf8") as f:
data = json.load(fp=f)
for d in data:
if not (
(
d.startswith("https://") or
d.startswith("http://")
) and ("[phone]" in d)
):
# print(f"{d}淘汰")
continue
q.put(d)
logger.info(f"GETAPI接口总数:{q.qsize()}")
return q


def test():
while not q.empty():
i = q.get()
if reqFunc(i, "19820294268"):
with lock:
sql.update(i)


async def test2():
while not q.empty():
i = q.get()
_i = i.replace("[phone]", "19820294267")
async with httpx.AsyncClient(headers=default_header, timeout=100, limits=Limits(max_connections=1000, max_keepalive_connections=20), verify=False) as client:
try:
await client.get(_i)
# if r.status_code == 200:
sql.update(i)
# logger.info("更新")
except httpx.HTTPError as why:
if why is None:
logger.exception("未知的失败")
logger.error(f"请求失败{type(why)}{why} {i}")
except Exception as e:
logger.error("全局失败")
logger.exception(e)


async def asMain():
await asyncio.gather(

*(
test2()
for _ in range(150)
)

)


def save_api():
"""保存api到 GETAPI.json 文件"""
apis = sql.select()
api_lst = [
api
for api in apis
]
with open("GETAPI.json", mode="w", encoding="utf8") as j:
json.dump(fp=j, obj=api_lst, ensure_ascii=False)
logger.success("写入到 GETAPI.json 成功!")


def main():
read_url()
# with ThreadPoolExecutor(max_workers=1024) as pool:
# for _ in range(1024):
# pool.submit(test)
loop = asyncio.get_event_loop()
loop.run_until_complete(asMain())


if __name__ == "__main__":
main()
# read_url()
save_api()
41 changes: 0 additions & 41 deletions index.py

This file was deleted.

59 changes: 0 additions & 59 deletions utils.py

This file was deleted.

5 changes: 4 additions & 1 deletion utils/req.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from utils.models import API
from utils.log import logger


def reqAPI(api: API, client: httpx.Client) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
Expand All @@ -17,7 +18,7 @@ def reqAPI(api: API, client: httpx.Client) -> httpx.Response:
return resp


def reqFunc(api: Union[API, str], phone: tuple):
def reqFunc(api: Union[API, str], phone: Union[tuple, str]):
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
Expand All @@ -36,5 +37,7 @@ def reqFunc(api: Union[API, str], phone: tuple):
api = api.replace("[phone]", ph)
resp = client.get(url=api, headers=default_header)
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
14 changes: 8 additions & 6 deletions utils/sql.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
# encoding=utf8
# 读写sqlite数据库
from pathlib import Path
from utils.log import logger
import sqlite3


class Sql(object):
"""处理SQL数据"""

def __init__(self) -> None:
def __init__(self, db_path: Path) -> None:
'''初始化数据库'''
# 数据库路径
db_path = Path.cwd().joinpath("api.db")
# db_path = Path.cwd().joinpath("api.db")
# 连接数据库,不检查是否在同一个线程.
self.client = sqlite3.connect(
db_path, timeout=6, check_same_thread=False)
Expand All @@ -36,9 +38,10 @@ def update(self, url):
try:
self.cursor.execute(sql, (url,))
self.client.commit()
logger.success("插入成功!")
return True
except sqlite3.IntegrityError:
# print(f"{url} 数据重复!")
logger.error(f"数据重复!")
return False

def select(self) -> list:
Expand All @@ -54,10 +57,9 @@ def select(self) -> list:
urls.append(url[0])
return urls
except Exception as e:
print('读取出现错误!', e)
logger.error('读取出现错误!', e)

def __del__(self) -> None:
'''对象被删除时执行的函数'''
print(f"共改变{self.client.total_changes}条数据!,正在关闭数据库连接......")
logger.info(f"共改变{self.client.total_changes}条数据!,正在关闭数据库连接......")
self.client.close()

0 comments on commit 0f736b3

Please sign in to comment.