ARROW-16610: [Python] Raise an error for conflicting options in pq.write_to_dataset (apache#13317)

Lead-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
AlenkaF authored Jun 8, 2022
1 parent c52a04f commit 65bb7d4
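
For orientation, a minimal sketch of the behaviour this commit introduces (the table, path, and column names are illustrative, and it assumes a pyarrow build that includes this change): passing both the legacy and the new-dataset spelling of the same option to pq.write_to_dataset now fails fast with a ValueError naming the keyword to drop, instead of one option silently shadowing the other.

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})

    # 'partition_cols' (legacy keyword) and 'partitioning' (new dataset
    # keyword) describe the same thing; supplying both is now rejected
    # up front, before anything is written.
    try:
        pq.write_to_dataset(table, 'dataset_root',
                            use_legacy_dataset=False,
                            partition_cols=['a'],
                            partitioning=['a'])
    except ValueError as exc:
        print(exc)
        # -> The 'partition_cols' argument is not supported by
        #    use_legacy_dataset=False. Use only 'partitioning' instead.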
Showing 2 changed files with 84 additions and 4 deletions.
37 changes: 33 additions & 4 deletions python/pyarrow/parquet/__init__.py
@@ -3144,6 +3144,7 @@ def file_visitor(written_file):
     >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files
     ['dataset_name_4/...-0.parquet']
     """
+    # Choose the implementation
     if use_legacy_dataset is None:
         # if partition_filename_cb is specified ->
         # default to the old implementation
@@ -3153,6 +3154,33 @@ def file_visitor(written_file):
     else:
         use_legacy_dataset = False
 
+    # Check for conflicting keywords
+    msg_confl_0 = (
+        "The '{0}' argument is not supported by use_legacy_dataset={2}. "
+        "Use only '{1}' instead."
+    )
+    msg_confl_1 = (
+        "The '{1}' argument is not supported by use_legacy_dataset={2}. "
+        "Use only '{0}' instead."
+    )
+    msg_confl = msg_confl_0 if use_legacy_dataset else msg_confl_1
+    if partition_filename_cb is not None and basename_template is not None:
+        raise ValueError(msg_confl.format("basename_template",
+                                          "partition_filename_cb",
+                                          use_legacy_dataset))
+
+    if partition_cols is not None and partitioning is not None:
+        raise ValueError(msg_confl.format("partitioning",
+                                          "partition_cols",
+                                          use_legacy_dataset))
+
+    metadata_collector = kwargs.pop('metadata_collector', None)
+    if metadata_collector is not None and file_visitor is not None:
+        raise ValueError(msg_confl.format("file_visitor",
+                                          "metadata_collector",
+                                          use_legacy_dataset))
+
+    # New dataset implementation
     if not use_legacy_dataset:
         import pyarrow.dataset as ds
 
@@ -3171,7 +3199,7 @@ def file_visitor(written_file):
             "The '{}' argument is not supported with the new dataset "
             "implementation."
         )
-        metadata_collector = kwargs.pop('metadata_collector', None)
+
         if metadata_collector is not None:
             def file_visitor(written_file):
                 metadata_collector.append(written_file.metadata)
@@ -3206,7 +3234,7 @@ def file_visitor(written_file):
             max_rows_per_group=row_group_size)
         return
 
-    # warnings and errors when using legecy implementation
+    # warnings and errors when using legacy implementation
     if use_legacy_dataset:
         warnings.warn(
             "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
@@ -3229,6 +3257,8 @@ def file_visitor(written_file):
         raise ValueError(msg2.format("file_visitor"))
     if existing_data_behavior is not None:
         raise ValueError(msg2.format("existing_data_behavior"))
+    if basename_template is not None:
+        raise ValueError(msg2.format("basename_template"))
     if partition_filename_cb is not None:
         warnings.warn(
             _DEPR_MSG.format("partition_filename_cb", " Specify "
@@ -3238,12 +3268,11 @@
                              "usage see `pyarrow.dataset.write_dataset`"),
             FutureWarning, stacklevel=2)
 
+    # Legacy implementation
     fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
 
     _mkdir_if_not_exists(fs, root_path)
 
-    metadata_collector = kwargs.pop('metadata_collector', None)
-
     if partition_cols is not None and len(partition_cols) > 0:
         df = table.to_pandas()
         partition_keys = [df[col] for col in partition_cols]
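A note on the paired message templates added above: msg_confl_0 and msg_confl_1 differ only in which positional placeholder is called unsupported, so selecting a template by the resolved use_legacy_dataset flag lets every call site pass the same argument order while the error always names the offending keyword first and the supported alternative second. A standalone sketch of that selection logic, mirroring the names in the diff:

    msg_confl_0 = (
        "The '{0}' argument is not supported by use_legacy_dataset={2}. "
        "Use only '{1}' instead."
    )
    msg_confl_1 = (
        "The '{1}' argument is not supported by use_legacy_dataset={2}. "
        "Use only '{0}' instead."
    )

    for use_legacy_dataset in (True, False):
        # Same .format() arguments either way; the chosen template decides
        # which keyword is reported as unsupported for this implementation.
        msg_confl = msg_confl_0 if use_legacy_dataset else msg_confl_1
        print(msg_confl.format("partitioning", "partition_cols",
                               use_legacy_dataset))
    # -> The 'partitioning' argument is not supported by
    #    use_legacy_dataset=True. Use only 'partition_cols' instead.
    # -> The 'partition_cols' argument is not supported by
    #    use_legacy_dataset=False. Use only 'partitioning' instead.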
51 changes: 51 additions & 0 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -1790,10 +1790,15 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
     with pytest.raises(ValueError, match="file_visitor"):
         pq.write_to_dataset(table, path, use_legacy_dataset=True,
                             file_visitor=lambda x: x)
 
     with pytest.raises(ValueError, match="existing_data_behavior"):
         pq.write_to_dataset(table, path, use_legacy_dataset=True,
                             existing_data_behavior='error')
+
+    with pytest.raises(ValueError, match="basename_template"):
+        pq.write_to_dataset(table, path, use_legacy_dataset=True,
+                            basename_template='part-{i}.parquet')
+
 
 @pytest.mark.dataset
 def test_parquet_write_to_dataset_exposed_keywords(tempdir):
@@ -1819,3 +1824,49 @@ def file_visitor(written_file):
     }
     paths_written_set = set(map(pathlib.Path, paths_written))
     assert paths_written_set == expected_paths
+
+
+@pytest.mark.dataset
+def test_write_to_dataset_conflicting_keywords(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    with pytest.raises(ValueError, match="'basename_template' argument "
+                       "is not supported by use_legacy_dataset=True"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=True,
+                            partition_filename_cb=lambda x: 'filename.parquet',
+                            basename_template='file-{i}.parquet')
+    with pytest.raises(ValueError, match="'partition_filename_cb' argument "
+                       "is not supported by use_legacy_dataset=False"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=False,
+                            partition_filename_cb=lambda x: 'filename.parquet',
+                            basename_template='file-{i}.parquet')
+
+    with pytest.raises(ValueError, match="'partitioning' argument "
+                       "is not supported by use_legacy_dataset=True"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=True,
+                            partition_cols=["a"],
+                            partitioning=["a"])
+
+    with pytest.raises(ValueError, match="'partition_cols' argument "
+                       "is not supported by use_legacy_dataset=False"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=False,
+                            partition_cols=["a"],
+                            partitioning=["a"])
+
+    with pytest.raises(ValueError, match="'file_visitor' argument "
+                       "is not supported by use_legacy_dataset=True"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=True,
+                            metadata_collector=[],
+                            file_visitor=lambda x: x)
+    with pytest.raises(ValueError, match="'metadata_collector' argument "
+                       "is not supported by use_legacy_dataset=False"):
+        pq.write_to_dataset(table, path,
+                            use_legacy_dataset=False,
+                            metadata_collector=[],
+                            file_visitor=lambda x: x)
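
One subtlety these tests exercise: metadata_collector is not a named parameter of write_to_dataset but arrives through **kwargs, which is why the new check pops it from kwargs before dispatching to either implementation. A sketch of how that conflict surfaces (illustrative table and path; assumes a pyarrow build with this change):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})
    collected = []  # would receive per-file metadata objects

    # 'metadata_collector' travels in **kwargs; combining it with
    # 'file_visitor' is rejected before any file is written.
    try:
        pq.write_to_dataset(table, 'dataset_root',
                            use_legacy_dataset=False,
                            metadata_collector=collected,
                            file_visitor=lambda written_file: None)
    except ValueError as exc:
        print(exc)
        # -> The 'metadata_collector' argument is not supported by
        #    use_legacy_dataset=False. Use only 'file_visitor' instead.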
