forked from yuanter/misaka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wyx.py
218 lines (194 loc) · 8.16 KB
/
wyx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@limoruirui https://github.com/limoruirui
# @Time : 19/5/2022 19:48
# -------------------------------
"""
1.无忧行app签到 请低调使用
2.cookie获取方式
1.打开app-我的-任务中心 很多链接里(链接里 不是看headers)都有token 复制token的值填入环境变量即可 格式应该是 32位16进制数
2.有效期不懂
3.cookie食用方式:只要token的值 32位16进制数 青龙运行可新建并放入到环境变量 WXY_TOKEN 中
"""
from json import loads, dumps
from base64 import b64decode, b64encode
from hashlib import md5 as md5Encode
from random import randint
from os import environ, system
from time import time
# Third-party dependencies. If either one cannot be imported, try to install
# both with pip and stop, so the user can simply re-run the script.
try:
    from Crypto.Cipher import AES
    from requests import post
except ImportError:
    # Only import failures should trigger the auto-install path; a bare
    # ``except:`` would also swallow KeyboardInterrupt/SystemExit and
    # unrelated errors, then try to run pip for no reason.
    print(
        "你还没有安装requests库和pycryptodome库 正在尝试自动安装 请在安装结束后重新执行此脚本\n若还是提示本条消息 请自行运行pip3 install requests和pip3 install pycryptodome或者在青龙的依赖管理里安装python的 requests 和 pycryptodome ")
    system("pip3 install pycryptodome")
    system("pip3 install requests")
    print("安装完成 脚本退出 请重新执行")
    exit(0)
# Runtime configuration, read once from the environment. A variable that is
# unset or empty is normalised to the empty string.
token = environ.get("WXY_TOKEN") or ""
pushplus_token = environ.get("PUSH_PLUS_TOKEN") or ""
tgbot_token = environ.get("TG_BOT_TOKEN") or ""
tg_userId = environ.get("TG_USER_ID") or ""
tg_push_api = environ.get("TG_API_HOST") or ""

# The account token is mandatory and must be exactly 32 characters long
# (only the length is checked here, not the character set).
if not token:
    print("未填写token 请添加环境变量 WXY_TOKEN")
    exit(0)
elif len(token) != 32:
    print("填写的token不对 是一个32位16进制数")
    exit(0)
# Cipher block length in bytes, taken from the AES implementation.
BLOCK_SIZE = AES.block_size


def pad(s):
    """Return ``s`` padded so its UTF-8 encoding fills whole cipher blocks.

    The padding length is computed from the encoded byte length rather than
    ``len(s)``, because non-ASCII characters occupy several bytes in UTF-8.
    PKCS#7-style: ``n`` copies of ``chr(n)`` are appended, where ``n`` is the
    number of bytes missing to reach the next multiple of ``BLOCK_SIZE`` (a
    full extra block when the input is already aligned).
    """
    fill = BLOCK_SIZE - len(s.encode()) % BLOCK_SIZE
    return s + chr(fill) * fill


def unpad(s):
    """Strip the padding added by :func:`pad`.

    The value of the last element tells how many trailing elements to drop.
    Works on ``str`` as well as ``bytes`` because a one-element slice is
    passed to ``ord``.
    """
    return s[:-ord(s[-1:])]
class AESCipher:
    """AES helper in ECB mode that speaks base64 text on the outside.

    ECB mode takes no initialisation vector, so only the key is stored.
    """

    def __init__(self, secretkey: str):
        # Secret key as text; it is encoded to bytes each time a cipher
        # object is built.
        self.key = secretkey

    def _new_cipher(self):
        """Build a fresh AES-ECB cipher object from the stored key."""
        return AES.new(key=self.key.encode(), mode=AES.MODE_ECB)

    def encrypt(self, text):
        """Encrypt ``text``: pad, AES-encrypt, then base64-encode.

        :param text: plaintext string to encrypt
        :return: base64 text of the ciphertext
        """
        # pycryptodome's encrypt() only accepts bytes, so the padded string
        # is encoded before it is handed to the cipher.
        padded_bytes = pad(text).encode()
        ciphertext = self._new_cipher().encrypt(padded_bytes)
        return b64encode(ciphertext).decode('utf-8')

    def decrypt(self, encrypted_text):
        """Decrypt ``encrypted_text``: base64-decode, AES-decrypt, unpad.

        :param encrypted_text: base64 text produced by the matching encrypt step
        :return: the decrypted plaintext decoded as UTF-8
        """
        ciphertext = b64decode(encrypted_text)
        plaintext_bytes = self._new_cipher().decrypt(ciphertext)
        return unpad(plaintext_bytes).decode('utf-8')
class WYX:
    """Client for the JegoTrip (无忧行) app daily check-in.

    Every API call posts a JSON envelope ``{"sec": ..., "body": ...}`` where
    ``body`` is an AES-encrypted payload and ``sec`` is a base64 header from
    which the peer re-derives the AES key. Responses use the same envelope.
    """

    # Timeout (seconds) applied to every outgoing HTTP request.
    REQUEST_TIMEOUT = 10

    def __init__(self, token):
        """Store the account token and the fixed protocol constants.

        :param token: account token, sent as ``token`` and ``h_token`` in URLs
        """
        self.token = token
        self.headers = {
            "User-Agent": "Mozilla/5.0 (iPad; CPU OS 15_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
            "Referer": "https://cdn.jegotrip.com.cn/"
        }
        # Prefix hashed together with the nonce to derive the AES key.
        self.ge = "93EFE107DDE6DE51"
        # First and third fields of the ``sec`` header.
        self.he = "online_jego_h5"
        self.fe = "01"
        # Id of the check-in task, filled in by get_checkin_taskid().
        self.taskId = ""
        # Human-readable summary that is pushed at the end of the run.
        self.msg = ""

    def timestamp(self):
        """Return the current Unix time in milliseconds."""
        return int(time() * 1000)

    def pushplus(self, title, content):
        """Send ``content`` through PushPlus; failures are only reported."""
        url = "http://www.pushplus.plus/send"
        headers = {
            "Content-Type": "application/json"
        }
        data = {
            "token": pushplus_token,
            "title": title,
            "content": content
        }
        try:
            # Same timeout as tgpush so a stalled push cannot hang the run.
            post(url, headers=headers, data=dumps(data), timeout=self.REQUEST_TIMEOUT)
        except Exception:
            # Best effort: a failed notification must not abort the script,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            print('推送失败')

    def tgpush(self, content):
        """Send ``content`` through a Telegram bot; failures are only reported.

        Uses the host from TG_API_HOST when it is configured, otherwise the
        official Telegram API host.
        """
        url = f"https://api.telegram.org/bot{tgbot_token}/sendMessage"
        if tg_push_api != "":
            url = f"https://{tg_push_api}/bot{tgbot_token}/sendMessage"
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        data = {'chat_id': str(tg_userId), 'text': content, 'disable_web_page_preview': 'true'}
        try:
            post(url, headers=headers, data=data, timeout=self.REQUEST_TIMEOUT)
        except Exception:
            print('推送失败')

    def push(self, msg):
        """Forward ``msg`` to every notification channel that is configured."""
        if pushplus_token != "":
            self.pushplus("无忧行签到", msg)
        if tgbot_token != "" and tg_userId != "":
            self.tgpush(f"无忧行签到\n{msg}")

    def md5(self, text):
        """Return the hexadecimal MD5 digest of ``text`` (UTF-8 encoded)."""
        m = md5Encode(text.encode(encoding='utf-8'))
        return m.hexdigest()

    def decrypt_key(self, encrypt_key):
        """Derive the AES key from a base64 ``sec`` header.

        The decoded header has the form ``<he>;<nonce>;<fe>``; the key is
        characters 8..23 of ``md5(self.ge + nonce)``.

        :param encrypt_key: base64 ``sec`` value
        :return: 16-character AES key
        :raises ValueError: if the decoded header does not have three fields
        """
        fields = b64decode(encrypt_key).decode().split(";")
        if len(fields) != 3:
            # Previously this fell through and returned None, which only
            # surfaced later as an obscure AttributeError inside AESCipher.
            raise ValueError("malformed sec header: expected 3 ';'-separated fields")
        return self.md5(self.ge + fields[1])[8:24]

    def gene_encrypt_key(self):
        """Create a fresh AES key together with its ``sec`` header.

        The nonce is the millisecond timestamp followed by a random number
        in the range 100..999.

        :return: tuple ``(aes_key, sec)`` where ``sec`` is base64 text
        """
        e = f"{self.timestamp()}{randint(100, 999)}"
        a = self.md5(self.ge + e)[8:24]
        c = f"{self.he};{e};{self.fe}"
        t = b64encode(c.encode("utf-8")).decode()
        return a, t

    def _api_url(self, path):
        """Build the full API URL for ``path`` including the token query."""
        return f"https://app.jegotrip.com.cn/api/service/{path}?token={self.token}&h_token={self.token}&lang=zh_CN"

    def _post_encrypted(self, path, plaintext):
        """POST ``plaintext`` in an encrypted envelope; return the parsed JSON reply."""
        key, sec = self.gene_encrypt_key()
        body = {
            "sec": sec,
            "body": AESCipher(key).encrypt(plaintext)
        }
        return post(self._api_url(path), headers=self.headers, json=body,
                    timeout=self.REQUEST_TIMEOUT).json()

    def _decrypt_response(self, envelope):
        """Decrypt the ``body`` of a response envelope using its ``sec`` header."""
        return AESCipher(self.decrypt_key(envelope["sec"])).decrypt(envelope["body"])

    @staticmethod
    def _pick_task_id(decrypt_data):
        """Return the id of the last listed task whose ``isSign`` is 2, or None.

        ``decrypt_data`` is the decrypted JSON text of the querySign reply.
        It is parsed with ``json.loads``: the data comes from the network, so
        ``eval`` was unsafe, and it also failed on JSON true/false/null.
        """
        # NOTE(review): isSign == 2 presumably marks the entry that can be
        # signed now - confirm against the API.
        for checkin_task in loads(decrypt_data)[::-1]:
            if checkin_task["isSign"] == 2:
                return checkin_task["id"]
        return None

    def query_total_score(self):
        """Query the total points and append them to the result message.

        Nothing is printed or recorded when the response code is not "0".
        """
        encrypt_data = self._post_encrypted("member/v1/expireRewardQuery", "{}")
        if encrypt_data["code"] == "0":
            total_score = loads(self._decrypt_response(encrypt_data))["tripcoins"]
            print(f"查询成功, 你共有{total_score}点积分")
            self.msg += f", 你共有{total_score}点积分"

    def get_checkin_taskid(self):
        """Look up the check-in task id, store it in ``self.taskId`` and return it.

        Exits the process when the request is rejected or no matching task
        is listed, so callers never receive ``None``.
        """
        encrypt_data = self._post_encrypted("v1/mission/sign/querySign", "{}")
        if encrypt_data["code"] == "0":
            task_id = self._pick_task_id(self._decrypt_response(encrypt_data))
            if task_id is not None:
                self.taskId = task_id
                return self.taskId
        print("获取任务id失败 退出")
        exit(0)

    def checkin(self):
        """Perform the check-in and record success or failure in ``self.msg``."""
        task_id = self.get_checkin_taskid()
        data = self._post_encrypted("v1/mission/sign/userSign", f'{{"signConfigId":{task_id}}}')
        if data["code"] == "0":
            self.msg += "签到成功"
        else:
            self.msg += "签到失败"
        print(self.msg)

    def main(self):
        """Run the whole flow: check in, query points, then push the summary."""
        self.checkin()
        self.query_total_score()
        self.push(self.msg)
if __name__ == "__main__":
    # Run the full check-in flow only when executed as a script.
    client = WYX(token)
    client.main()