-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy path__main__.py
155 lines (130 loc) · 5.74 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import argparse
import sys
import re
import json
from . import SchemaBuilder, __version__
class CLI:
def __init__(self, prog=None):
self._make_parser(prog)
self._prepare_args()
self.builder = SchemaBuilder(schema_uri=self.args.schema_uri)
def run(self):
if not self.args.schema and not self.args.object:
self.fail('noting to do - no schemas or objects given')
self.add_schemas()
self.add_objects()
self.print_output()
def add_schemas(self):
for fp in self.args.schema:
self._call_with_json_from_fp(self.builder.add_schema, fp)
fp.close()
def add_objects(self):
for fp in self.args.object:
self._call_with_json_from_fp(self.builder.add_object, fp)
fp.close()
def print_output(self):
print(self.builder.to_json(indent=self.args.indent))
def fail(self, message):
self.parser.error(message)
def _make_parser(self, prog=None):
file_type = argparse.FileType('r', encoding=self._get_encoding())
self.parser = argparse.ArgumentParser(
add_help=False,
prog=prog,
description="""Generate one, unified JSON Schema from one or more
JSON objects and/or JSON Schemas. Compatible with JSON-Schema Draft
4 and above.""")
self.parser.add_argument(
'-h', '--help', action='help', default=argparse.SUPPRESS,
help='Show this help message and exit.')
self.parser.add_argument(
'--version', action='version', default=argparse.SUPPRESS,
version='%(prog)s {}'.format(__version__),
help='Show version number and exit.')
self.parser.add_argument(
'-d', '--delimiter', metavar='DELIM',
help="""Set a delimiter. Use this option if the input files
contain multiple JSON objects/schemas. You can pass any string. A
few cases ('newline', 'tab', 'space') will get converted to a
whitespace character. If this option is omitted, the parser will
try to auto-detect boundaries.""")
self.parser.add_argument(
'-e', '--encoding', type=str, metavar='ENCODING',
help="""Use ENCODING instead of the default system encoding
when reading files. ENCODING must be a valid codec name or
alias.""")
self.parser.add_argument(
'-i', '--indent', type=int, metavar='SPACES',
help="""Pretty-print the output, indenting SPACES spaces.""")
self.parser.add_argument(
'-s', '--schema', action='append', default=[], type=file_type,
help="""File containing a JSON Schema (can be specified multiple
times to merge schemas).""")
self.parser.add_argument(
'-$', '--schema-uri', metavar='SCHEMA_URI', dest='schema_uri',
default=SchemaBuilder.DEFAULT_URI,
help="""The value of the '$schema' keyword (defaults to {default!r}
or can be specified in a schema with the -s option). If {null!r} is
passed, the "$schema" keyword will not be included in the
result.""".format(default=SchemaBuilder.DEFAULT_URI,
null=SchemaBuilder.NULL_URI))
self.parser.add_argument(
'object', nargs=argparse.REMAINDER, type=file_type,
help="""Files containing JSON objects (defaults to stdin if no
arguments are passed).""")
def _get_encoding(self):
"""
use separate arg parser to grab encoding argument before
defining FileType args
"""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('-e', '--encoding', type=str)
args, _ = parser.parse_known_args()
return args.encoding
def _prepare_args(self):
self.args = self.parser.parse_args()
self._prepare_delimiter()
# default to stdin if no objects or schemas
if not self.args.object and not sys.stdin.isatty():
self.args.object.append(sys.stdin)
def _prepare_delimiter(self):
"""
manage special conversions for difficult bash characters
"""
if self.args.delimiter == 'newline':
self.args.delimiter = '\n'
elif self.args.delimiter == 'tab':
self.args.delimiter = '\t'
elif self.args.delimiter == 'space':
self.args.delimiter = ' '
def _call_with_json_from_fp(self, method, fp):
for json_string in self._get_json_strings(fp.read().strip()):
try:
json_obj = json.loads(json_string)
except json.JSONDecodeError as err:
self.fail('invalid JSON in {}: {}'.format(fp.name, err))
method(json_obj)
def _get_json_strings(self, raw_text):
if self.args.delimiter is None or self.args.delimiter == '':
json_strings = self._detect_json_strings(raw_text)
else:
json_strings = raw_text.split(self.args.delimiter)
# sanitize data before returning
return [string.strip() for string in json_strings if string.strip()]
@staticmethod
def _detect_json_strings(raw_text):
"""
Use regex with lookaround to spot the boundaries between JSON
objects. Unfortunately, it has to match *something*, so at least
one character must be removed and replaced.
"""
strings = re.split(r'}\s*(?={)', raw_text)
# put back the stripped character
json_strings = [string + '}' for string in strings[:-1]]
# the last one doesn't need to be modified
json_strings.append(strings[-1])
return json_strings
def main():
CLI().run()
if __name__ == "__main__":
CLI('genson').run()