Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
hhyo committed Nov 27, 2018
1 parent 161b12f commit cea4ecc
Showing 1 changed file with 108 additions and 90 deletions.
198 changes: 108 additions & 90 deletions sql/data_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ def data_masking(self, cluster_name, db_name, sql, sql_result):

if print_info is None:
result['status'] = 1
result['msg'] = 'inception返回的结果集为空!可能是SQL语句有语法错误'
result['msg'] = 'inception返回的结果集为空!可能是SQL语句有语法错误,无法完成脱敏校验,如果需要继续查询请关闭校验'
elif print_info['errlevel'] != 0:
result['status'] = 2
result['msg'] = 'inception返回异常:\n' + print_info['errmsg']
result['msg'] = 'inception返回异常,无法完成脱敏校验,如果需要继续查询请关闭校验\n' + print_info['errmsg']
else:
query_tree = print_info['query_tree']
# 获取集群所属环境,获取命中脱敏规则的列数据
# 获取命中脱敏规则的列数据
try:
table_hit_columns, hit_columns = self.analy_query_tree(query_tree, cluster_name)
except Exception as msg:
result['status'] = 2
result['msg'] = 'inception语法树解析表信息出错:{}\nquery_tree:{}'.format(str(msg), print_info)
result['msg'] = '解析inception语法树获取表信息出错,无法完成脱敏校验,如果需要继续查询请关闭校验:{}\nquery_tree:{}'.format(str(msg),
print_info)
return result

# 存在select * 的查询,遍历column_list,获取命中列的index,添加到hit_columns
Expand Down Expand Up @@ -81,7 +82,8 @@ def query_tree(self, sqlContent, cluster_name, dbName):
errlevel = 2
errmsg = 'Global environment: ' + query_tree
if errlevel == 0:
print(json.dumps(json.loads(query_tree), indent=4, sort_keys=False, ensure_ascii=False))
pass
# print(json.dumps(json.loads(query_tree), indent=4, sort_keys=False, ensure_ascii=False))
return {'id': id, 'statement': statement, 'errlevel': errlevel, 'query_tree': query_tree,
'errmsg': errmsg}
else:
Expand All @@ -99,10 +101,10 @@ def query_table_ref(self, sqlContent, cluster_name, dbName):

if print_info is None:
result['status'] = 1
result['msg'] = 'inception返回的结果集为空!可能是SQL语句有语法错误'
result['msg'] = 'inception返回的结果集为空!可能是SQL语句有语法错误,无法校验表权限,如果需要继续查询请关闭校验'
elif print_info['errlevel'] != 0:
result['status'] = 2
result['msg'] = 'inception返回异常:\n' + print_info['errmsg']
result['msg'] = 'inception返回异常,无法校验表权限,如果需要继续查询请关闭校验\n' + print_info['errmsg']
else:
try:
table_ref = json.loads(print_info['query_tree'])['table_ref']
Expand All @@ -118,7 +120,8 @@ def query_table_ref(self, sqlContent, cluster_name, dbName):
table_ref = json.loads(query_tree_str)['table_ref']
except Exception as msg:
result['status'] = 2
result['msg'] = 'inception语法树解析表信息出错:{}\nquery_tree:{}'.format(str(msg), print_info)
result['msg'] = '通过inception语法树解析表信息出错,无法校验表权限,如果需要继续查询请关闭校验:{}\nquery_tree:{}'.format(str(msg),
print_info)
table_ref = ''
result['data'] = table_ref
return result
Expand All @@ -140,97 +143,112 @@ def analy_query_tree(self, query_tree, cluster_name):
# 获取全部脱敏字段信息,减少循环查询,提升效率
DataMaskingColumnsOb = DataMaskingColumns.objects.all()

# 遍历select_list
columns = []
hit_columns = [] # 命中列
table_hit_columns = [] # 涉及表命中的列

# 判断是否存在不支持脱敏的语法
for select_item in select_list:
if select_item['type'] not in ('FIELD_ITEM', 'aggregate'):
raise Exception('不支持该查询语句脱敏!')
if select_item['type'] == 'aggregate':
if select_item['aggregate'].get('type') != 'FIELD_ITEM':
# 判断语句涉及的表是否存在脱敏字段配置
is_exist = False
for table in table_ref:
if DataMaskingColumnsOb.filter(cluster_name=cluster_name,
table_schema=table['db'],
table_name=table['table'],
active=1).exists():
is_exist = True
# 不存在脱敏字段则直接跳过规则解析
if is_exist:
# 遍历select_list
columns = []
hit_columns = [] # 命中列
table_hit_columns = [] # 涉及表命中的列,仅select *需要

# 判断是否存在不支持脱敏的语法
for select_item in select_list:
if select_item['type'] not in ('FIELD_ITEM', 'aggregate'):
raise Exception('不支持该查询语句脱敏!')
if select_item['type'] == 'aggregate':
if select_item['aggregate'].get('type') not in ('FIELD_ITEM', 'INT_ITEM'):
raise Exception('不支持该查询语句脱敏!')

# 获取select信息的规则,仅处理type为FIELD_ITEM和aggregate类型的select信息,如[*],[*,column_a],[column_a,*],[column_a,a.*,column_b],[a.*,column_a,b.*],
select_index = [
select_item['field'] if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate'].get('field') for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

# 处理select_list,为统一的{'type': 'FIELD_ITEM', 'db': 'archer_master', 'table': 'sql_users', 'field': 'email'}格式
select_list = [select_item if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate'] for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

if select_index:
# 如果发现存在field='*',则遍历所有表,找出所有的命中字段
if '*' in select_index:
# 涉及表命中的列
for table in table_ref:
hit_columns_info = self.hit_table(DataMaskingColumnsOb, cluster_name, table['db'],
table['table'])
table_hit_columns.extend(hit_columns_info)
# 几种不同查询格式
# [*]
if re.match(r"^(\*,?)+$", ','.join(select_index)):
hit_columns = []
# [*,column_a]
elif re.match(r"^(\*,)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取后切片
for index, item in enumerate(select_list):
item['index'] = index - len(select_list)
if item.get('field') != '*':
columns.append(item)

# 获取select信息的规则,仅处理type为FIELD_ITEM和aggregate类型的select信息,如[*],[*,column_a],[column_a,*],[column_a,a.*,column_b],[a.*,column_a,b.*],
select_index = [
select_item['field'] if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate'].get('field') for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

# 处理select_list,为统一的{'type': 'FIELD_ITEM', 'db': 'archer_master', 'table': 'sql_users', 'field': 'email'}格式
select_list = [select_item if select_item['type'] == 'FIELD_ITEM' else select_item['aggregate'] for
select_item in select_list if select_item['type'] in ('FIELD_ITEM', 'aggregate')]

if select_index:
# 如果发现存在field='*',则遍历所有表,找出所有的命中字段
if '*' in select_index:
for table in table_ref:
hit_columns_info = self.hit_table(DataMaskingColumnsOb, cluster_name, table['db'],
table['table'])
table_hit_columns.extend(hit_columns_info)
# [*]
if re.match(r"^(\*,?)+$", ','.join(select_index)):
hit_columns = []
# [*,column_a]
elif re.match(r"^(\*,)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取后切片
for index, item in enumerate(select_list):
item['index'] = index - len(select_list)
if item.get('field') != '*':
columns.append(item)
# [column_a, *]
elif re.match(r"^(\w,?)+(\*,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取前切片
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') != '*':
columns.append(item)

# [column_a, *]
elif re.match(r"^(\w,?)+(\*,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,index采取前切片
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') != '*':
columns.append(item)
# [column_a,a.*,column_b]
elif re.match(r"^(\w,?)+(\*,?)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,*前面的字段index采取前切片,*后面的字段采取后切片
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') == '*':
first_idx = index
break

# [column_a,a.*,column_b]
elif re.match(r"^(\w,?)+(\*,?)+(\w,?)+$", ','.join(select_index)):
# 找出field不为* 的列信息, 循环判断列是否命中脱敏规则,并增加规则类型和index,*前面的字段index采取前切片,*后面的字段采取后切片
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') == '*':
first_idx = index
break
select_list.reverse()
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') == '*':
last_idx = len(select_list) - index - 1
break

select_list.reverse()
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') == '*':
last_idx = len(select_list) - index - 1
break
select_list.reverse()
for index, item in enumerate(select_list):
if item.get('field') != '*' and index < first_idx:
item['index'] = index

select_list.reverse()
for index, item in enumerate(select_list):
if item.get('field') != '*' and index < first_idx:
item['index'] = index
if item.get('field') != '*' and index > last_idx:
item['index'] = index - len(select_list)
columns.append(item)

if item.get('field') != '*' and index > last_idx:
item['index'] = index - len(select_list)
columns.append(item)
# [a.*, column_a, b.*]
else:
raise Exception('不支持select信息为[a.*, column_a, b.*]格式的查询脱敏!')

# [a.*, column_a, b.*]
# 没有*的查询,直接遍历查询命中字段,query_tree的列index就是查询语句列的index
else:
raise Exception('不支持select信息为[a.*, column_a, b.*]格式的查询脱敏!')

# 没有*的查询,直接遍历查询命中字段,query_tree的列index就是查询语句列的index
else:
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') != '*':
columns.append(item)

# 格式化命中的列信息
for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column.get('db'), column.get('table'),
column.get('field'))
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
for index, item in enumerate(select_list):
item['index'] = index
if item.get('field') != '*':
columns.append(item)

# 格式化命中的列信息
for column in columns:
hit_info = self.hit_column(DataMaskingColumnsOb, cluster_name, column.get('db'), column.get('table'),
column.get('field'))
if hit_info['is_hit']:
hit_info['index'] = column['index']
hit_columns.append(hit_info)
else:
table_hit_columns = None
hit_columns = None
return table_hit_columns, hit_columns

# 判断字段是否命中脱敏规则,如果命中则返回脱敏的规则id和规则类型
Expand Down

0 comments on commit cea4ecc

Please sign in to comment.