Skip to content

Commit

Permalink
Create get headers function in numpy and parquet readers
Browse files Browse the repository at this point in the history
  • Loading branch information
rom1504 committed May 21, 2022
1 parent 0c24c7f commit 031575c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 56 deletions.
45 changes: 27 additions & 18 deletions embedding_reader/numpy_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,40 @@ def read_numpy_header(f):
return (shape[0], shape[1], dtype, end, byte_per_item)


def file_to_header(filename, fs):
try:
with fs.open(filename, "rb") as f:
return (None, [filename, *read_numpy_header(f)])
except Exception as e: # pylint: disable=broad-except
return e, (filename, None)


def get_numpy_headers(embeddings_file_paths, fs):
"""get numpy headers"""
headers = []
count_before = 0
with ThreadPool(128) as p:
for err, c in tqdm(
p.imap(lambda f: file_to_header(f, fs), embeddings_file_paths), total=len(embeddings_file_paths)
):
if err is not None:
raise Exception(f"failed reading file {c[0]}") from err
if c[1] == 0:
continue
headers.append([*c[0:2], count_before, *c[2:]])
count_before += c[1]

return headers


class NumpyReader:
"""Numpy reader class, implements init to read the files headers and call to procuce embeddings batches"""

def __init__(self, embeddings_folder):
self.embeddings_folder = embeddings_folder
self.fs, embeddings_file_paths = get_file_list(embeddings_folder, "npy")

def file_to_header(filename):
try:
with self.fs.open(filename, "rb") as f:
return (None, [filename, *read_numpy_header(f)])
except Exception as e: # pylint: disable=broad-except
return e, (filename, None)

headers = []
count_before = 0
with ThreadPool(10) as p:
for err, c in tqdm(p.imap(file_to_header, embeddings_file_paths), total=len(embeddings_file_paths)):
if err is not None:
raise Exception(f"failed reading file {c[0]}") from err
if c[1] == 0:
continue
headers.append([*c[0:2], count_before, *c[2:]])
count_before += c[1]

headers = get_numpy_headers(embeddings_file_paths, self.fs)
self.headers = pd.DataFrame(
headers,
columns=["filename", "count", "count_before", "dimension", "dtype", "header_offset", "byte_per_item"],
Expand Down
26 changes: 7 additions & 19 deletions embedding_reader/parquet_numpy_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import math
from collections import namedtuple
from embedding_reader.get_file_list import get_file_list
from embedding_reader.numpy_reader import read_numpy_header
from embedding_reader.numpy_reader import get_numpy_headers
from embedding_reader.piece_builder import build_pieces, PIECES_BASE_COLUMNS
from threading import Semaphore
import pyarrow.parquet as pq
Expand All @@ -25,23 +25,7 @@ def __init__(self, embeddings_folder, metadata_folder, metadata_column_names):
self.metadata_fs, metadata_file_paths = get_file_list(metadata_folder, "parquet")
self.metadata_column_names = metadata_column_names

def file_to_header(filename):
try:
with self.fs.open(filename, "rb") as f:
return (None, [filename, *read_numpy_header(f)])
except Exception as e: # pylint: disable=broad-except
return e, (filename, None)

headers = []
count_before = 0
with ThreadPool(10) as p:
for err, c in tqdm(p.imap(file_to_header, embeddings_file_paths), total=len(embeddings_file_paths)):
if err is not None:
raise Exception(f"failed reading file {c[0]}") from err
if c[1] == 0:
continue
headers.append([*c[0:2], count_before, *c[2:]])
count_before += c[1]
headers = get_numpy_headers(embeddings_file_paths, self.fs)

# add metadata path to headers by zipping
headers = [[*h, m] for h, m in zip(headers, metadata_file_paths)]
Expand Down Expand Up @@ -137,13 +121,17 @@ def piece_generator(pieces, open_parquet_files):
semaphore.acquire() # pylint: disable=consider-using-with
if piece.metadata_path not in open_parquet_files:
file = self.metadata_fs.open(piece.metadata_path, "rb")
for _ in range(5):
for i in range(5):
try:
table = pq.read_table(file, use_threads=True)
break
except Exception as e: # pylint: disable=broad-except
print("Fail to read " + piece.metadata_path)
print(e)
if i != 4:
print(f"Retry number {i+1}/5...")
else:
raise e

open_parquet_files[piece.metadata_path] = {"file": file, "table": table}
if current_parquet_file != piece.metadata_path:
Expand Down
47 changes: 28 additions & 19 deletions embedding_reader/parquet_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,33 @@
import math


def file_to_header(filename, fs):
try:
with fs.open(filename, "rb") as f:
parquet_file = pq.ParquetFile(f, memory_map=True)
return (None, [filename, parquet_file.metadata.num_rows])
except Exception as e: # pylint: disable=broad-except
return e, (filename, None)


def get_parquet_headers(fs, embeddings_file_paths):
"""get parquet headers"""
headers = []
count_before = 0
with ThreadPool(10) as p:
for err, c in tqdm(
p.imap(lambda fp: file_to_header(fp, fs), embeddings_file_paths), total=len(embeddings_file_paths)
):
if err is not None:
raise Exception(f"failed reading file {c[0]}") from err
if c[1] == 0:
continue
headers.append([*c, count_before])
count_before += c[1]

return headers


class ParquetReader:
"""Parquet reader class, implements init to read the files headers and call to produce embeddings batches"""

Expand All @@ -34,25 +61,7 @@ def __init__(self, embeddings_folder, embedding_column_name, metadata_column_nam
except StopIteration:
continue

def file_to_header(filename):
try:
with self.fs.open(filename, "rb") as f:
parquet_file = pq.ParquetFile(f, memory_map=True)
return (None, [filename, parquet_file.metadata.num_rows])
except Exception as e: # pylint: disable=broad-except
return e, (filename, None)

headers = []
count_before = 0
with ThreadPool(10) as p:
for err, c in tqdm(p.imap(file_to_header, embeddings_file_paths), total=len(embeddings_file_paths)):
if err is not None:
raise Exception(f"failed reading file {c[0]}") from err
if c[1] == 0:
continue
headers.append([*c, count_before])
count_before += c[1]

headers = get_parquet_headers(self.fs, embeddings_file_paths)
self.headers = pd.DataFrame(headers, columns=["filename", "count", "count_before"])
self.count = self.headers["count"].sum()
if self.count == 0:
Expand Down

0 comments on commit 031575c

Please sign in to comment.