forked from aaPanel/BaoTa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpanelApi.py
250 lines (220 loc) · 9.64 KB
/
panelApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2017 宝塔软件(http://bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang
# +-------------------------------------------------------------------
import public,os,json,time
class panelApi:
save_path = '/www/server/panel/config/api.json'
timeout = 600
max_bind = 5
def get_token(self,get):
data = self.get_api_config()
if not 'key' in data:
data['key'] = public.GetRandomString(16)
public.writeFile(self.save_path,json.dumps(data))
if 'token_crypt' in data:
data['token'] = public.de_crypt(data['token'],data['token_crypt'])
else:
data['token'] = "***********************************"
data['limit_addr'] = '\n'.join(data['limit_addr'])
data['bind'] = self.get_bind_token()
qrcode = (public.getPanelAddr() + "|" + data['token'] + "|" + data['key'] + '|' + data['bind']['token']).encode('utf-8')
data['qrcode'] = public.base64.b64encode(qrcode).decode('utf-8')
data['apps'] = sorted(data['apps'],key=lambda x: x['time'],reverse=True)
del(data['key'])
return data
def login_for_app(self,get):
from BTPanel import cache
import uuid
tid = get.tid
if(len(tid) != 32): return public.returnMsg(False,'无效的登录密钥1')
session_id = cache.get(tid)
if not session_id: return public.returnMsg(False,'指定密钥不存在,或已过期1')
if(len(session_id) != 64): return public.returnMsg(False,'无效的登录密钥2')
try:
if not os.path.exists('/www/server/panel/data/app_login_check.pl'):return public.returnMsg(False,'无效的登录密钥3')
key, init_time, tid2, status = public.readFile('/www/server/panel/data/app_login_check.pl').split(':')
if session_id!=key:return public.returnMsg(False,'无效的登录密钥4')
if tid != tid2: return public.returnMsg(False, '指定密钥不存在,或已过期5')
if time.time() - float(init_time) > 60:
return public.returnMsg(False, '二维码失效时间过期6')
cache.set(session_id,public.md5(uuid.UUID(int=uuid.getnode()).hex),120)
import uuid
data = key + ':' + init_time + ':' + tid2 + ':' + uuid.UUID(int=uuid.getnode()).hex[-12:]
public.writeFile("/www/server/panel/data/app_login_check.pl", data)
return public.returnMsg(True,'扫码成功,正在登录!')
except:
os.remove("/www/server/panel/data/app_login_check.pl")
return public.returnMsg(False,'无效的登录密钥')
def get_api_config(self):
tmp = public.ReadFile(self.save_path)
if not tmp or not os.path.exists(self.save_path):
data = { "open":False, "token":"", "limit_addr":[] }
public.WriteFile(self.save_path,json.dumps(data))
public.ExecShell("chmod 600 " + self.save_path)
tmp = public.ReadFile(self.save_path)
data = json.loads(tmp)
is_save = False
if not 'binds' in data:
data['binds'] = []
is_save = True
if not 'apps' in data:
data['apps'] = []
is_save = True
data['binds'] = sorted(data['binds'],key=lambda x: x['time'],reverse=True)
if len(data['binds']) > 5:
data['binds'] = data['binds'][:5]
is_save = True
if is_save:
self.save_api_config(data)
return data
def save_api_config(self,data):
public.WriteFile(self.save_path,json.dumps(data))
public.set_mode(self.save_path,'600')
return True
def check_bind(self,args):
if not 'bind_token' in args or not 'client_brand' in args or not 'client_model' in args:
return 0
if not args.client_brand or not args.client_model:
return '无效的设备'
bind = self.get_bind_token(args.bind_token)
if bind['token'] != args.bind_token:
return '当前二维码已过期,请刷新页面重新扫码!'
apps = self.get_apps()
if len(apps) >= self.max_bind:
return '该服务器最多绑定{}台设备,已达到上限!'.format(self.max_bind)
bind['status'] = 1
bind['brand'] = args.client_brand
bind['model'] = args.client_model
self.set_bind_token(bind)
return 1
def get_bind_status(self,args):
if not public.cache_get("get_bind_status"):
public.cache_set("get_bind_status",1,60)
bind = self.get_bind_token(args.bind_token)
return bind
def get_app_bind_status(self,args):
if not 'bind_token' in args:
return 0
if self.get_app_find(args.bind_token):
return 1
return 0
def set_bind_token(self,bind):
data = self.get_api_config()
is_save = False
for i in range(len(data['binds'])):
if data['binds'][i]['token'] == bind['token']:
data['binds'][i] = bind
is_save = True
break
if is_save:
self.save_api_config(data)
return True
def get_apps(self,args = None):
data = self.get_api_config()
return data['apps']
def get_app_find(self,bind_token):
apps = self.get_apps()
for s_app in apps:
if s_app['token'] == bind_token:
return s_app
return None
def add_bind_app(self,args):
bind = self.get_bind_token(args.bind_token)
if bind['status'] == 0:
return public.returnMsg(False,'未通过验证!')
apps = self.get_apps()
if len(apps) >= self.max_bind:
return public.returnMsg(False,'一台服务器最多允许{}个设备绑定!'.format(self.max_bind))
args.bind_app = args.bind_token
self.remove_bind_app(args)
data = self.get_api_config()
data['apps'].append(bind)
self.save_api_config(data)
self.remove_bind_token(args.bind_token)
return public.returnMsg(True,'绑定成功!')
def remove_bind_token(self,bind_token):
data = self.get_api_config()
tmp_binds = []
for s_bind in data['binds']:
if bind_token == s_bind['token']:
continue
tmp_binds.append(s_bind)
data['binds'] = tmp_binds
self.save_api_config(data)
def remove_bind_app(self,args):
data = self.get_api_config()
tmp_apps = []
for s_app in data['apps']:
if args.bind_app == s_app['token']:
continue
tmp_apps.append(s_app)
data['apps'] = tmp_apps
self.save_api_config(data)
s_file = '/dev/shm/{}'.format(args.bind_app)
if os.path.exists(s_file):
os.remove(s_file)
return public.returnMsg(True,'删除成功!')
def get_bind_token(self,token = None):
data = self.get_api_config()
s_time = time.time()
binds = []
bind = None
is_write = False
for i in range(len(data['binds'])):
if s_time - data['binds'][i]['time'] > self.timeout:
is_write = True
continue
binds.append(data['binds'][i])
if token:
if token == data['binds'][i]['token']:
bind = data['binds'][i]
else:
if not bind:
bind = data['binds'][i]
if not bind:
if len(binds) > 0:
binds = sorted(binds,key=lambda x: x['time'],reverse=True)
bind = binds[0]
else:
bind = {"time":s_time,"token":public.GetRandomString(18),'status':0}
binds.append(bind)
is_write = True
if is_write:
data['binds'] = binds
self.save_api_config(data)
return bind
def set_token(self,get):
if 'request_token' in get: return public.returnMsg(False,'不能通过API接口配置API')
data = self.get_api_config()
if get.t_type == '1':
token = public.GetRandomString(32)
data['token'] = public.md5(token)
data['token_crypt'] = public.en_crypt(data['token'],token).decode('utf-8')
public.WriteLog('API配置','重新生成API-Token')
elif get.t_type == '2':
data['open'] = not data['open']
stats = {True:'开启',False:'关闭'}
if not 'token_crypt' in data:
token = public.GetRandomString(32)
data['token'] = public.md5(token)
data['token_crypt'] = public.en_crypt(data['token'],token).decode('utf-8')
public.WriteLog('API配置','%sAPI接口' % stats[data['open']])
token = stats[data['open']] + '成功!'
elif get.t_type == '3':
data['limit_addr'] = get.limit_addr.split('\n')
public.WriteLog('API配置','变更IP限制为[%s]' % get.limit_addr)
token ='保存成功!'
self.save_api_config(data)
return public.returnMsg(True,token)
def get_tmp_token(self,get):
if not 'request_token' in get: return public.returnMsg(False,'只能通过API接口获取临时密钥')
data = self.get_api_config()
data['tmp_token'] = public.GetRandomString(64)
data['tmp_time'] = time.time()
self.save_api_config(data)
return public.returnMsg(True,data['tmp_token'])