Skip to content

Commit

Permalink
ARROW-7680: [C++] Fix dataset.factory(...) with Windows paths
Browse files Browse the repository at this point in the history
Closes apache#6597 from pitrou/ARROW-7680-win-dataset-partition and squashes the following commits:

861700c <Antoine Pitrou> ARROW-7680:  Fix dataset.factory(...) with Windows paths

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
pitrou authored and bkietz committed Mar 12, 2020
1 parent 8d967d0 commit b641fe4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
4 changes: 4 additions & 0 deletions python/pyarrow/_fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ cdef class FileSelector:
def recursive(self, bint recursive):
self.selector.recursive = recursive

def __repr__(self):
return ("<FileSelector base_dir={0.base_dir!r} "
"recursive={0.recursive}>".format(self))


cdef class FileSystem:
"""Abstract file system API"""
Expand Down
58 changes: 30 additions & 28 deletions python/pyarrow/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,27 @@ def partitioning(schema=None, field_names=None, flavor=None):

def _ensure_fs(filesystem, path):
# Validate or infer the filesystem from the path
from pyarrow.fs import FileSystem, LocalFileSystem
from pyarrow.fs import FileSystem

if filesystem is None:
try:
filesystem, _ = FileSystem.from_uri(path)
except Exception:
# when path is not found, we fall back to local file system
filesystem = LocalFileSystem()
return filesystem
if filesystem is not None:
return filesystem, path
return FileSystem.from_uri(path)


def _ensure_fs_and_paths(path_or_paths, filesystem=None):
# Validate and convert the path-likes and filesystem.
# Returns filesystem and list of string paths or FileSelector
def _ensure_fs_and_paths(path, filesystem=None):
# Return filesystem and list of string paths or FileSelector
from pyarrow.fs import FileType, FileSelector

if isinstance(path_or_paths, list):
paths_or_selector = [_stringify_path(path) for path in path_or_paths]
# infer from first path
filesystem = _ensure_fs(filesystem, paths_or_selector[0])
filesystem, path = _ensure_fs(filesystem, _stringify_path(path))
infos = filesystem.get_target_infos([path])[0]
if infos.type == FileType.Directory:
# for directory, pass a selector
paths_or_selector = FileSelector(path, recursive=True)
elif infos.type == FileType.File:
# for a single file path, pass it as a list
paths_or_selector = [path]
else:
path = _stringify_path(path_or_paths)
filesystem = _ensure_fs(filesystem, path)
infos = filesystem.get_target_infos([path])[0]
if infos.type == FileType.Directory:
# for directory, pass a selector
paths_or_selector = FileSelector(path, recursive=True)
elif infos.type == FileType.File:
# for a single file path, pass it as a list
paths_or_selector = [path]
else:
raise FileNotFoundError(path)
raise FileNotFoundError(path)

return filesystem, paths_or_selector

Expand Down Expand Up @@ -241,7 +230,9 @@ def factory(path_or_paths, filesystem=None, partitioning=None,
-------
FileSystemDatasetFactory
"""
fs, paths_or_selector = _ensure_fs_and_paths(path_or_paths, filesystem)
if not isinstance(path_or_paths, (list, tuple)):
path_or_paths = [path_or_paths]

partitioning = _ensure_partitioning(partitioning)
format = _ensure_format(format or "parquet")

Expand All @@ -252,7 +243,18 @@ def factory(path_or_paths, filesystem=None, partitioning=None,
elif isinstance(partitioning, Partitioning):
options.partitioning = partitioning

return FileSystemDatasetFactory(fs, paths_or_selector, format, options)
factories = []
for path in path_or_paths:
fs, paths_or_selector = _ensure_fs_and_paths(path, filesystem)
factories.append(FileSystemDatasetFactory(fs, paths_or_selector,
format, options))

if len(factories) == 0:
raise ValueError("Need at least one path")
elif len(factories) == 1:
return factories[0]
else:
return UnionDatasetFactory(factories)


def _ensure_factory(src, **kwargs):
Expand Down
2 changes: 0 additions & 2 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.

import sys
import operator

import numpy as np
Expand Down Expand Up @@ -639,7 +638,6 @@ def test_open_dataset_list_of_files(tempdir):
assert result.equals(table)


@pytest.mark.skipif(sys.platform == "win32", reason="fails on windows")
@pytest.mark.parquet
def test_open_dataset_partitioned_directory(tempdir):
import pyarrow.parquet as pq
Expand Down

0 comments on commit b641fe4

Please sign in to comment.