-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathfileStreams.py
88 lines (78 loc) · 2.28 KB
/
fileStreams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import traceback
from typing import BinaryIO, Iterator
try:
import orjson as json
except ImportError:
import json
print("Recommended to install 'orjson' for faster JSON parsing")
import zstandard
try:
from zst_blocks_format.python_cli.ZstBlocksFile import ZstBlocksFile
except ImportError:
pass
def getZstFileJsonStream(f: BinaryIO, chunk_size=1024*1024*10) -> Iterator[dict]:
    """Stream parsed JSON records from a zstd-compressed, newline-delimited file.

    The compressed stream is read in pieces of ``chunk_size`` decompressed
    bytes and decoded as UTF-8, with invalid bytes becoming U+FFFD. The text
    is split on newlines and each complete line is parsed with ``json.loads``.
    Lines that fail to parse are reported (message on stdout, traceback on
    stderr) and skipped. A zstd read error stops reading; text already
    buffered is still processed, and the final unterminated line is parsed
    as one last record.

    Args:
        f: Binary file object containing the zstd-compressed data.
        chunk_size: Number of decompressed bytes requested per read.

    Yields:
        Each successfully parsed JSON value, in file order.
    """
    import codecs

    decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
    # Decode incrementally: a multi-byte UTF-8 character can straddle two
    # chunks, and decoding each chunk on its own would turn both halves into
    # U+FFFD. The incremental decoder holds the partial bytes until the rest
    # of the character arrives in the next chunk.
    utf8Decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    # Text after the last newline seen so far, i.e. an incomplete line.
    currentString = ""

    def yieldLinesJson():
        # Parse every complete line in the buffer; keep the trailing partial line.
        nonlocal currentString
        lines = currentString.split("\n")
        currentString = lines[-1]
        for line in lines[:-1]:
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                print("Error parsing line: " + line)
                traceback.print_exc()

    # NOTE(review): python-zstandard's stream_reader defaults to
    # read_across_frames=False (reading stops at the end of the first frame);
    # confirm the inputs are single-frame .zst files.
    zstReader = decompressor.stream_reader(f)
    while True:
        try:
            chunk = zstReader.read(chunk_size)
        except zstandard.ZstdError:
            print("Error reading zst chunk")
            traceback.print_exc()
            break
        if not chunk:
            break
        currentString += utf8Decoder.decode(chunk)
        yield from yieldLinesJson()
    # Flush any bytes the decoder is still holding (an incomplete trailing
    # sequence becomes U+FFFD) and emit any lines that this completes.
    currentString += utf8Decoder.decode(b"", final=True)
    yield from yieldLinesJson()
    # The last line may have no terminating newline.
    if currentString:
        try:
            yield json.loads(currentString)
        except json.JSONDecodeError:
            print("Error parsing line: " + currentString)
            traceback.print_exc()
def getJsonLinesFileJsonStream(f: BinaryIO) -> Iterator[dict]:
    """Stream parsed JSON records from an uncompressed JSON Lines file.

    Every line read from ``f`` is decoded as UTF-8 (invalid bytes become
    U+FFFD) and parsed with ``json.loads``. A line that fails to parse is
    reported (message on stdout, traceback on stderr) and skipped.

    Args:
        f: Binary file object; iterating it must yield ``bytes`` lines.

    Yields:
        Each successfully parsed JSON value, in file order.
    """
    for rawLine in f:
        text = rawLine.decode("utf-8", errors="replace")
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            print("Error parsing line: " + text)
            traceback.print_exc()
        else:
            yield parsed
def getZstBlocksFileJsonStream(f: BinaryIO) -> Iterator[dict]:
    """Stream parsed JSON records from a ``.zst_blocks`` file.

    Each row produced by ``ZstBlocksFile.streamRows`` is decoded as UTF-8
    (invalid bytes become U+FFFD) and parsed with ``json.loads``. Rows that
    fail to parse are reported (message on stdout, traceback on stderr) and
    skipped.

    Args:
        f: Binary file object passed straight to ``ZstBlocksFile.streamRows``.

    Yields:
        Each successfully parsed JSON value, in row order.

    Raises:
        ImportError: if the optional ``zst_blocks_format`` package was not
            importable when this module was loaded.
    """
    # The module-level import of ZstBlocksFile is optional and silently
    # skipped on failure. Turn the resulting NameError into an ImportError
    # that says which package is missing.
    try:
        blocksFile = ZstBlocksFile
    except NameError:
        raise ImportError(
            "Reading .zst_blocks files requires the 'zst_blocks_format' package"
        ) from None
    for row in blocksFile.streamRows(f):
        line = row.decode("utf-8", errors="replace")
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            print("Error parsing line: " + line)
            traceback.print_exc()
def getJsonFileStream(f: BinaryIO) -> Iterator[dict]:
    """Stream records from a plain (not line-delimited) JSON file.

    The whole file is read and parsed in one go.
    - A top-level array yields its elements in order.
    - A top-level object is yielded as one record; iterating it would
      produce its keys rather than records.
    - Any other top-level value is iterated directly.

    Parse errors are not caught and propagate to the caller.

    Args:
        f: Binary file object holding a single JSON document.

    Yields:
        The records contained in the document.
    """
    data = json.loads(f.read())
    if isinstance(data, dict):
        yield data
        return
    yield from data
def getFileJsonStream(path: str, f: BinaryIO) -> Iterator[dict]|None:
if path.endswith(".jsonl") or path.endswith(".ndjson"):
return getJsonLinesFileJsonStream(f)
elif path.endswith(".zst"):
return getZstFileJsonStream(f)
elif path.endswith(".zst_blocks"):
return getZstBlocksFileJsonStream(f)
elif path.endswith(".json"):
return getJsonFileStream(f)
else:
return None