forked from OpenEthan/SMSBoom
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
220 additions
and
113 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
default_header = { | ||
"User-Agent": "Mozilla/5.0 (Linux; U; Android 10; zh-cn; Mi 10 Build/QKQ1.191117.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.147 Mobile Safari/537.36 XiaoMi/MiuiBrowser/13.5.40" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# encoding=utf8 | ||
# 日志模块 | ||
from loguru import logger | ||
import pathlib | ||
import sys | ||
import os | ||
|
||
# 终端日志输出格式 | ||
stdout_fmt = '{level.icon} <cyan>{time:HH:mm:ss,SSS}</cyan> ' \ | ||
'[<level>{level}</level>] ' \ | ||
'<cyan>{thread.name}</cyan> ' \ | ||
'<blue>{module}</blue>:<cyan>{line}</cyan> - ' \ | ||
'<level>{message}</level>' | ||
|
||
# 日志文件记录格式 | ||
# logfile_fmt = '<light-green>{time:YYYY-MM-DD HH:mm:ss,SSS}</light-green> ' \ | ||
# '[<level>{level: <5}</level>] ' \ | ||
# '<cyan>{process.name}({process.id})</cyan>:' \ | ||
# '<cyan>{thread.name: <10}({thread.id: <5})</cyan> | ' \ | ||
# '<blue>{module}</blue>.<blue>{function}</blue>:' \ | ||
# '<blue>{line}</blue> - <level>{message}</level>' | ||
|
||
logfile_fmt = '<light-green>{time:YYYY-MM-DD HH:mm:ss,SSS}</light-green> ' \ | ||
'[<level>{level}</level>] ' \ | ||
'<blue>{module}</blue>.<blue>{function}</blue>:' \ | ||
'<blue>{line}</blue> - <level>{message}</level>' | ||
|
||
log_pathDir = pathlib.Path(os.getcwd()).resolve().joinpath('logs') | ||
if not log_pathDir.is_dir(): | ||
log_pathDir.mkdir() | ||
log_path = log_pathDir.joinpath('run.log').resolve() | ||
|
||
logger.remove() | ||
|
||
if not os.environ.get('PYTHONIOENCODING'): # 设置编码 | ||
os.environ['PYTHONIOENCODING'] = 'utf-8' | ||
|
||
logger.add(sys.stderr, level='INFO', format=stdout_fmt, enqueue=True) | ||
# 输出到文件 | ||
# logger.add(log_path, level='DEBUG', format=logfile_fmt, | ||
# enqueue=True, encoding='utf-8') | ||
|
||
if __name__ == "__main__": | ||
logger.info("test") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# encoding=utf8 | ||
# 一些模型 | ||
from pydantic import BaseModel | ||
from typing import Union, Optional | ||
from datetime import datetime | ||
import json | ||
from utils import default_header | ||
|
||
class API(BaseModel): | ||
"""处理自定义 API 数据""" | ||
desc: str = "Default" | ||
url: str | ||
method: str = "GET" | ||
header: Optional[Union[str, dict]] = default_header | ||
data: Optional[Union[str, dict]] | ||
|
||
def replace_data(self, content: Union[str, dict], phone: str) -> str: | ||
# 统一转换成 str 再替换. ' -> " | ||
if phone: | ||
content = str(content).replace("[phone]", phone).replace( | ||
"[timestamp]", self.timestamp_new()).replace("'", '"') | ||
|
||
# 尝试 json 化 | ||
try: | ||
return json.loads(content.replace("'", '"')) | ||
except: | ||
return content | ||
|
||
def timestamp_new(self) -> str: | ||
"""返回整数字符串时间戳""" | ||
return str(int(datetime.now().timestamp())) | ||
|
||
def handle_API(self, phone: str=None): | ||
""" 传入手机号处理 API | ||
:param API: one API basemodel | ||
:return: API basemodel | ||
""" | ||
# 仅仅当传入 phone 参数时添加 Referer | ||
# fix: 这段代码很有问题....... | ||
if phone: | ||
# 进入的 header 是个字符串 | ||
if self.header == "": | ||
self.header = {} | ||
self.header['Referer'] = self.url # 增加 Referer | ||
|
||
self.header = self.replace_data(self.header, phone) | ||
if not self.header.get('Referer'): | ||
self.header['Referer'] = self.url # 增加 Referer | ||
|
||
self.data = self.replace_data(self.data, phone) | ||
self.url = self.replace_data(self.url, phone) | ||
# print(self) | ||
return self |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# encoding=utf8 | ||
# 请求的方法 | ||
import httpx | ||
from typing import Union | ||
|
||
from utils import default_header | ||
from utils.models import API | ||
from utils.log import logger | ||
|
||
def reqAPI(api: API, client: httpx.Client) -> httpx.Response: | ||
if isinstance(api.data, dict): | ||
resp = client.request(method=api.method, json=api.data, | ||
headers=api.header, url=api.url) | ||
else: | ||
resp = client.request(method=api.method, data=api.data, | ||
headers=api.header, url=api.url) | ||
return resp | ||
|
||
|
||
def reqFunc(api: Union[API, str], phone: tuple): | ||
"""请求接口方法""" | ||
# 多手机号支持 | ||
if isinstance(phone, tuple): | ||
phone_lst = [_ for _ in phone] | ||
else: | ||
phone_lst = [phone] | ||
|
||
with httpx.Client(headers=default_header, verify=False) as client: | ||
for ph in phone_lst: | ||
try: | ||
if isinstance(api, API): | ||
api = api.handle_API(ph) | ||
resp = reqAPI(api, client) | ||
logger.info(f"{api.desc}-{resp.text[:30]}") | ||
else: | ||
api = api.replace("[phone]", ph) | ||
resp = client.get(url=api, headers=default_header) | ||
logger.info(f"GETAPI接口-{resp.text[:30]}") | ||
except httpx.HTTPError as why: | ||
logger.error(f"请求失败{why}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# encoding=utf8 | ||
# 读写sqlite数据库 | ||
from pathlib import Path | ||
import sqlite3 | ||
|
||
class Sql(object): | ||
"""处理SQL数据""" | ||
|
||
def __init__(self) -> None: | ||
'''初始化数据库''' | ||
# 数据库路径 | ||
db_path = Path.cwd().joinpath("api.db") | ||
# 连接数据库,不检查是否在同一个线程. | ||
self.client = sqlite3.connect( | ||
db_path, timeout=6, check_same_thread=False) | ||
self.cursor = self.client.cursor() | ||
self.newTable() | ||
|
||
def newTable(self): | ||
'''初始化表结构''' | ||
sql = ''' | ||
CREATE TABLE IF NOT EXISTS API200 ( | ||
id INT NULL, | ||
url TEXT NOT NULL, | ||
primary key (url) | ||
); | ||
''' | ||
self.cursor.execute(sql) | ||
self.client.commit() | ||
|
||
def update(self, url): | ||
'''插入数据''' | ||
sql = ''' | ||
INSERT INTO API200 (ID,url) VALUES (null,?) | ||
''' | ||
try: | ||
self.cursor.execute(sql, (url,)) | ||
self.client.commit() | ||
return True | ||
except sqlite3.IntegrityError: | ||
# print(f"{url} 数据重复!") | ||
return False | ||
|
||
def select(self) -> list: | ||
'''获取所有接口''' | ||
sql = ''' | ||
SELECT url FROM API200; | ||
''' | ||
try: | ||
self.cursor.execute(sql) | ||
result = self.cursor.fetchall() | ||
urls = [] | ||
for url in result: | ||
urls.append(url[0]) | ||
return urls | ||
except Exception as e: | ||
print('读取出现错误!', e) | ||
|
||
def __del__(self) -> None: | ||
'''对象被删除时执行的函数''' | ||
print(f"共改变{self.client.total_changes}条数据!,正在关闭数据库连接......") | ||
self.client.close() | ||
|