[dataset] deduct filesystem automatically (ray-project#16762)
fishbone authored Jul 3, 2021
1 parent 122bf30 commit 4bb3883
Showing 5 changed files with 79 additions and 20 deletions.
3 changes: 3 additions & 0 deletions ci/travis/install-dependencies.sh
@@ -346,6 +346,9 @@ install_dependencies() {
if [ "${DATA_PROCESSING_TESTING-}" = 1 ] || [ "${DOC_TESTING-}" = 1 ]; then
pip install -r "${WORKSPACE_DIR}"/python/requirements/data_processing/requirements.txt
fi
if [ "${DATA_PROCESSING_TESTING-}" = 1 ]; then
pip install -r "${WORKSPACE_DIR}"/python/requirements/data_processing/requirements_dataset.txt
fi

# Remove this entire section once Serve dependencies are fixed.
if [ "${DOC_TESTING-}" != 1 ] && [ "${SGD_TESTING-}" != 1 ] && [ "${TUNE_TESTING-}" != 1 ] && [ "${RLLIB_TESTING-}" != 1 ]; then
51 changes: 41 additions & 10 deletions python/ray/experimental/data/read_api.py
@@ -1,4 +1,5 @@
import logging
+from pathlib import Path
import functools
import builtins
import inspect
@@ -40,6 +41,37 @@ def wrapped(*a, **kw):
    return wrapped


+def _parse_paths(paths: Union[str, List[str]]
+                 ) -> Tuple["pyarrow.fs.FileSystem", Union[str, List[str]]]:
+    from pyarrow import fs
+
+    def parse_single_path(path: str):
+        if Path(path).exists():
+            return fs.LocalFileSystem(), path
+        else:
+            return fs.FileSystem.from_uri(path)
+
+    if isinstance(paths, str):
+        return parse_single_path(paths)
+
+    if not isinstance(paths, list) or any(not isinstance(p, str)
+                                          for p in paths):
+        raise ValueError(
+            "paths must be a path string or a list of path strings.")
+    else:
+        if len(paths) == 0:
+            raise ValueError("No data provided")
+
+        parsed_results = [parse_single_path(path) for path in paths]
+        fses, paths = zip(*parsed_results)
+        unique_fses = set(map(type, fses))
+        if len(unique_fses) > 1:
+            raise ValueError(
+                f"When specifying multiple paths, each path must have the "
+                f"same filesystem, but found: {unique_fses}")
+        return fses[0], list(paths)


def _expand_directory(path: str,
filesystem: "pyarrow.fs.FileSystem",
exclude_prefixes: List[str] = [".", "_"]) -> List[str]:
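The new `_parse_paths` helper builds directly on pyarrow's filesystem layer: a path that exists locally is paired with an explicit `LocalFileSystem`, and anything else goes through `fs.FileSystem.from_uri`, which returns the inferred filesystem together with the path stripped of its scheme. A minimal sketch of that resolution behavior (the `file://` URI and paths are illustrative):

from pyarrow import fs

# Existing local paths are paired with an explicit LocalFileSystem,
# so plain inputs like "/tmp/data" keep working unchanged.
local_fs, local_path = fs.LocalFileSystem(), "/tmp/data"

# Everything else is handed to FileSystem.from_uri, which returns the
# inferred filesystem plus the scheme-less path, e.g.
# "s3://bucket/dir" -> (S3FileSystem, "bucket/dir").
inferred_fs, stripped = fs.FileSystem.from_uri("file:///tmp/data")
print(inferred_fs.type_name, stripped)  # -> local /tmp/data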
@@ -261,24 +293,23 @@ def read_parquet(paths: Union[str, List[str]],
"""
import pyarrow.parquet as pq

pq_ds = pq.ParquetDataset(paths, **arrow_parquet_args)
if filesystem is None:
filesystem, paths = _parse_paths(paths)
pq_ds = pq.ParquetDataset(
paths, **arrow_parquet_args, filesystem=filesystem)
pieces = pq_ds.pieces

read_tasks = [[] for _ in builtins.range(parallelism)]
# TODO(ekl) support reading row groups (maybe as an option)
for i, piece in enumerate(pq_ds.pieces):
read_tasks[i % len(read_tasks)].append(piece)
nonempty_tasks = [r for r in read_tasks if r]
partitions = pq_ds.partitions

@ray.remote
def gen_read(pieces: List[pq.ParquetDatasetPiece]):
def gen_read(pieces: List["pyarrow._dataset.ParquetFileFragment"]):
import pyarrow
logger.debug("Reading {} parquet pieces".format(len(pieces)))
tables = [
piece.read(
columns=columns, use_threads=False, partitions=partitions)
for piece in pieces
]
print("Reading {} parquet pieces".format(len(pieces)))
tables = [piece.to_table() for piece in pieces]
if len(tables) > 1:
table = pyarrow.concat_tables(tables)
else:
@@ -289,7 +320,7 @@ def gen_read(pieces: List[pq.ParquetDatasetPiece]):
    metadata: List[BlockMetadata] = []
    for pieces in nonempty_tasks:
        calls.append(lambda pieces=pieces: gen_read.remote(pieces))
-        piece_metadata = [p.get_metadata() for p in pieces]
+        piece_metadata = [p.metadata for p in pieces]
        metadata.append(
            BlockMetadata(
                num_rows=sum(m.num_rows for m in piece_metadata),
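With a pyarrow filesystem supplied, `pq_ds.pieces` yields `ParquetFileFragment` objects (per the updated type hint), so each piece is materialized with `to_table()` and its row counts come from `piece.metadata` rather than the legacy `get_metadata()`. A rough sketch of that fragment API, assuming pyarrow 4.x and an illustrative local file:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a tiny parquet file so the fragment API has something to read
# (the path is illustrative).
pq.write_table(pa.table({"one": [1, 2, 3]}), "/tmp/example.parquet")

dataset = ds.dataset("/tmp/example.parquet", format="parquet")
for fragment in dataset.get_fragments():   # ParquetFileFragment objects
    table = fragment.to_table()            # replaces piece.read(...)
    print(fragment.metadata.num_rows, table.num_rows)  # footer metadata vs. data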
41 changes: 33 additions & 8 deletions python/ray/experimental/data/tests/test_dataset.py
@@ -184,12 +184,12 @@ def test_pandas_roundtrip(ray_start_regular_shared):
def test_parquet_read(ray_start_regular_shared, tmp_path):
    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    table = pa.Table.from_pandas(df1)
-    pq.write_table(table, os.path.join(tmp_path, "test1.parquet"))
+    pq.write_table(table, os.path.join(str(tmp_path), "test1.parquet"))
    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
    table = pa.Table.from_pandas(df2)
-    pq.write_table(table, os.path.join(tmp_path, "test2.parquet"))
+    pq.write_table(table, os.path.join(str(tmp_path), "test2.parquet"))

-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))

    # Test metadata-only parquet ops.
    assert len(ds._blocks._blocks) == 1
@@ -241,6 +241,31 @@ def test_pyarrow(ray_start_regular_shared):
.take() == [{"b": 2}, {"b": 20}]


+def test_uri_parser():
+    from ray.experimental.data.read_api import _parse_paths
+    fs, path = _parse_paths("/local/path")
+    assert path == "/local/path"
+    assert fs.type_name == "local"
+
+    fs, path = _parse_paths("./")
+    assert path == "./"
+    assert fs.type_name == "local"
+
+    fs, path = _parse_paths("s3://bucket/dir")
+    assert path == "bucket/dir"
+    assert fs.type_name == "s3"
+
+    fs, path = _parse_paths(["s3://bucket/dir_1", "s3://bucket/dir_2"])
+    assert path == ["bucket/dir_1", "bucket/dir_2"]
+    assert fs.type_name == "s3"
+
+    with pytest.raises(ValueError):
+        _parse_paths(["s3://bucket/dir_1", "/path/local"])
+
+    with pytest.raises(ValueError):
+        _parse_paths([])


def test_read_binary_files(ray_start_regular_shared):
with util.gen_bin_files(10) as (_, paths):
ds = ray.experimental.data.read_binary_files(paths, parallelism=10)
@@ -285,7 +310,7 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
    df = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, os.path.join(tmp_path, "test1.parquet"))
-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))
    ds_list = ds.map_batches(lambda df: df + 1, batch_size=1).take()
    print(ds_list)
    values = [s["one"] for s in ds_list]
@@ -294,7 +319,7 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
    assert values == [3, 4, 5]

    # Test Pyarrow
-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))
    ds_list = ds.map_batches(
        lambda pa: pa, batch_size=1, batch_format="pyarrow").take()
    values = [s["one"] for s in ds_list]
@@ -315,20 +340,20 @@ def test_map_batch(ray_start_regular_shared, tmp_path):

    # Test the lambda returns different types than the batch_format
    # pandas => list block
-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))
    ds_list = ds.map_batches(lambda df: [1], batch_size=1).take()
    assert ds_list == [1, 1, 1]
    assert ds.count() == 3

    # pyarrow => list block
-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))
    ds_list = ds.map_batches(
        lambda df: [1], batch_size=1, batch_format="pyarrow").take()
    assert ds_list == [1, 1, 1]
    assert ds.count() == 3

    # Test the wrong return value raises an exception.
-    ds = ray.experimental.data.read_parquet(tmp_path)
+    ds = ray.experimental.data.read_parquet(str(tmp_path))
    with pytest.raises(ValueError):
        ds_list = ds.map_batches(
            lambda df: 1, batch_size=2, batch_format="pyarrow").take()
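For callers, the upshot is that `read_parquet` can take either a local path or a URI and infer the right filesystem when none is passed, while an explicitly supplied `filesystem` still takes precedence. A usage sketch along these lines (bucket name and directory paths are illustrative; the S3 case assumes credentials and an S3-enabled pyarrow build):

import ray
import ray.experimental.data  # dataset API location at the time of this commit

ray.init()

# Local directory: _parse_paths pairs it with a LocalFileSystem.
ds_local = ray.experimental.data.read_parquet("/tmp/parquet_dir")

# S3 URI: the filesystem is inferred from the scheme and the scheme-less
# path ("my-bucket/dir") is what ParquetDataset actually receives.
ds_s3 = ray.experimental.data.read_parquet("s3://my-bucket/dir")

print(ds_local.count(), ds_s3.count())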
3 changes: 1 addition & 2 deletions python/requirements.txt
@@ -41,6 +41,7 @@ dataclasses; python_version < '3.7'
starlette

# Requirements for running tests
+pyarrow==4.0.1
blist; platform_system != "Windows"
azure-common
azure-mgmt-resource
@@ -83,5 +84,3 @@ smart_open[s3]
tqdm
async-exit-stack
async-generator
-# pyarrow v4.0.0 doesn't work in macOS
-pyarrow==3.0.0
1 change: 1 addition & 0 deletions python/requirements/data_processing/requirements_dataset.txt
@@ -0,0 +1 @@
+pickle5; python_version < '3.8'
