forked from OpenEthan/SMSBoom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspider-api.py
executable file
·224 lines (189 loc) · 6.49 KB
/
spider-api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/env python3
# coding=utf-8
# Crawl the API endpoints exposed by SMS-bombing websites
from loguru import logger
import httpx
import requests
import re
from utils import Sql
import queue
import pathlib
import threading
import sys
import json
from prettytable import PrettyTable
import click
import urllib3
# Silence urllib3 warnings for the whole process.
# NOTE(review): presumably this targets the InsecureRequestWarning triggered by
# the verify=False requests made further down in this file - confirm.
urllib3.disable_warnings()
# logger config
# Drop the handlers loguru already has, then log to stdout with a coloured
# "<time> - <level> - <message>" layout.
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD at HH:mm:ss}</green> - <level>{level}</level> - <level>{message}</level>",
colorize=True,
backtrace=True
)
# Directory that contains this script; hz-web.json is looked up relative to it.
path = pathlib.Path(__file__).parent
# Default HTTP headers (a desktop Chrome User-Agent) sent with the requests
# issued by this module.
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.9 Safari/537.36",
}
class SMS(object):
# 默认的请求密钥
default_phone = "15019682928"
key_default = f"?hm={default_phone}&ok="
def __init__(self, website, key) -> None:
self.url = website
self.header = header
if key == "":
self.key = self.key_default
self.api_queue = queue.Queue()
self.db = Sql()
self.lock = threading.Lock()
self.ok_api = 0
def get_sms_api(self):
'''请求短信轰炸平台'''
with httpx.Client(verify=False) as ses:
ses.get(self.url, headers=self.header)
resp = ses.get(f"{self.url}{self.key}", headers=self.header)
pat = re.compile(r"<img src='(.*?)' alt")
apis = pat.findall(resp.text)
assert not apis == [], "未找到任何接口!"
# print(resp.text)
logger.info("获取到的原始接口总数:%s" % (len(apis)))
for api in apis:
# 三重校验网址
# 排除接口中没有电话号码的网址
if self.default_phone not in api:
continue
# 去除空白字符并替换默认手机号
api = api.strip().replace(" ", "").replace(
self.default_phone, "[phone]")
# 校验网址开头
if not (api.startswith("https://") or api.startswith("http://")):
continue
self.api_queue.put(api)
logger.info("Put到队列的接口总数:%s" % (self.api_queue.qsize()))
self.size = self.api_queue.qsize()
def check_theads(self):
'''多线程检查可用性'''
while not self.api_queue.empty():
api = self.api_queue.get()
try:
with requests.get(api.replace("[phone]", self.default_phone), headers=self.header, timeout=8, verify=False) as resp:
if resp.status_code == 200:
with self.lock:
self.db.update(api)
except Exception as e:
pass
finally:
self.api_queue.task_done()
def main(self):
self.get_sms_api()
# 在此设置线程数 int 类型
threads_count = 254
threads = [
threading.Thread(target=self.check_theads,
name=f"{i}", daemon=True)
for i in range(1, threads_count+1)
]
for thread in threads:
thread.start()
logger.info("多线程校验进行中......(可能耗时比较长)")
from tqdm import tqdm
import time
with tqdm(total=self.size) as pbar:
while not self.api_queue.empty():
pbar.update(self.size-self.api_queue.qsize())
self.size = self.api_queue.qsize()
time.sleep(0.5)
self.api_queue.join()
logger.info(f"总接口数目(去重后):{len(self.db.select())}")
def test_api_web(url: str) -> tuple:
    """Check whether a website is reachable and has a page title.

    NOTE: despite the ``test_`` prefix this is a runtime helper, not a unit test.

    :param url: address of the website to probe; None or "" is skipped
    :return: ``(title, url)`` when the page was fetched and contains a
        ``<title>`` element; None when the URL is empty, the request failed
        or no title was found
    """
    if not url:
        return None
    with httpx.Client(headers=header, verify=False) as client:
        try:
            resp = client.get(url=url).text
        except (httpx.HTTPError, httpx.InvalidURL) as why:
            # httpx.InvalidURL does not derive from httpx.HTTPError, so it is
            # named explicitly; otherwise one malformed URL aborts the caller.
            logger.error(f"{url} 请求错误! {why}")
            return None
    title = re.findall('<title>(.*?)</title>', resp)
    if not title:
        return None
    logger.info(f"{url} title:{title[0]}")
    return (title[0], url)
def load_api_web():
    """Load the websites listed in hz-web.json and keep the ones that respond.

    Every entry's ``url`` is probed with ``test_api_web``. The working sites
    are shown in a table and, if the user answers ``Y``, written back to
    hz-web.json (replacing its previous content).

    :return: list of ``{"url", "key", "title"}`` dicts for the working sites;
        an empty list when the file is missing or is not valid JSON
    """
    json_path = pathlib.Path(path, 'hz-web.json')
    if not json_path.exists():
        logger.error(f"hz-web.json not exists in {str(json_path)}!")
        # Return an empty list rather than None: the caller iterates the result.
        return []
    try:
        webs = json.loads(json_path.read_text(encoding="utf8"))
    except json.decoder.JSONDecodeError as why:
        logger.error(f"json syctax error! {why}")
        return []

    table = PrettyTable(["标题", "链接"])
    ok_web = []
    for web in webs:
        # Skip malformed entries instead of raising on web['url'].
        if not isinstance(web, dict):
            continue
        result = test_api_web(web.get('url'))
        if result:
            table.add_row([result[0], result[1]])
            ok_web.append(
                {"url": result[1], "key": web.get('key'), "title": result[0]})
    logger.success(f"有效的轰炸网站:\n{table}")

    if input(">>是否写入 hz-web.json?(Y/n)") == "Y":
        try:
            # Serialize before touching the file, so a serialization failure
            # cannot leave hz-web.json truncated.
            payload = json.dumps(ok_web, ensure_ascii=False)
            json_path.write_text(payload, encoding="utf8")
            logger.success("save hz-web.json success!")
        except Exception as why:
            logger.error(f"write hz-web.json error {why}")
    return ok_web
@click.group()
def cli():
    # Root command group; the sub-commands are attached to it at the bottom of
    # the file. Intentionally no docstring: click would display it as the
    # group's help text, which would change the program's output.
    return None
@click.command()
def spider_all():
    """
    根据目录下的 hz-web.json 文件更新接口
    """
    # The docstring above is shown by click as this command's help text, so it
    # is left untouched. English summary: crawl every working website listed
    # in hz-web.json and update the stored API list.
    #
    # `or []` guards against load_api_web() returning None (missing or
    # invalid hz-web.json), which would otherwise raise TypeError here.
    websites = load_api_web() or []
    for website in websites:
        site_url = website['url']
        logger.info(f"正在爬取:{site_url}")
        try:
            # A missing or None key is passed as "" so SMS uses its default key.
            SMS(website=site_url, key=website.get('key') or "").main()
        except Exception as why:
            # One failing site must not stop the remaining ones.
            logger.critical(f"爬取:{site_url} 出错:{why}")
@click.command()
@click.option('--url', help='轰炸网站的网址,结尾需要带/', prompt=True)
@click.option('--key', help='网址携带的参数(可选)', default="")
def spider_one(url, key):
    """爬取单个网址."""
    # The docstring and option help strings above are click help text and are
    # kept verbatim. English summary: crawl a single website given by --url,
    # with the optional query string given by --key.
    try:
        crawler = SMS(website=url, key=key)
        crawler.main()
    except Exception as err:
        logger.critical(f"爬取:{url} 出错:{err}")
@click.command()
@logger.catch
def save_api():
    """保存api到 GETAPI.json 文件"""
    # The docstring above is click help text and is kept verbatim. English
    # summary: dump every record returned by Sql.select() to GETAPI.json in
    # the current working directory.
    db = Sql()
    api_lst = list(db.select())
    # ensure_ascii=False writes non-ASCII characters as-is, so the file
    # encoding is pinned to UTF-8 instead of depending on the platform locale.
    with open("GETAPI.json", mode="w", encoding="utf8") as j:
        json.dump(fp=j, obj=api_lst, ensure_ascii=False)
    logger.success("写入到 GETAPI.json 成功!")
# Attach every sub-command to the root group, in the original order.
for _command in (spider_all, spider_one, save_api):
    cli.add_command(_command)

# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    cli()