Skip to content

Commit

Permalink
ARROW-1079: [Python] Filter out private directories when building Parquet dataset manifest
Browse files Browse the repository at this point in the history

Some systems like Hive and Impala use special files or directories to signal to other readers that a dataset modification is in progress. If such directories (starting with an underscore) exist in a flat Parquet directory, it currently breaks the dataset reader.

Author: Wes McKinney <[email protected]>

Closes apache#860 from wesm/ARROW-1079 and squashes the following commits:

c1c445b [Wes McKinney] Filter out private directories when building Parquet dataset manifest
  • Loading branch information
wesm committed Jul 18, 2017
1 parent b4d34f8 commit a1c8b83
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
9 changes: 9 additions & 0 deletions python/pyarrow/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import os
import json

import six
Expand Down Expand Up @@ -414,6 +415,9 @@ def _visit_level(self, level, base_path, part_keys):
elif fs.isdir(path):
directories.append(path)

# ARROW-1079: Filter out "private" directories starting with underscore
directories = [x for x in directories if not _is_private_directory(x)]

if len(files) > 0 and len(directories) > 0:
raise ValueError('Found files in an intermediate '
'directory: {0}'.format(base_path))
Expand Down Expand Up @@ -456,6 +460,11 @@ def _parse_hive_partition(value):
return value.split('=', 1)


def _is_private_directory(x):
_, tail = os.path.split(x)
return tail.startswith('_') and '=' not in tail


def _path_split(path, sep):
i = path.rfind(sep) + 1
head, tail = path[:i], path[i:]
Expand Down
35 changes: 33 additions & 2 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@
parquet = pytest.mark.parquet


def _write_table(*args, **kwargs):
def _write_table(table, path, **kwargs):
    """Write *table* to a Parquet file at *path*.

    A pandas DataFrame is first converted to a pyarrow Table so tests
    can pass either type.  Returns the (possibly converted) Table so
    callers can compare it against round-tripped data.
    """
    import pyarrow.parquet as pq

    if isinstance(table, pd.DataFrame):
        table = pa.Table.from_pandas(table)
    pq.write_table(table, path, **kwargs)
    return table


def _read_table(*args, **kwargs):
Expand Down Expand Up @@ -851,6 +856,32 @@ def read_multiple_files(paths, columns=None, nthreads=None, **kwargs):
read_multiple_files(mixed_paths)


@parquet
def test_ignore_private_directories(tmpdir):
    """Directories whose basename starts with '_' (e.g. Impala/Hive
    staging dirs) must be ignored when a ParquetDataset scans a
    directory for pieces (ARROW-1079)."""
    import pyarrow.parquet as pq

    num_files = 10
    num_rows = 5

    base_dir = tmpdir.join(guid()).strpath
    os.mkdir(base_dir)

    written_tables = []
    expected_paths = []
    for file_index in range(num_files):
        frame = _test_dataframe(num_rows, seed=file_index)
        file_path = pjoin(base_dir, '{0}.parquet'.format(file_index))

        written_tables.append(_write_table(frame, file_path))
        expected_paths.append(file_path)

    # A "private" directory that the manifest builder must skip
    os.mkdir(pjoin(base_dir, '_impala_staging'))

    dataset = pq.ParquetDataset(base_dir)
    assert set(expected_paths) == {piece.path for piece in dataset.pieces}


@parquet
def test_multiindex_duplicate_values(tmpdir):
num_rows = 3
Expand Down

0 comments on commit a1c8b83

Please sign in to comment.