fix: fix screen printing bug
add: metaj parameter to construct data with arbitrary structure
ligangc committed May 1, 2020
1 parent eed3aeb commit e6ce618
Showing 22 changed files with 358 additions and 81 deletions.
9 changes: 9 additions & 0 deletions datafaker/cli.py
@@ -6,9 +6,12 @@
import traceback

from datafaker.constant import __version__, DEFAULT_INTERVAL, DEFAULT_FORMAT, DEFAULT_LOCALE, BATCH_SIZE, WORKERS
from datafaker.constant import JSON_FORMAT

# solve problem of oracle database
# UnicodeEncodeError: 'ascii' codec can't encode characters in position 32-3
from datafaker.exceptions import ParamError

os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'


@@ -26,6 +29,7 @@ def parse_args():
parser.add_argument('num', nargs='?', action='store', type=int, help='number of records to generate')
parser.add_argument('--auth', nargs='?', action='store', help='user and password')
parser.add_argument('--meta', nargs='?', action='store', help='meta file path')
parser.add_argument('--metaj', nargs='?', action='store', help='metaj file path')
parser.add_argument('--interval', action='store', type=float, help='the interval to make stream data')
parser.add_argument('--batch', action='store', type=int, default=BATCH_SIZE, help='batch size for inserting records')
parser.add_argument('--workers', action='store', type=int, default=WORKERS, help='number of workers to generate data')
@@ -57,6 +61,11 @@ def parse_args():
parser.print_help()
exit(0)

if args.format == JSON_FORMAT and not any([args.outprint, args.dbtype in ['file', 'kafka']]):
raise ParamError('json format is not supported for rdb')
if args.metaj and args.dbtype not in ['file', 'kafka']:
raise ParamError('metaj is only supported for file and kafka')

return args


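For illustration, a rough sketch of how the new --metaj option might be invoked. The template below is hypothetical; it assumes the same [:cmd(args):] keyword syntax used in meta-file comments, with each keyword replaced by a generated value, which is what makes nested structures possible:

# meta.json -- a free-form template (hypothetical example)
{
    "user": {
        "name": "[:name:]",
        "age": [:age:]
    },
    "level": "[:enum(low, high):]"
}

# generate 10 records into a kafka topic from the template
datafaker kafka localhost:9092 my_topic 10 --metaj meta.json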
2 changes: 1 addition & 1 deletion datafaker/constant.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

__version__ = '0.7.1'
__version__ = '0.7.2'

# batch size for inserting records
BATCH_SIZE = 1000
102 changes: 84 additions & 18 deletions datafaker/dbs/basedb.py
@@ -9,8 +9,8 @@
from datafaker.constant import INT_TYPES, FLOAT_TYPES, ENUM_FILE, JSON_FORMAT, MAX_QUEUE_SIZE, MIN_RECORDS_FOR_PARALLEL
from datafaker.exceptions import EnumMustNotEmptyError, ParseSchemaError
from datafaker.fakedata import FackData
from datafaker.reg import reg_keyword, reg_cmd, reg_args
from datafaker.utils import count_time, read_file_lines, json_item, process_op_args
from datafaker.reg import reg_keyword, reg_cmd, reg_args, reg_all_keywords, reg_replace_keywords
from datafaker.utils import count_time, read_file_lines, json_item, process_op_args, read_file


class BaseDB(object):
@@ -21,7 +21,8 @@ class BaseDB(object):
def __init__(self, args):
self.args = args
self.schema = self.parse_schema()
self.column_names = [item['name'] for item in self.schema]
if self.args.metaj is None:
self.column_names = [item['name'] for item in self.schema]
self.fakedata = FackData(self.args.locale)

self.queue = compat.Queue(maxsize=MAX_QUEUE_SIZE)
@@ -30,6 +31,11 @@ def __init__(self, args):
self.cur_num = compat.Value('L', 0)
self.lock = compat.Lock()

# handle the custom metaj format, which supports nested json structures
if self.args.metaj:
meta = read_file(self.args.metaj)
self.metaj_content = reg_replace_keywords(meta)

# call the subclass's init function
self.init()

@@ -45,6 +51,14 @@ def get_cur_num(self):
self.cur_num.value += 1
return self.cur_num.value-1

def format_data(self, columns):
data = columns
if self.args.metaj:
data = self.metaj_content % tuple(columns)
elif self.args.format == JSON_FORMAT:
data = json_item(self.column_names, columns)
return data

def fake_data(self):
"""
sleep here so that data is not produced faster than it can be consumed
@@ -53,8 +67,6 @@ def fake_data(self):

while self.get_cur_num() < self.args.num:
columns = self.fake_column(self.cur_num.value)
if self.args.format == JSON_FORMAT:
columns = json_item(self.column_names, columns)
self.queue.put(columns)

sleep(0.1)
@@ -120,17 +132,21 @@ def print_data(self):
while not self.isover.value or not self.queue.empty():
try:
data = self.queue.get_nowait()
items = []
for item in data:
if item is None:
items.append('None')
elif isinstance(item, (int, float)):
items.append(str(item))
else:
items.append(safe_decode(item))
row = self.args.outspliter.join(items)
# explicitly call the base class's method
data = BaseDB.format_data(self, data)
if self.args.format == JSON_FORMAT or self.args.metaj:
row = data
else:
items = []
for item in data:
if item is None:
items.append('None')
elif isinstance(item, (int, float)):
items.append(str(item))
else:
items.append(safe_decode(item))
row = self.args.outspliter.join(items)
print(row)
# print(data)
if self.args.interval:
time.sleep(self.args.interval)
except queue.Empty:
@@ -141,6 +157,8 @@ def print_data(self):
def parse_schema(self):
if self.args.meta:
schema = self.parse_meta_schema()
elif self.args.metaj:
schema = self.parse_metaj_schema()
else:
schema = self.parse_self_schema()
return schema
@@ -153,8 +171,18 @@ def parse_meta_schema(self):
rows = self.construct_meta_rows()
return self.parse_schema_from_rows(rows)

def parse_metaj_schema(self):
meta = read_file(self.args.metaj)
return self.parse_schema_from_text(meta)

def parse_schema_from_rows(self, rows):
shema = []
"""
解析meta file
:param rows: 行信息
:return: schema: 命令关键字,数据类型,命令参数
"""
schema = []
column_names = []
for row in rows:
item = {'name': row[0], 'type': row[1], 'comment': row[-1]}
@@ -166,6 +194,7 @@ def parse_schema_from_rows(self, rows):
keyword = reg_keyword(item['comment'])

ctype = reg_cmd(item['type'])
# if no keyword is found, fall back to the data type in the second column
if not keyword:
keyword = item['type']

@@ -201,9 +230,46 @@ def parse_schema_from_rows(self, rows):
item['cmd'] = cmd
item['ctype'] = ctype
item['args'] = args
shema.append(item)
schema.append(item)

return schema

def parse_schema_from_text(self, text):

keywords = reg_all_keywords(text)
schema = []

for keyword in keywords:

cmd = reg_cmd(keyword)
rets = reg_args(keyword)
if cmd == 'enum' or cmd == 'order_enum':
if len(rets) == 0:
raise EnumMustNotEmptyError

# if the enum has only one value, produce that fixed value
# if the enum has only one value and it starts with file://, read the values from that file
if len(rets) == 1 and rets[0].startswith(ENUM_FILE):
args = read_file_lines(rets[0][len(ENUM_FILE):])
else:
# treat all enum values as strings
args = rets

elif cmd in INT_TYPES:
args = [int(ret) for ret in rets]
args.append('unsigned' in keyword)
elif cmd == 'op':
args = [process_op_args(rets[0], 'columns'), ]
else:
try:
args = [int(ret) for ret in rets]
except ValueError:
args = rets

item = {'cmd': cmd, 'args': args}
schema.append(item)
return schema

return shema

def construct_meta_rows(self):
"""
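To make the % formatting above concrete, here is a minimal, self-contained sketch of the assumed behavior of reg_replace_keywords (the real implementation lives in datafaker/reg.py): every [:...:] keyword in the metaj template becomes a %s placeholder, so metaj_content % tuple(columns) re-injects one generated value per keyword:

import re

def reg_replace_keywords(text):
    # assumed behavior: each [:cmd(args):] keyword becomes a %s placeholder
    return re.sub(r'\[:.*?:\]', '%s', text)

template = '{"name": "[:name:]", "age": [:age:]}'
content = reg_replace_keywords(template)  # '{"name": "%s", "age": %s}'
columns = ['alice', 30]                   # one fake value per keyword
print(content % tuple(columns))           # {"name": "alice", "age": 30}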
16 changes: 15 additions & 1 deletion datafaker/dbs/esdb.py
@@ -1,10 +1,13 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import json

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from datafaker.dbs.basedb import BaseDB
from datafaker.exceptions import ParamError
from datafaker.utils import json_item


class EsDB(BaseDB):
@@ -23,7 +26,8 @@ def save_data(self, lines):
actions = []

for line in lines:
source = dict(zip(self.column_names, line))

source = self.format_data(line)
action = {
"_index": self.index,
"_type": self.type,
@@ -33,5 +37,15 @@ def save_data(self, lines):

success, _ = bulk(self.es, actions, index=self.args.table, raise_on_error=True)

def format_data(self, columns):
if self.args.metaj:
data = self.metaj_content % tuple(columns)
source = json.loads(data)
else:
source = dict(zip(self.column_names, columns))
return source
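
The override exists because bulk() expects each action's _source to be a dict rather than a string, so the rendered metaj template is parsed back into an object. A quick illustration with a hypothetical, already-rendered template:

import json

metaj_content = '{"user": {"name": "%s", "age": %s}}'  # after keyword replacement
columns = ['bob', 42]
source = json.loads(metaj_content % tuple(columns))
assert source == {'user': {'name': 'bob', 'age': 42}}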
19 changes: 12 additions & 7 deletions datafaker/dbs/filedb.py
@@ -14,16 +14,21 @@ def construct_self_rows(self):
return []

def save_data(self, lines):
spliter = self.args.outspliter if self.args.outspliter else ','
filepath = os.path.join(self.args.connect, self.args.table)

items = []
if self.args.format == TEXT_FORMAT:
for line in lines:
item = self.format_data(line)
items.append(item+os.linesep)

for item in lines:
line = spliter.join([str(safe_encode(word)) for word in item]) + "\n"
items.append(line)
else:
items = [line+"\n" for line in lines]
save2file(items, filepath)

def format_data(self, columns):
data = columns
if self.args.metaj:
data = self.metaj_content % tuple(columns)
elif self.args.format == JSON_FORMAT:
data = json_item(self.column_names, columns)
elif self.args.format == TEXT_FORMAT:
data = self.args.outspliter.join([str(safe_encode(word)) for word in columns])
return data
6 changes: 3 additions & 3 deletions datafaker/dbs/hbasedb.py
@@ -18,6 +18,9 @@ def construct_self_rows(self):
raise ParamError('hbase must set meta parameter')

def save_data(self, lines):
if self.args.metaj:
raise ParamError('hbase not support metaj')

with self.table.batch(batch_size=self.args.batch) as bt:
args = reg_args(self.column_names[0])
args = [int(arg) for arg in args]
@@ -33,6 +36,3 @@ def save_data(self, lines):
value = dict(zip(self.column_names[1:], line[1:]))
# this put() will result in two mutations (two cells)
bt.put(rowkey, value)



22 changes: 19 additions & 3 deletions datafaker/dbs/kafkadb.py
@@ -18,14 +18,22 @@ def construct_self_rows(self):
def save_data(self, content):
self.producer.send(self.args.table, bytes(content.encode('utf-8')))


# def save_data(self, lines):
# for line in lines:
# content = self.format_data(line)
# self.producer.send(self.args.table, bytes(content.encode('utf-8')))
# if self.args.interval:
# time.sleep(self.args.interval)

@count_time
def do_fake(self):
i = 0
try:
while True:
while i < self.args.num:
i += 1
lines = self.fake_column(i)
content = json_item(self.column_names, lines)
columns = self.fake_column(i)
content = self.format_data(columns)
if self.args.outprint:
print(content)
self.save_data(content)
@@ -35,3 +43,11 @@ def do_fake(self):
except KeyboardInterrupt:
print("generated records : %d" % i)
print("insert records : %d" % i)

def format_data(self, columns):
if self.args.metaj:
data = self.metaj_content % tuple(columns)
else:
data = json_item(self.column_names, columns)

return data
4 changes: 0 additions & 4 deletions datafaker/dbs/rdbdb.py
@@ -10,10 +10,6 @@ class RdbDB(BaseDB):
def init(self):
self.session = load_sqlalchemy(self.args.connect)

def __del__(self):
if self.session:
self.session.close()

def save_data(self, lines):

names = [column['name'] for column in self.schema]