forked from jly8866/archer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inception.py
279 lines (253 loc) · 14 KB
/
inception.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#-*-coding: utf-8-*-
import re
import simplejson as json
import MySQLdb
from django.conf import settings
from django.db import connection
from .models import master_config, slave_config, workflow
from .aes_decryptor import Prpcrypt
class InceptionDao(object):
def __init__(self):
try:
self.inception_host = getattr(settings, 'INCEPTION_HOST')
self.inception_port = int(getattr(settings, 'INCEPTION_PORT'))
self.inception_remote_backup_host = getattr(settings, 'INCEPTION_REMOTE_BACKUP_HOST')
self.inception_remote_backup_port = int(getattr(settings, 'INCEPTION_REMOTE_BACKUP_PORT'))
self.inception_remote_backup_user = getattr(settings, 'INCEPTION_REMOTE_BACKUP_USER')
self.inception_remote_backup_password = getattr(settings, 'INCEPTION_REMOTE_BACKUP_PASSWORD')
self.prpCryptor = Prpcrypt()
except AttributeError as a:
print("Error: %s" % a)
except ValueError as v:
print("Error: %s" % v)
def criticalDDL(self, sqlContent):
'''
识别DROP DATABASE, DROP TABLE, TRUNCATE PARTITION, TRUNCATE TABLE等高危DDL操作,因为对于这些操作,inception在备份时只能备份METADATA,而不会备份数据!
如果识别到包含高危操作,则返回“审核不通过”
'''
resultList = []
criticalSqlFound = 0
# 删除注释语句
sqlContent = ''.join(
map(lambda x: re.compile(r'(^--\s+.*|^/\*.*\*/;\s*$)').sub('', x, count=1),sqlContent.splitlines(1))).strip()
for row in sqlContent.rstrip(';').split(';'):
if re.match(r"([\s\S]*)drop(\s+)database(\s+.*)|([\s\S]*)drop(\s+)table(\s+.*)|([\s\S]*)truncate(\s+.*)|([\s\S]*)truncate(\s+)partition(\s+.*)|([\s\S]*)truncate(\s+)table(\s+.*)", row.lower()):
result = ('', '', 2, '驳回高危SQL', '不能包含【DROP DATABASE】|【DROP TABLE】|【TRUNCATE PARTITION】|【TRUNCATE TABLE】关键字!', row, '', '', '', '', '')
criticalSqlFound = 1
else:
result = ('', '', 0, '', 'None', row, '', '', '', '', '')
resultList.append(result)
if criticalSqlFound == 1:
return resultList
else:
return None
def preCheck(self, sqlContent):
'''
在提交给inception之前,预先识别一些Inception不能正确审核的SQL,比如"alter table t1;"或"alter table test.t1;" 以免导致inception core dump
'''
resultList = []
syntaxErrorSqlFound = 0
for row in sqlContent.rstrip(';').split(';'):
if re.match(r"(\s*)alter(\s+)table(\s+)(\S+)(\s*);|(\s*)alter(\s+)table(\s+)(\S+)\.(\S+)(\s*);", row.lower() + ";"):
result = ('', '', 2, 'SQL语法错误', 'ALTER TABLE 必须带有选项', row, '', '', '', '')
syntaxErrorSqlFound = 1
else:
result = ('', '', 0, '', 'None', row, '', '', '', '')
resultList.append(result)
if syntaxErrorSqlFound == 1:
return resultList
else:
return None
def sqlautoReview(self, sqlContent, clusterName, isSplit="no"):
'''
将sql交给inception进行自动审核,并返回审核结果。
'''
listMasters = master_config.objects.filter(cluster_name=clusterName)
if len(listMasters) != 1:
print("Error: 主库配置返回为0")
masterHost = listMasters[0].master_host
masterPort = listMasters[0].master_port
masterUser = listMasters[0].master_user
masterPassword = self.prpCryptor.decrypt(listMasters[0].master_password)
#这里无需判断字符串是否以;结尾,直接抛给inception enable check即可。
#if sqlContent[-1] != ";":
#sqlContent = sqlContent + ";"
if hasattr(settings, 'CRITICAL_DDL_ON_OFF') == True:
if getattr(settings, 'CRITICAL_DDL_ON_OFF') == "on":
criticalDDL_check = self.criticalDDL(sqlContent)
else:
criticalDDL_check = None
if criticalDDL_check is not None:
result = criticalDDL_check
else:
preCheckResult = self.preCheck(sqlContent)
if preCheckResult is not None:
result = preCheckResult
else:
if isSplit == "yes":
# 这种场景只给osc进度功能使用
# 如果一个工单中同时包含DML和DDL,那么执行时被split后的SQL与提交的SQL会不一样(会在每条语句前面加use database;),导致osc进度更新取不到正确的SHA1值。
# 请参考inception文档中--enable-split参数的说明
sqlSplit = "/*--user=%s; --password=%s; --host=%s; --enable-execute;--port=%s; --enable-ignore-warnings;--enable-split;*/\
inception_magic_start;\
%s\
inception_magic_commit;" % (masterUser, masterPassword, masterHost, str(masterPort), sqlContent)
splitResult = self._fetchall(sqlSplit, self.inception_host, self.inception_port, '', '', '')
tmpList = []
for splitRow in splitResult:
sqlTmp = splitRow[1]
sql = "/*--user=%s;--password=%s;--host=%s;--enable-check;--port=%s; --enable-ignore-warnings;*/\
inception_magic_start;\
%s\
inception_magic_commit;" % (masterUser, masterPassword, masterHost, str(masterPort), sqlTmp)
reviewResult = self._fetchall(sql, self.inception_host, self.inception_port, '', '', '')
tmpList.append(reviewResult)
#二次加工一下
finalList = []
for splitRow in tmpList:
for sqlRow in splitRow:
finalList.append(list(sqlRow))
result = finalList
else:
# 工单审核使用
sql="/*--user=%s;--password=%s;--host=%s;--enable-check=1;--port=%s;*/\
inception_magic_start;\
%s\
inception_magic_commit;" % (masterUser, masterPassword, masterHost, str(masterPort), sqlContent)
result = self._fetchall(sql, self.inception_host, self.inception_port, '', '', '')
return result
def executeFinal(self, workflowDetail, dictConn):
'''
将sql交给inception进行最终执行,并返回执行结果。
'''
strBackup = ""
if workflowDetail.is_backup == '是':
strBackup = "--enable-remote-backup;"
else:
strBackup = "--disable-remote-backup;"
#根据inception的要求,执行之前最好先split一下
sqlSplit = "/*--user=%s; --password=%s; --host=%s; --enable-execute;--port=%s; --enable-ignore-warnings;--enable-split;*/\
inception_magic_start;\
%s\
inception_magic_commit;" % (dictConn['masterUser'], dictConn['masterPassword'], dictConn['masterHost'], str(dictConn['masterPort']), workflowDetail.sql_content)
splitResult = self._fetchall(sqlSplit, self.inception_host, self.inception_port, '', '', '')
tmpList = []
#对于split好的结果,再次交给inception执行.这里无需保持在长连接里执行,短连接即可.
for splitRow in splitResult:
sqlTmp = splitRow[1]
sqlExecute = "/*--user=%s;--password=%s;--host=%s;--enable-execute;--port=%s; --enable-ignore-warnings;%s*/\
inception_magic_start;\
%s\
inception_magic_commit;" % (dictConn['masterUser'], dictConn['masterPassword'], dictConn['masterHost'], str(dictConn['masterPort']), strBackup, sqlTmp)
executeResult = self._fetchall(sqlExecute, self.inception_host, self.inception_port, '', '', '')
for sqlRow in executeResult:
tmpList.append(sqlRow)
# 每执行一次,就将执行结果更新到工单的execute_result,便于获取osc进度时对比
workflowDetail.execute_result = json.dumps(tmpList)
try:
workflowDetail.save()
except Exception:
# 重新获取连接,防止超时
connection.close()
workflowDetail.save()
#二次加工一下,目的是为了和sqlautoReview()函数的return保持格式一致,便于在detail页面渲染.
finalStatus = "已正常结束"
finalList = []
for sqlRow in tmpList:
#如果发现任何一个行执行结果里有errLevel为1或2,并且stagestatus列没有包含Execute Successfully字样,则判断最终执行结果为有异常.
if (sqlRow[2] == 1 or sqlRow[2] == 2) and re.match(r"\w*Execute Successfully\w*", sqlRow[3]) is None:
finalStatus = "执行有异常"
finalList.append(list(sqlRow))
return (finalStatus, finalList)
def getRollbackSqlList(self, workflowId):
workflowDetail = workflow.objects.get(id=workflowId)
listExecuteResult = json.loads(workflowDetail.execute_result)
# 回滚数据倒序展示
listExecuteResult.reverse()
listBackupSql = []
for row in listExecuteResult:
try:
# 获取backup_dbname
if row[8] == 'None':
continue
backupDbName = row[8]
sequence = row[7]
sql = row[5]
opidTime = sequence.replace("'", "")
sqlTable = "select tablename from %s.$_$Inception_backup_information$_$ where opid_time='%s';" % (
backupDbName, opidTime)
listTables = self._fetchall(sqlTable, self.inception_remote_backup_host,
self.inception_remote_backup_port, self.inception_remote_backup_user,
self.inception_remote_backup_password, '')
if listTables:
tableName = listTables[0][0]
sqlBack = "select rollback_statement from %s.%s where opid_time='%s'" % (
backupDbName, tableName, opidTime)
listBackup = self._fetchall(sqlBack, self.inception_remote_backup_host,
self.inception_remote_backup_port, self.inception_remote_backup_user,
self.inception_remote_backup_password, '')
block_rollback_sql_list = [sql]
block_rollback_sql = '\n'.join([back_info[0] for back_info in listBackup])
block_rollback_sql_list.append(block_rollback_sql)
listBackupSql.append(block_rollback_sql_list)
except Exception as e:
raise Exception(e)
return listBackupSql
def _fetchall(self, sql, paramHost, paramPort, paramUser, paramPasswd, paramDb):
'''
封装mysql连接和获取结果集方法
'''
result = None
conn = None
cur = None
try:
conn=MySQLdb.connect(host=paramHost, user=paramUser, passwd=paramPasswd, db=paramDb, port=paramPort, charset='utf8mb4')
cur=conn.cursor()
ret=cur.execute(sql)
result=cur.fetchall()
except Exception as e:
raise Exception(e)
finally:
if cur is not None:
cur.close()
if conn is not None:
conn.close()
return result
def getOscPercent(self, sqlSHA1):
"""已知SHA1值,去inception里查看OSC进度"""
sqlStr = "inception get osc_percent '%s'" % sqlSHA1
result = self._fetchall(sqlStr, self.inception_host, self.inception_port, '', '', '')
if len(result) > 0:
percent = result[0][3]
timeRemained = result[0][4]
pctResult = {"status":0, "msg":"ok", "data":{"percent":percent, "timeRemained":timeRemained}}
else:
pctResult = {"status":1, "msg":"没找到该SQL的进度信息,是否已经执行完毕?", "data":{"percent":-100, "timeRemained":-100}}
return pctResult
def stopOscProgress(self, sqlSHA1):
"""已知SHA1值,调用inception命令停止OSC进程,涉及的Inception命令和注意事项,请参考http://mysql-inception.github.io/inception-document/osc/"""
sqlStr = "inception stop alter '%s'" % sqlSHA1
result = self._fetchall(sqlStr, self.inception_host, self.inception_port, '', '', '')
if result is not None:
optResult = {"status":0, "msg":"已成功停止OSC进程,请注意清理触发器和临时表,先清理触发器再删除临时表", "data":""}
else:
optResult = {"status":1, "msg":"ERROR 2624 (HY000):未找到OSC执行进程,可能已经执行完成", "data":""}
return optResult
def query_print(self, sqlContent, clusterName, dbName):
'''
将sql交给inception打印语法树。
'''
listSlaves = slave_config.objects.get(cluster_name=clusterName)
masterHost = listSlaves.slave_host
masterPort = listSlaves.slave_port
masterUser = listSlaves.slave_user
masterPassword = self.prpCryptor.decrypt(listSlaves.slave_password)
# 工单审核使用
sql = "/*--user=%s;--password=%s;--host=%s;--port=%s;--enable-query-print;*/\
inception_magic_start;\
use %s;\
%s\
inception_magic_commit;" % (
masterUser, masterPassword, masterHost, str(masterPort), dbName, sqlContent)
result = self._fetchall(sql, self.inception_host, self.inception_port, '', '', '')
return result