Skip to content

Commit

Permalink
Merge pull request jly8866#89 from hhyo/github
Browse files Browse the repository at this point in the history
支持部分函数脱敏,对于不支持脱敏的语法直接抛出异常
  • Loading branch information
Mr.July authored Aug 1, 2018
2 parents 2ebf3d1 + 843fb61 commit 1d0925f
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 157 deletions.
109 changes: 47 additions & 62 deletions sql/data_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,19 @@ def analy_query_tree(self, query_tree, cluster_name):
hit_columns = [] # 命中列
table_hit_columns = [] # 涉及表命中的列

# 获取select信息的规则,仅处理type为FIELD_ITEM的select信息,如[*],[*,column_a],[column_a,*],[column_a,a.*,column_b],[a.*,column_a,b.*],
select_index = [select_item['field'] for select_item in select_list if
select_item['type'] == 'FIELD_ITEM']
# 判断是否存在不支持脱敏的语法
for select_item in select_list:
if select_item['type'] not in ('FIELD_ITEM', 'aggregate'):
raise Exception('不支持该查询语句脱敏!')

# 获取select信息的规则,仅处理type为FIELD_ITEM和aggregate类型的select信息,如[*],[*,column_a],[column_a,*],[column_a,a.*,column_b],[a.*,column_a,b.*],
select_index = [
select_item['field'] if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate']['field'] for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

# 处理select_list,为统一的{'type': 'FIELD_ITEM', 'db': 'archer_master', 'table': 'sql_users', 'field': 'email'}格式
select_list = [select_item if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate'] for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

if select_index:
# 如果发现存在field='*',则遍历所有表,找出所有的命中字段
Expand All @@ -163,86 +173,61 @@ def analy_query_tree(self, query_tree, cluster_name):
elif re.match(r"^(\*,)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取后切片
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
item['index'] = index - len(select_list)
if item['field'] != '*':
columns.append(item)

for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column['db'],
column['table'], column['field'])
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
item['index'] = index - len(select_list)
if item['field'] != '*':
columns.append(item)

# [column_a, *]
elif re.match(r"^(\w,?)+(\*,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取前切片
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
item['index'] = index
if item['field'] != '*':
columns.append(item)

for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column['db'],
column['table'], column['field'])
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
item['index'] = index
if item['field'] != '*':
columns.append(item)

# [column_a,a.*,column_b]
elif re.match(r"^(\w,?)+(\*,?)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,*前面的字段index采取前切片,*后面的字段采取后切片
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
item['index'] = index
if item['field'] == '*':
first_idx = index
break
item['index'] = index
if item['field'] == '*':
first_idx = index
break

select_list.reverse()
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
item['index'] = index
if item['field'] == '*':
last_idx = len(select_list) - index - 1
break
item['index'] = index
if item['field'] == '*':
last_idx = len(select_list) - index - 1
break

select_list.reverse()
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
if item['field'] != '*' and index < first_idx:
item['index'] = index
columns.append(item)

if item['field'] != '*' and index > last_idx:
item['index'] = index - len(select_list)
columns.append(item)

for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column['db'],
column['table'], column['field'])
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
if item['field'] != '*' and index < first_idx:
item['index'] = index

if item['field'] != '*' and index > last_idx:
item['index'] = index - len(select_list)
columns.append(item)

# [a.*, column_a, b.*]
else:
hit_columns = []
return table_hit_columns, hit_columns
raise Exception('不支持select信息为[a.*, column_a, b.*]格式的查询脱敏!')

# 没有*的查询,直接遍历查询命中字段,query_tree的列index就是查询语句列的index
else:
for index, item in enumerate(select_list):
if item['type'] == 'FIELD_ITEM':
item['index'] = index
if item['field'] != '*':
columns.append(item)
item['index'] = index
if item['field'] != '*':
columns.append(item)

for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column['db'], column['table'],
column['field'])
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
# 格式化命中的列信息
for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column['db'], column['table'],
column['field'])
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
return table_hit_columns, hit_columns

# 判断字段是否命中脱敏规则,如果命中则返回脱敏的规则id和规则类型
Expand Down
10 changes: 8 additions & 2 deletions sql/extend_json_encoder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: UTF-8 -*-
import simplejson as json

from datetime import datetime, date
from datetime import datetime, date, timedelta
from decimal import Decimal
from functools import singledispatch

Expand Down Expand Up @@ -36,6 +36,10 @@ def _(o):
return o.strftime('%Y-%m-%d')


@convert.register(timedelta)
def _(o):
return o.total_seconds()

# @convert.register(Decimal)
# def _(o):
# return float(o)
Expand All @@ -59,7 +63,9 @@ def default(self, obj):
'dm': dm,
'dt': dt,
'dat': dat,
'tl': timedelta(minutes=30),
'bigint': 988983860501598208
}

#print(json.dumps(data, cls=ExtendJSONEncoder, bigint_as_string=True))
# print(json.dumps(data, cls=ExtendJSONEncoder, bigint_as_string=True))
# print(json.dumps(data, cls=ExtendJSONEncoder, bigint_as_string=True, default=str))
5 changes: 3 additions & 2 deletions sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class Meta:

# SlowQuery
class SlowQuery(models.Model):
checksum = models.BigIntegerField(primary_key=True)
checksum = models.CharField(max_length=32, primary_key=True)
fingerprint = models.TextField()
sample = models.TextField()
first_seen = models.DateTimeField(blank=True, null=True)
Expand Down Expand Up @@ -408,6 +408,7 @@ class SlowQueryHistory(models.Model):
class Meta:
managed = False
db_table = 'mysql_slow_query_review_history'
unique_together = ('hostname_max', 'ts_min')
unique_together = ('checksum', 'ts_min', 'ts_max')
index_together = ('hostname_max', 'ts_min')
verbose_name = u'慢日志明细'
verbose_name_plural = u'慢日志明细'
4 changes: 2 additions & 2 deletions sql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ def slowquery_review_history(request):
# 获取慢查明细数据
slowsql_record_obj = SlowQueryHistory.objects.filter(
hostname_max=(cluster_info.master_host + ':' + str(cluster_info.master_port)),
checksum=int(SQLId),
checksum=SQLId,
ts_min__range=(StartTime, EndTime)
).annotate(ExecutionStartTime=F('ts_min'), # 执行开始时间
DBName=F('db_max'), # 数据库名
Expand All @@ -962,7 +962,7 @@ def slowquery_review_history(request):

slowsql_obj_count = SlowQueryHistory.objects.filter(
hostname_max=(cluster_info.master_host + ':' + str(cluster_info.master_port)),
checksum=int(SQLId),
checksum=SQLId,
ts_min__range=(StartTime, EndTime)
).count()
else:
Expand Down
Loading

0 comments on commit 1d0925f

Please sign in to comment.