forked from jly8866/archer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlreview.py
132 lines (112 loc) · 5.38 KB
/
sqlreview.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: UTF-8 -*-
import simplejson as json
import time
from threading import Thread
from django.db import connection
from django.utils import timezone
from django.conf import settings
from .dao import Dao
from .const import Const, WorkflowDict
from .sendmail import MailSender
from .inception import InceptionDao
from .aes_decryptor import Prpcrypt
from .models import users, workflow, master_config
from .workflow import Workflow
from .permission import role_required, superuser_required
import logging
logger = logging.getLogger('default')
dao = Dao()
inceptionDao = InceptionDao()
mailSender = MailSender()
prpCryptor = Prpcrypt()
workflowOb = Workflow()
# 获取当前请求url
def getDetailUrl(request):
scheme = request.scheme
host = request.META['HTTP_HOST']
return "%s://%s/detail/" % (scheme, host)
# 根据实例名获取主库连接字符串,并封装成一个dict
def getMasterConnStr(clusterName):
listMasters = master_config.objects.filter(cluster_name=clusterName)
masterHost = listMasters[0].master_host
masterPort = listMasters[0].master_port
masterUser = listMasters[0].master_user
masterPassword = prpCryptor.decrypt(listMasters[0].master_password)
dictConn = {'masterHost': masterHost, 'masterPort': masterPort, 'masterUser': masterUser,
'masterPassword': masterPassword}
return dictConn
# SQL工单执行回调
def execute_call_back(workflowId, clusterName, url):
workflowDetail = workflow.objects.get(id=workflowId)
# 获取审核人
try:
listAllReviewMen = json.loads(workflowDetail.review_man)
except ValueError:
listAllReviewMen = (workflowDetail.review_man,)
dictConn = getMasterConnStr(clusterName)
try:
# 交给inception先split,再执行
logger.debug('execute_call_back:' + str(workflowId) + ' executing')
(finalStatus, finalList) = inceptionDao.executeFinal(workflowDetail, dictConn)
# 封装成JSON格式存进数据库字段里
strJsonResult = json.dumps(finalList)
workflowDetail = workflow.objects.get(id=workflowId)
workflowDetail.execute_result = strJsonResult
workflowDetail.finish_time = timezone.now()
workflowDetail.status = finalStatus
workflowDetail.is_manual = 0
workflowDetail.audit_remark = ''
# 重新获取连接,防止超时
connection.close()
workflowDetail.save()
logger.debug('execute_call_back:' + str(workflowId) + ' finish')
except Exception as e:
logger.error(e)
# 如果执行完毕了,则根据settings.py里的配置决定是否给提交者和DBA一封邮件提醒.DBA需要知晓审核并执行过的单子
if hasattr(settings, 'MAIL_ON_OFF') == True:
if getattr(settings, 'MAIL_ON_OFF') == "on":
# 给主、副审核人,申请人,DBA各发一封邮件
engineer = workflowDetail.engineer
reviewMen = workflowDetail.review_man
workflowStatus = workflowDetail.status
workflowName = workflowDetail.workflow_name
objEngineer = users.objects.get(username=engineer)
strTitle = "SQL上线工单执行完毕 # " + str(workflowId)
strContent = "发起人:" + engineer + "\n审核人:" + reviewMen + "\n工单地址:" + url + "\n工单名称: " + workflowName + "\n执行结果:" + workflowStatus
reviewManAddr = [email['email'] for email in
users.objects.filter(username__in=listAllReviewMen).values('email')]
dbaAddr = [email['email'] for email in users.objects.filter(role='DBA').values('email')]
listCcAddr = reviewManAddr + dbaAddr
mailSender.sendEmail(strTitle, strContent, [objEngineer.email], listCcAddr=listCcAddr)
# 给定时任务执行sql
def execute_job(workflowId, url):
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflowId)
logger.debug('execute_job:' + job_id + ' start')
workflowDetail = workflow.objects.get(id=workflowId)
clusterName = workflowDetail.cluster_name
# 服务器端二次验证,当前工单状态必须为定时执行过状态
if workflowDetail.status != Const.workflowStatus['timingtask']:
raise Exception('工单不是定时执行状态')
# 将流程状态修改为执行中,并更新reviewok_time字段
workflowDetail.status = Const.workflowStatus['executing']
workflowDetail.reviewok_time = timezone.now()
try:
workflowDetail.save()
except Exception:
# 关闭后重新获取连接,防止超时
connection.close()
workflowDetail.save()
logger.debug('execute_job:' + job_id + ' executing')
# 执行之前重新split并check一遍,更新SHA1缓存;因为如果在执行中,其他进程去做这一步操作的话,会导致inception core dump挂掉
splitReviewResult = inceptionDao.sqlautoReview(workflowDetail.sql_content, workflowDetail.cluster_name,
isSplit='yes')
workflowDetail.review_content = json.dumps(splitReviewResult)
try:
workflowDetail.save()
except Exception:
# 关闭后重新获取连接,防止超时
connection.close()
workflowDetail.save()
# 采取异步回调的方式执行语句,防止出现持续执行中的异常
t = Thread(target=execute_call_back, args=(workflowId, clusterName, url))
t.start()