forked from aaPanel/BaoTa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatatool.py
165 lines (145 loc) · 5.57 KB
/
datatool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 宝塔软件(http:#bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: [email protected]
# -------------------------------------------------------------------
# ------------------------------
# 数据库工具类
# ------------------------------
import sys, os
os.chdir("/www/server/panel")
if not 'class/' in sys.path:
sys.path.insert(0,'class/')
import panelMysql
import re,json,public
class datatools:
DB_MySQL = None
# 字节单位转换
def ToSize(self, size):
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024: return ('%.2f' % size) + d
size = size / 1024
return '0b';
# 获取当前数据库信息
def GetdataInfo(self,get):
'''
传递一个数据库名称即可 get.databases
'''
if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
db_name=get.db_name
if not db_name:return False
ret = {}
tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
if type(tables) == list:
try:
data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='%s'" % db_name))[0][0]
except:
data=0
if not data: data = 0
ret['data_size'] = self.ToSize(data)
ret['database'] = db_name
ret3 = []
for i in tables:
if i == 1049: return public.returnMsg(False,'指定数据库不存在!')
if type(i) == int: continue
table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
if not table: continue
try:
ret2 = {}
ret2['type']=table[0][1]
data_size = table[0][6]
ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name,i[0]))[0][0] #table[0][4] 实时获取行数 @authow hwliang<2021-08-05> 修改
ret2['collation'] = table[0][14]
ret2['data_size'] = self.ToSize(int(data_size))
ret2['table_name'] = i[0]
ret3.append(ret2)
except: continue
ret['tables'] = (ret3)
return ret
#修复表信息
def RepairTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('REPAIR TABLE `%s`.`%s`'%(db_name,i))
return True
return False
#map to list
def map_to_list(self,map_obj):
try:
if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
return map_obj
except: return []
# 优化表
def OptimizeTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table) == list:
if len(mysql_table) > 0:
for i in mysql_table:
for i2 in tables:
if i2 == i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('OPTIMIZE table `%s`.`%s` ENGINE=MyISAM' % (db_name,i))
return True
return False
# 更改表引擎
def AlterTable(self,get):
'''
POST:
db_name=web
table_type=innodb
tables=['web1','web2']
'''
if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
db_name = get.db_name
table_type = get.table_type
tables = json.loads(get.tables)
if not db_name or not tables: return False
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('alter table `%s`.`%s` ENGINE=`%s`' % (db_name,i,table_type))
return True
return False
#检查表
def CheckTable(self,database,tables,*args,**kwargs):
pass