Skip to content

Commit

Permalink
ARROW-18225: [Python] Fully support filesystem in parquet.write_metadata (apache#14574)
Browse files Browse the repository at this point in the history

Will fix [ARROW-18225](https://issues.apache.org/jira/browse/ARROW-18225)

Authored-by: Miles Granger <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
milesgranger authored Nov 22, 2022
1 parent cf66f48 commit 57b81ca
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
24 changes: 20 additions & 4 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3379,7 +3379,8 @@ def file_visitor(written_file):
metadata_collector[-1].set_file_path(outfile)


def write_metadata(schema, where, metadata_collector=None, **kwargs):
def write_metadata(schema, where, metadata_collector=None, filesystem=None,
**kwargs):
"""
Write metadata-only Parquet file from schema. This can be used with
`write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
Expand All @@ -3391,6 +3392,9 @@ def write_metadata(schema, where, metadata_collector=None, **kwargs):
where : string or pyarrow.NativeFile
metadata_collector : list
where to collect metadata information.
filesystem : FileSystem, default None
If None, the filesystem is inferred from `where` when it is path-like;
if `where` is already a file-like object, no filesystem is needed.
**kwargs : dict,
Additional kwargs for ParquetWriter class. See docstring for
`ParquetWriter` for more information.
Expand Down Expand Up @@ -3423,16 +3427,28 @@ def write_metadata(schema, where, metadata_collector=None, **kwargs):
... table.schema, 'dataset_metadata/_metadata',
... metadata_collector=metadata_collector)
"""
writer = ParquetWriter(where, schema, **kwargs)
filesystem, where = _resolve_filesystem_and_path(where, filesystem)

if hasattr(where, "seek"): # file-like
cursor_position = where.tell()

writer = ParquetWriter(where, schema, filesystem, **kwargs)
writer.close()

if metadata_collector is not None:
# ParquetWriter doesn't expose the metadata until it's written. Write
# it and read it again.
metadata = read_metadata(where)
metadata = read_metadata(where, filesystem=filesystem)
if hasattr(where, "seek"):
where.seek(cursor_position) # file-like, set cursor back.

for m in metadata_collector:
metadata.append_row_groups(m)
metadata.write_metadata_file(where)
if filesystem is not None:
with filesystem.open_output_stream(where) as f:
metadata.write_metadata_file(f)
else:
metadata.write_metadata_file(where)


def read_metadata(where, memory_map=False, decryption_properties=None,
Expand Down
33 changes: 33 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,36 @@ def test_metadata_append_row_groups_diff(t1, t2, expected_error):
meta1.append_row_groups(meta2)
else:
meta1.append_row_groups(meta2)


@pytest.mark.s3
def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs):
    """
    Check that `pq.write_metadata` produces identical bytes for each
    combination of `where` and `filesystem` exercised below.

    Parameters
    ----------
    tempdir : path-like fixture
        Directory in which the local metadata files are created.
    s3_example_s3fs : fixture
        Unpacked as ``(filesystem, base_path)``; the S3 metadata file is
        written below ``base_path``.
    """
    s3_fs, s3_path = s3_example_s3fs

    meta1 = tempdir / "meta1"
    meta2 = tempdir / "meta2"
    meta3 = tempdir / "meta3"
    meta4 = tempdir / "meta4"
    meta5 = f"{s3_path}/meta5"

    table = pa.table({"col": range(5)})

    # Plain local path, no filesystem passed.
    pq.write_metadata(table.schema, meta1, [])

    # Local path with an explicit LocalFileSystem.
    pq.write_metadata(table.schema, meta2, [], filesystem=LocalFileSystem())

    # Local file URI, no filesystem passed.
    pq.write_metadata(table.schema, meta3.as_uri(), [])

    # Already-open file-like object instead of a path.
    with meta4.open('wb+') as meta4_stream:
        pq.write_metadata(table.schema, meta4_stream, [])

    # Path on S3 with the S3 filesystem passed explicitly.
    pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs)

    # Read the S3 copy inside a `with` block so the handle is closed
    # deterministically instead of being left open until garbage collection.
    with s3_fs.open(meta5) as meta5_stream:
        meta5_bytes = meta5_stream.read()

    # Compare every variant against the plain-local-path output; separate
    # asserts make a failure point at the variant that diverged.
    expected = meta1.read_bytes()
    assert meta2.read_bytes() == expected
    assert meta3.read_bytes() == expected
    assert meta4.read_bytes() == expected
    assert meta5_bytes == expected

0 comments on commit 57b81ca

Please sign in to comment.