forked from jly8866/archer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dao.py
177 lines (159 loc) · 6.29 KB
/
dao.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# -*- coding: UTF-8 -*-
import MySQLdb
from django.db import connection
class Dao(object):
_CHART_DAYS = 90
# 连进指定的mysql实例里,读取所有databases并返回
def getAlldbByCluster(self, masterHost, masterPort, masterUser, masterPassword):
listDb = []
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword,
charset='utf8mb4')
cursor = conn.cursor()
sql = "show databases"
n = cursor.execute(sql)
listDb = [row[0] for row in cursor.fetchall()
if row[0] not in ('information_schema', 'performance_schema', 'mysql', 'test')]
except MySQLdb.Warning as w:
raise Exception(w)
except MySQLdb.Error as e:
raise Exception(e)
finally:
if cursor is not None:
cursor.close()
if conn is not None:
conn.commit()
conn.close()
return listDb
# 连进指定的mysql实例里,读取所有tables并返回
def getAllTableByDb(self, masterHost, masterPort, masterUser, masterPassword, dbName):
listTb = []
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword,
db=dbName,
charset='utf8mb4')
cursor = conn.cursor()
sql = "show tables"
n = cursor.execute(sql)
listTb = [row[0] for row in cursor.fetchall()
if row[0] not in (
'test')]
except MySQLdb.Warning as w:
raise Exception(w)
except MySQLdb.Error as e:
raise Exception(e)
finally:
if cursor is not None:
cursor.close()
if conn is not None:
conn.commit()
conn.close()
return listTb
# 连进指定的mysql实例里,读取所有Columns并返回
def getAllColumnsByTb(self, masterHost, masterPort, masterUser, masterPassword, dbName, tbName):
listCol = []
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword,
db=dbName,
charset='utf8mb4')
cursor = conn.cursor()
sql = "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s';" % (
dbName, tbName)
n = cursor.execute(sql)
listCol = [row[0] for row in cursor.fetchall()]
except MySQLdb.Warning as w:
raise Exception(w)
except MySQLdb.Error as e:
raise Exception(e)
finally:
if cursor is not None:
cursor.close()
if conn is not None:
conn.commit()
conn.close()
return listCol
# 连进指定的mysql实例里,执行sql并返回
def mysql_query(self, masterHost, masterPort, masterUser, masterPassword, dbName, sql, limit_num=0):
result = {}
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=dbName,
charset='utf8mb4')
cursor = conn.cursor()
effect_row = cursor.execute(sql)
if int(limit_num) > 0:
rows = cursor.fetchmany(size=int(limit_num))
else:
rows = cursor.fetchall()
fields = cursor.description
column_list = []
if fields:
for i in fields:
column_list.append(i[0])
result = {}
result['column_list'] = column_list
result['rows'] = rows
result['effect_row'] = effect_row
except MySQLdb.Warning as w:
print(str(w))
result['Warning'] = str(w)
except MySQLdb.Error as e:
print(str(e))
result['Error'] = str(e)
finally:
if cursor is not None:
cursor.close()
if conn is not None:
try:
conn.rollback()
conn.close()
except:
conn.close()
return result
# 连进指定的mysql实例里,执行sql并返回
def mysql_execute(self, masterHost, masterPort, masterUser, masterPassword, db_name, sql):
result = {}
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=masterHost, port=masterPort, user=masterUser, passwd=masterPassword, db=db_name,
charset='utf8mb4', max_allowed_packet=1073741824)
cursor = conn.cursor()
effect_row = cursor.execute(sql)
# result = {}
# result['effect_row'] = effect_row
conn.commit()
except MySQLdb.Warning as w:
print(str(w))
result['Warning'] = str(w)
except MySQLdb.Error as e:
print(str(e))
result['Error'] = str(e)
finally:
if result.get('Error') or result.get('Warning'):
conn.close()
elif cursor is not None:
cursor.close()
conn.close()
return result
def getWorkChartsByMonth(self):
cursor = connection.cursor()
sql = "select date_format(create_time, '%%m-%%d'),count(*) from sql_workflow where create_time>=date_add(now(),interval -%s day) group by date_format(create_time, '%%m-%%d') order by 1 asc;" % (
Dao._CHART_DAYS)
cursor.execute(sql)
result = cursor.fetchall()
return result
def getWorkChartsByPerson(self):
cursor = connection.cursor()
sql = "select engineer, count(*) as cnt from sql_workflow where create_time>=date_add(now(),interval -%s day) group by engineer order by cnt desc limit 50;" % (
Dao._CHART_DAYS)
cursor.execute(sql)
result = cursor.fetchall()
return result