-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparquet_utils.py
105 lines (93 loc) · 3.28 KB
/
parquet_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Parquet parser utils
"""
from collections import OrderedDict
from itertools import chain
import pyarrow
from pyarrow import DataType, ListType, StructType, TimestampType
def union_or_scalar(iterable):
    """
    Build a ``Union[...]`` type string from several type names, or return the
    single name unchanged when only one is given.

    (Previous docstring suggested the result is always ``Union[str,int]``;
    that was only an example — the output is built from the actual input.)

    :param iterable: Sized, indexable collection of type-name strings with
        size known ahead of time, e.g. ``("str", "int")``
    :type iterable: ```Sequence[str]```

    :return: e.g. ``"Union[str,int]"`` for two names, ``"str"`` for one
    :rtype: ```str```
    """
    # A single element needs no Union wrapper; otherwise join the names.
    if len(iterable) == 1:
        return iterable[0]
    return "Union[{}]".format(",".join(iterable))
def parquet_type_to_param(field):
    """
    Convert Parquet type to param

    :param field: PyArrow field
    :type field: ```pyarrow.lib.Field```

    :return: Union[{"typ": <parsed_type_as_str>}|intermediate_repr]
    :rtype: ```dict```
    """
    field_type = field.type
    # NOTE: TimestampType / StructType / ListType are tested before the
    # generic DataType branch — they are the specific cases; DataType is
    # the scalar fallback.
    if isinstance(field_type, TimestampType):
        return {
            "typ": "datetime",
            "x_typ": {
                "sql": dict(
                    type="TIMESTAMP",
                    type_extra={"unit": field_type.unit},
                    # Timezone kwarg is emitted only when the timestamp has
                    # a truthy `unit` — NOTE(review): presumably `unit` here
                    # stands in for "has tz info"; confirm against callers.
                    **{"type_kwargs": {"timezone": True}}
                    if bool(getattr(field_type, "unit", False))
                    else {}
                )
            },
        }
    elif isinstance(field_type, StructType):
        # Structs become a nested intermediate representation ("ir"):
        # each flattened child field is converted recursively and keyed
        # by its name, preserving order via OrderedDict.
        return {
            "typ": "dict",
            "ir": {
                "name": field.name,
                "params": OrderedDict(
                    map(
                        lambda flattened: (
                            flattened.name,
                            parquet_type_to_param(flattened),
                        ),
                        field.flatten(),
                    )
                ),
                "returns": None,
            },
        }
    elif isinstance(field_type, ListType):
        # `__reduce__()` yields (constructor, args, ...); the args carry the
        # element type(s) without touching private attributes. The asserts
        # guard the assumption that this really is a `pyarrow.list_` type.
        reduction = field_type.__reduce__()
        assert len(reduction) > 1
        assert reduction[0] == pyarrow.lib.list_
        return {
            "typ": (lambda iterable: "List[{}]".format(",".join(iterable)))(
                tuple(
                    map(
                        lambda flattened: str(flattened.type),
                        chain.from_iterable(reduction[1:]),
                    )
                )
            )
        }
    elif isinstance(field_type, DataType):
        # Scalar (or flattenable) type: start from the pyarrow type-name
        # string(s), then map known names onto Python types, attaching
        # SQLAlchemy-style hints in "x_typ" where applicable.
        param = {
            "typ": union_or_scalar(
                tuple(map(lambda flattened: str(flattened.type), field.flatten()))
            )
        }
        if param["typ"] == "int16":
            param.update({"typ": "int", "x_typ": {"sql": {"type": "SmallInteger"}}})
        elif param["typ"] == "int64":
            param.update({"typ": "int", "x_typ": {"sql": {"type": "BigInteger"}}})
        elif param["typ"] == "double":
            param.update({"typ": "float", "x_typ": {"sql": {"type": "Float"}}})
        elif param["typ"] == "string":
            param.update({"typ": "str"})
        elif param["typ"] == "bool":
            # Already "bool"; the no-op update keeps the branch symmetric
            # and makes "bool" an accepted (non-raising) type.
            param.update({"typ": "bool"})
        elif param["typ"] == "binary":
            # "binary" keeps its typ string; only the SQL hint is added.
            param.update({"x_typ": {"sql": {"type": "LargeBinary"}}})
        else:
            # Any type name not listed above (e.g. "int32", "float") is
            # deliberately unsupported and raises rather than guessing.
            raise NotImplementedError(param["typ"])
        return param
    else:
        raise NotImplementedError(field_type)
__all__ = ["parquet_type_to_param"]