forked from aaPanel/BaoTa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsite_dir_auth.py
286 lines (269 loc) · 11.8 KB
/
site_dir_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#coding: utf-8
#-------------------------------------------------------------------
# 宝塔Linux面板
#-------------------------------------------------------------------
# Copyright (c) 2015-2017 宝塔软件(http://bt.cn) All rights reserved.
#-------------------------------------------------------------------
# Author: 邹浩文 <[email protected]>
#-------------------------------------------------------------------
#------------------------------
# 站点目录密码保护
#------------------------------
import public,re,os,json,shutil
class SiteDirAuth:
    """Manage HTTP basic-auth protection for individual directories of a site.

    For every protected directory the class writes:

    * a password file at ``{setup_path}/pass/{site_name}/{name}.pass``,
    * one per-rule snippet for nginx and one for apache under
      ``{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}/{name}.conf``,
    * an include line in the site's main vhost file that pulls in those
      snippets, and
    * an entry in the JSON registry ``self.conf_file`` (a dict mapping a site
      name to a list of ``{"name", "site_dir", "auth_file"}`` dicts).
    """

    def __init__(self):
        self.setup_path = public.GetConfigValue('setup_path')
        self.conf_file = self.setup_path + "/panel/data/site_dir_auth.json"

    # ------------------------------------------------------------------
    # Pure helpers (no file system / panel access)
    # ------------------------------------------------------------------
    @staticmethod
    def _is_safe_name(name):
        """Return True when *name* is usable as a single file-name component.

        The rule name is interpolated into file paths, so anything that could
        escape the target directory (path separators, ``.``/``..``) or break
        a config line (NUL, line breaks) is rejected.
        """
        if not name or name in (".", ".."):
            return False
        return not any(ch in name for ch in ("/", "\\", "\0", "\n", "\r"))

    @staticmethod
    def _drop_entry(entries, name):
        """Return *entries* without the rule whose ``"name"`` equals *name*."""
        return [entry for entry in entries if entry.get("name") != name]

    @staticmethod
    def _build_nginx_conf(site_dir, auth_file, php_conf):
        """Return the nginx ``location`` block protecting *site_dir*.

        *php_conf* is either an empty string or an ``include enable-php-XX.conf;``
        line placed inside the location block.
        """
        return '''location ~* ^%s* {
#AUTH_START
auth_basic "Authorization";
auth_basic_user_file %s;
%s
#AUTH_END
}''' % (site_dir, auth_file, php_conf)

    @staticmethod
    def _build_apache_conf(site_path, site_dir, auth_file, username):
        """Return the apache ``<Directory>`` block protecting *site_dir*."""
        return '''<Directory "{site_path}{site_dir}/">
#AUTH_START
AuthType basic
AuthName "Authorization "
AuthUserFile {auth_file}
Require user {username}
#AUTH_END
SetOutputFilter DEFLATE
Options FollowSymLinks
AllowOverride All
#Require all granted
DirectoryIndex index.php index.html index.htm default.php default.html default.htm
</Directory>'''.format(site_path=site_path, site_dir=site_dir,
                       auth_file=auth_file, username=username)

    @staticmethod
    def _update_include(conf, webserver, act, dir_auth_file):
        """Return vhost text *conf* with the dir_auth include added or removed.

        :param conf: current content of the site's main vhost file
        :param webserver: ``"apache"`` selects the apache syntax, anything
            else the nginx syntax
        :param act: ``"create"`` inserts the include (once) after the anchor
            text; any other value removes every previously inserted include
        :param dir_auth_file: glob path of the per-rule snippets to include
        """
        marker = "#Directory protection rules, do not manually delete"
        if webserver == "apache":
            anchor = "combined"
            present = r"combined(\n|.)+IncludeOptional.*\/dir_auth\/.*conf"
            addition = "\n\t{}\n\tIncludeOptional {}".format(marker, dir_auth_file)
            # Stop at the end of the include line: a character class that
            # allowed whitespace used to swallow following lines and stopped
            # in the middle of paths containing '-'.
            removal = r"\n?[ \t]*" + marker + r"\n+\s+IncludeOptional[^\n]*"
        else:
            anchor = "#SSL-END"
            present = r"#SSL-END(\n|.)+include.*\/dir_auth\/.*conf;"
            addition = "\n\t{}\n\tinclude {};".format(marker, dir_auth_file)
            removal = r"\n?[ \t]*" + marker + r"\n+\s+include[^\n;]*;"

        if act == "create":
            if re.search(present, conf):
                return conf
            return conf.replace(anchor, anchor + addition)
        return re.sub(removal, '', conf)

    # ------------------------------------------------------------------
    # Registry handling
    # ------------------------------------------------------------------
    def _read_conf(self):
        """Load the registry; reset it to ``{}`` on disk when missing or invalid."""
        raw = public.readFile(self.conf_file)
        conf = None
        if raw:
            try:
                conf = json.loads(raw)
            except (ValueError, TypeError):
                conf = None
        if not isinstance(conf, dict):
            conf = {}
            public.writeFile(self.conf_file, json.dumps(conf))
        return conf

    def _write_conf(self, conf, site_name):
        """Append rule *conf* to the registry list of *site_name* and save."""
        registry = self._read_conf()
        registry.setdefault(site_name, []).append(conf)
        public.writeFile(self.conf_file, json.dumps(registry))

    def _check_site_authorization(self, site_name):
        """Return True when the site's main vhost file already mentions "Authorization"."""
        webserver = public.get_webserver()
        conf_file = "{setup_path}/panel/vhost/{webserver}/{site_name}.conf".format(
            setup_path=self.setup_path, site_name=site_name, webserver=webserver)
        content = public.readFile(conf_file)
        # readFile may hand back a non-string when the file cannot be read.
        if not content:
            return False
        return "Authorization" in content

    def _check_dir_auth(self, site_name, name, site_dir):
        """Return True when *site_name* already has a rule with this name or directory."""
        entries = self._read_conf().get(site_name, [])
        return any(entry.get("name") == name or entry.get("site_dir") == site_dir
                   for entry in entries)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def set_dir_auth(self, get):
        '''
        Create a password-protection rule for a directory of a site.

        get.name        rule name (used as file name)
        get.site_dir    directory to protect, must start and end with "/"
        get.username    username
        get.password    password
        get.id          site id
        :param get: request object carrying the attributes above
        :return: result of public.returnMsg
        '''
        if not getattr(get, "password", None) or not getattr(get, "username", None):
            return public.returnMsg(False, '请输入账号或密码')
        site_dir = getattr(get, "site_dir", None)
        if not site_dir:
            return public.returnMsg(False, '请输入需要保护的目录')
        name = getattr(get, "name", None)
        if not name:
            return public.returnMsg(False, '请输入名称')
        if site_dir[0] != "/" or site_dir[-1] != "/":
            return public.returnMsg(False, '目录格式不正确')
        # The name becomes part of several file paths; refuse path traversal.
        if not self._is_safe_name(name):
            return public.returnMsg(False, '名称格式不正确')

        passwd = public.hasPwd(get.password)
        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        if self._check_site_authorization(site_name):
            return public.returnMsg(False, '已经设置站点密码保护,请取消后再设置 站点配置 --> 站点目录 --> 密码访问')
        if self._check_dir_auth(site_name, name, site_dir):
            return public.returnMsg(False, '目录已经保护')

        auth = "{user}:{passwd}".format(user=get.username, passwd=passwd)
        auth_dir = '{setup_path}/pass/{site_name}'.format(setup_path=self.setup_path, site_name=site_name)
        if not os.path.exists(auth_dir):
            os.makedirs(auth_dir)
        auth_file = auth_dir + "/{}.pass".format(name)
        public.writeFile(auth_file, auth)

        # Write the per-rule snippets for both web servers.
        self.set_dir_auth_file(site_info["site_path"], site_name, name, get.username, site_dir, auth_file)
        # Make the site's main vhost file include the snippets.
        result = self.set_conf(site_name, "create")
        if result:
            return result
        # Validate the resulting web server configuration.
        webserver = public.get_webserver()
        result = self.check_site_conf(webserver, site_name, name)
        if result:
            return result
        # Only register the rule once the configuration is known to be valid.
        conf = {"name": name, "site_dir": get.site_dir, "auth_file": auth_file}
        self._write_conf(conf, site_name)
        public.serviceReload()
        return public.returnMsg(True, "创建成功")

    def get_site_php_version(self, siteName):
        """Return the PHP version digits found in the site's vhost file.

        On any failure (unreadable file, no PHP reference in the file) the
        result of ``public.returnMsg(False, ...)`` is returned instead, so
        callers must not assume the return value is a string.
        """
        try:
            webserver = public.get_webserver()
            conf = public.readFile(self.setup_path + '/panel/vhost/' + webserver + '/' + siteName + '.conf')
            if webserver == 'nginx':
                rep = r"enable-php-([0-9]{2,3})\.conf"
            else:
                rep = r"php-cgi-([0-9]{2,3})\.sock"
            return re.search(rep, conf).group(1)
        except Exception:
            return public.returnMsg(False, 'SITE_PHPVERSION_ERR_A22')

    def get_site_info(self, id):
        """Return ``{"site_name", "site_path"}`` for the site row with this id."""
        site_info = public.M('sites').where('id=?', (id,)).field('name,path').find()
        return {"site_name": site_info["name"], "site_path": site_info["path"]}

    def set_dir_auth_file(self, site_path, site_name, name, username, site_dir, auth_file):
        """Write the nginx and apache snippet files for one protection rule."""
        php_ver = self.get_site_php_version(site_name)
        php_conf = ""
        # get_site_php_version returns a (truthy) error dict on failure; it
        # must not be formatted into the nginx include line.
        if php_ver and not isinstance(php_ver, dict):
            php_conf = "include enable-php-{}.conf;".format(php_ver)

        snippets = {
            "nginx": self._build_nginx_conf(site_dir, auth_file, php_conf),
            "apache": self._build_apache_conf(site_path, site_dir, auth_file, username),
        }
        for webserver in ["nginx", "apache"]:
            conf_dir = "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}".format(
                setup_path=self.setup_path, site_name=site_name, webserver=webserver)
            if not os.path.exists(conf_dir):
                os.makedirs(conf_dir)
            public.writeFile(conf_dir + '/{}.conf'.format(name), snippets[webserver])

    def set_conf(self, site_name, act):
        """Add (``act == "create"``) or remove the dir_auth include in the
        site's main nginx and apache vhost files.

        Vhost files that do not exist are skipped. A copy of each file is
        stored at ``/tmp/{webserver}_file_bk.conf`` before it is rewritten.
        """
        for webserver in ["nginx", "apache"]:
            dir_auth_file = "%s/panel/vhost/%s/dir_auth/%s/*.conf" % (self.setup_path, webserver, site_name,)
            vhost_file = self.setup_path + "/panel/vhost/{}/".format(webserver) + site_name + ".conf"
            # Check existence first: copying a missing file would raise.
            if not os.path.exists(vhost_file):
                continue
            shutil.copyfile(vhost_file, '/tmp/{}_file_bk.conf'.format(webserver))
            conf = public.readFile(vhost_file)
            if not conf:
                # Nothing readable: leave the file alone rather than clobber it.
                continue
            public.writeFile(vhost_file, self._update_include(conf, webserver, act, dir_auth_file))

    def check_site_conf(self, webserver, site_name, name):
        """Validate the web server configuration.

        Returns None when ``public.checkWebConfig()`` reports True. Otherwise
        the freshly written snippet of *webserver* is removed and an error
        message (``public.returnMsg``) is returned.
        """
        isError = public.checkWebConfig()
        auth_file = "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}/{name}.conf".format(
            setup_path=self.setup_path, webserver=webserver, site_name=site_name, name=name)
        if (isError != True):
            if os.path.exists(auth_file):
                os.remove(auth_file)
            return public.returnMsg(False, 'ERROR: %s<br><a style="color:red;">' % public.GetMsg("CONFIG_ERROR") + isError.replace("\n",
                                                                                                                                    '<br>') + '</a>')

    def delete_dir_auth(self, get):
        '''
        Delete a password-protection rule.

        get.id      site id
        get.name    rule name
        :param get: request object carrying the attributes above
        :return: result of public.returnMsg
        '''
        name = get.name
        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        conf = self._read_conf()
        if site_name in conf:
            remaining = self._drop_entry(conf[site_name], name)
            if remaining:
                conf[site_name] = remaining
            else:
                del conf[site_name]
            public.writeFile(self.conf_file, json.dumps(conf))

        # Registry entries are always removable, but files are only touched
        # for names that cannot point outside the dir_auth directory.
        if self._is_safe_name(name):
            for webserver in ["nginx", "apache"]:
                file_path = "{setup_path}/panel/vhost/{webserver}/dir_auth/{site_name}/{name}.conf".format(
                    webserver=webserver,
                    setup_path=self.setup_path,
                    site_name=site_name,
                    name=name)
                if os.path.exists(file_path):
                    os.remove(file_path)

        # Drop the include line once THIS site has no rules left (the registry
        # as a whole may still hold rules of other sites).
        if site_name not in conf:
            self.set_conf(site_name, "delete")
        public.serviceReload()
        return public.returnMsg(True, "删除成功")

    def modify_dir_auth_pass(self, get):
        '''
        Change the credentials of an existing protection rule.

        get.id          site id
        get.name        rule name
        get.username    username
        get.password    password
        :param get: request object carrying the attributes above
        :return: result of public.returnMsg
        '''
        if not getattr(get, "password", None) or not getattr(get, "username", None):
            return public.returnMsg(False, '请输入账号或密码')
        name = get.name
        if not self._is_safe_name(name):
            return public.returnMsg(False, '名称格式不正确')
        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        passwd = public.hasPwd(get.password)
        auth = "{user}:{passwd}".format(user=get.username, passwd=passwd)
        auth_file = '{setup_path}/pass/{site_name}/{name}.pass'.format(
            setup_path=self.setup_path, site_name=site_name, name=name)
        public.writeFile(auth_file, auth)
        public.serviceReload()
        return public.returnMsg(True, "修改成功")

    def get_dir_auth(self, get):
        '''
        List the protection rules of a site.

        get.id      site id
        :param get: request object carrying the attribute above
        :return: ``{site_name: [rule, ...]}`` or ``{}`` when the site has none
        '''
        site_info = self.get_site_info(get.id)
        site_name = site_info["site_name"]
        conf = self._read_conf()
        if site_name in conf:
            return {site_name: conf[site_name]}
        return {}