[Datasets] Update ExecutionPlan repr formatting to use consistent spacing (ray-project#33125)

- Use a default indent of three spaces ("   ") in the custom formatting logic in ExecutionPlan.get_plan_as_string(), to stay consistent with the existing trailing_space logic.
- Update the remaining docs to match the changes from "[Datasets] Truncate lines from Dataset.__repr__ involving long column names" (ray-project#32722), which currently break the CI docs tests.
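
For reference, a minimal sketch of how the indentation is composed, using the same trailing_space/INDENT_STR scheme as get_plan_as_string() (the helper name and parameters below are illustrative, not part of the Ray API):

INDENT_STR = " " * 3

def format_dataset_repr(num_stages: int, num_blocks: int, num_rows: int) -> str:
    # Each upstream stage shifts the whole Dataset(...) block right by three
    # spaces; the parameters inside it get one additional three-space level.
    trailing_space = INDENT_STR * max(num_stages, 0)
    return (
        "Dataset("
        f"\n{trailing_space}{INDENT_STR}num_blocks={num_blocks},"
        f"\n{trailing_space}{INDENT_STR}num_rows={num_rows}"
        f"\n{trailing_space})"
    )

print(format_dataset_repr(num_stages=2, num_blocks=1, num_rows=150))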

Signed-off-by: Scott Lee <[email protected]>
scottjlee authored Mar 10, 2023

1 parent 9cf7ae6 commit 2fb820f
Showing 12 changed files with 255 additions and 64 deletions.
12 changes: 11 additions & 1 deletion doc/source/data/getting-started.rst
@@ -62,7 +62,17 @@ transform datasets. Ray executes transformations in parallel for performance at
.. testoutput::

MapBatches(transform_batch)
+- Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64})
+- Dataset(
num_blocks=...,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
)

To learn more about transforming datasets, read
:ref:`Transforming datasets <transforming_datasets>`.
18 changes: 16 additions & 2 deletions doc/source/data/glossary.rst
@@ -116,7 +116,11 @@ Ray Datasets Glossary
>>> import numpy as np
>>> import ray
>>> ray.data.from_numpy(np.zeros((100, 32, 32, 3)))
Dataset(num_blocks=1, num_rows=100, schema={__value__: ArrowTensorType(shape=(32, 32, 3), dtype=double)})
Dataset(
num_blocks=1,
num_rows=100,
schema={__value__: ArrowTensorType(shape=(32, 32, 3), dtype=double)}
)

Tabular Dataset
A Dataset that represents columnar data.
@@ -125,7 +129,17 @@ Ray Datasets Glossary

>>> import ray
>>> ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64})
Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
)

User-defined function (UDF)
A callable that transforms batches or :term:`records <Record>` of data. UDFs let you arbitrarily transform datasets.
20 changes: 13 additions & 7 deletions python/ray/data/_internal/plan.py
@@ -241,20 +241,22 @@ def get_plan_as_string(self) -> str:
# If the resulting string representation fits in one line, use it directly.
SCHEMA_LINE_CHAR_LIMIT = 80
MIN_FIELD_LENGTH = 10
INDENT_STR = " " * 3
trailing_space = " " * (max(num_stages, 0) * 3)
if len(dataset_str) > SCHEMA_LINE_CHAR_LIMIT:
# If the resulting string representation exceeds the line char limit,
# first try breaking up each `Dataset` parameter into its own line
# and check if each line fits within the line limit. We check the
# `schema` param's length, since this is likely the longest string.
schema_str_on_new_line = f"\tschema={schema_str}"
schema_str_on_new_line = f"{trailing_space}{INDENT_STR}schema={schema_str}"
if len(schema_str_on_new_line) > SCHEMA_LINE_CHAR_LIMIT:
# If the schema cannot fit on a single line, break up each field
# into its own line.
schema_str = []
for n, t in zip(schema.names, schema.types):
if hasattr(t, "__name__"):
t = t.__name__
col_str = f"\t\t{n}: {t}"
col_str = f"{trailing_space}{INDENT_STR * 2}{n}: {t}"
# If the field line exceeds the char limit, abbreviate
# the field name to fit while maintaining the full type
if len(col_str) > SCHEMA_LINE_CHAR_LIMIT:
@@ -268,13 +270,17 @@ def get_plan_as_string(self) -> str:
col_str = (
f"{col_str[:chars_left_for_col_name]}{shortened_suffix}"
)
schema_str.append(f"{col_str}")
schema_str.append(col_str)
schema_str = ",\n".join(schema_str)
schema_str = "{\n" + schema_str + "\n\t}"
dataset_str = (
"Dataset(\n\tnum_blocks={},\n\tnum_rows={},\n\tschema={}\n)".format(
num_blocks, count, schema_str
schema_str = (
"{\n" + schema_str + f"\n{trailing_space}{INDENT_STR}" + "}"
)
dataset_str = (
f"Dataset("
f"\n{trailing_space}{INDENT_STR}num_blocks={num_blocks},"
f"\n{trailing_space}{INDENT_STR}num_rows={count},"
f"\n{trailing_space}{INDENT_STR}schema={schema_str}"
f"\n{trailing_space})"
)

if num_stages == 0:
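
For illustration, a self-contained sketch of the field-abbreviation rule described in the comments above: a schema field whose line would exceed the 80-character limit gets its name truncated while the type annotation is kept. The exact suffix and character budget here are assumptions, not the Ray implementation:

SCHEMA_LINE_CHAR_LIMIT = 80
MIN_FIELD_LENGTH = 10
INDENT_STR = " " * 3

def format_schema_field(name: str, type_str: str, trailing_space: str = "") -> str:
    col_str = f"{trailing_space}{INDENT_STR * 2}{name}: {type_str}"
    if len(col_str) > SCHEMA_LINE_CHAR_LIMIT:
        # Assumed budget: keep the full ": <type>" suffix and spend the rest of
        # the line on a truncated field name ending in "...".
        shortened_suffix = f"...: {type_str}"
        chars_left_for_col_name = max(
            SCHEMA_LINE_CHAR_LIMIT - len(shortened_suffix), MIN_FIELD_LENGTH
        )
        col_str = f"{col_str[:chars_left_for_col_name]}{shortened_suffix}"
    return col_str

print(format_schema_field("a" * 120, "double"))  # name truncated, type kept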
24 changes: 22 additions & 2 deletions python/ray/data/dataset.py
@@ -3232,7 +3232,17 @@ def to_tf(
>>> import ray
>>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
>>> ds
Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64})
Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
)
If your model accepts a single tensor as input, specify a single feature column.
@@ -3253,7 +3263,17 @@ def to_tf(
>>> ds = preprocessor.transform(ds)
>>> ds
Concatenator
+- Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64})
+- Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
)
>>> ds.to_tf("features", "target") # doctest: +SKIP
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
24 changes: 22 additions & 2 deletions python/ray/data/dataset_iterator.py
@@ -497,7 +497,17 @@ def to_tf(
... "s3://anonymous@air-example-data/iris.csv"
... )
>>> it = ds.iterator(); it
DatasetIterator(Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64}))
DatasetIterator(Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
))
If your model accepts a single tensor as input, specify a single feature column.
@@ -518,7 +528,17 @@ def to_tf(
>>> it = preprocessor.transform(ds).iterator()
>>> it
DatasetIterator(Concatenator
+- Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64}))
+- Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal length (cm): double,
sepal width (cm): double,
petal length (cm): double,
petal width (cm): double,
target: int64
}
))
>>> it.to_tf("features", "target") # doctest: +SKIP
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
12 changes: 11 additions & 1 deletion python/ray/data/read_api.py
@@ -492,7 +492,17 @@ def read_parquet(
... ("variety", pa.string())]
>>> ray.data.read_parquet("example://iris.parquet",
... schema=pa.schema(fields))
Dataset(num_blocks=..., num_rows=150, schema={sepal.length: double, ...})
Dataset(
num_blocks=1,
num_rows=150,
schema={
sepal.length: double,
sepal.width: double,
petal.length: double,
petal.width: double,
variety: string
}
)
For further arguments you can pass to pyarrow as a keyword argument, see
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment
2 changes: 1 addition & 1 deletion python/ray/data/tests/conftest.py
@@ -258,7 +258,7 @@ def _assert_base_partitioned_ds(
# `ExecutionPlan.get_plan_as_string()`). Therefore, we remove whitespace
# characters to test the string contents regardless of the string repr length.
def _remove_whitespace(ds_str):
for c in ["\n", "\t", " "]:
for c in ["\n", "   ", " "]:
ds_str = ds_str.replace(c, "")
return ds_str

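For example, with the helper above the wrapped and single-line forms of the same repr compare equal once whitespace is stripped (the values below are assumed for illustration):

def _remove_whitespace(ds_str):
    for c in ["\n", "   ", " "]:
        ds_str = ds_str.replace(c, "")
    return ds_str

multi_line = "Dataset(\n   num_blocks=2,\n   num_rows=6,\n   schema={two: string}\n)"
single_line = "Dataset(num_blocks=2, num_rows=6, schema={two: string})"
assert _remove_whitespace(multi_line) == _remove_whitespace(single_line)
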
37 changes: 37 additions & 0 deletions python/ray/data/tests/test_dataset_consumption.py
@@ -1613,6 +1613,43 @@ def test_dataset_schema_after_read_stats(ray_start_cluster):
assert schema == ds.schema()


def test_dataset_plan_as_string(ray_start_cluster):
ds = ray.data.read_parquet("example://iris.parquet")
assert ds._plan.get_plan_as_string() == (
"Dataset(\n"
" num_blocks=1,\n"
" num_rows=150,\n"
" schema={\n"
" sepal.length: double,\n"
" sepal.width: double,\n"
" petal.length: double,\n"
" petal.width: double,\n"
" variety: string\n"
" }\n"
")"
)
for _ in range(5):
ds = ds.map_batches(lambda x: x)
assert ds._plan.get_plan_as_string() == (
"MapBatches(<lambda>)\n"
"+- MapBatches(<lambda>)\n"
" +- MapBatches(<lambda>)\n"
" +- MapBatches(<lambda>)\n"
" +- MapBatches(<lambda>)\n"
" +- Dataset(\n"
" num_blocks=1,\n"
" num_rows=150,\n"
" schema={\n"
" sepal.length: double,\n"
" sepal.width: double,\n"
" petal.length: double,\n"
" petal.width: double,\n"
" variety: string\n"
" }\n"
" )"
)


class LoggerWarningCalled(Exception):
"""Custom exception used in test_warning_execute_with_no_cpu() and
test_nowarning_execute_with_cpu(). Raised when the `logger.warning` method
28 changes: 20 additions & 8 deletions python/ray/data/tests/test_dataset_numpy.py
@@ -122,8 +122,11 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path):
ds.write_numpy(data_path, filesystem=fs)
ds = ray.data.read_numpy(data_path, filesystem=fs)
assert str(ds) == (
"Dataset(\n\tnum_blocks=2,\n\tnum_rows=?,"
"\n\tschema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n)"
"Dataset(\n"
" num_blocks=2,\n"
" num_rows=?,\n"
" schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n"
")"
)
np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])])

@@ -134,8 +137,11 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path)
assert str(ds) == (
"Dataset(\n\tnum_blocks=1,\n\tnum_rows=10,"
"\n\tschema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n)"
"Dataset(\n"
" num_blocks=1,\n"
" num_rows=10,\n"
" schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n"
")"
)
np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])])

@@ -147,8 +153,11 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
assert ds.num_blocks() == 1
assert ds.count() == 10
assert str(ds) == (
"Dataset(\n\tnum_blocks=1,\n\tnum_rows=10,"
"\n\tschema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n)"
"Dataset(\n"
" num_blocks=1,\n"
" num_rows=10,\n"
" schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n"
")"
)
assert [v.item() for v in ds.take(2)] == [0, 1]

@@ -160,8 +169,11 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path):
np.save(path, np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider())
assert str(ds) == (
"Dataset(\n\tnum_blocks=1,\n\tnum_rows=10,"
"\n\tschema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n)"
"Dataset(\n"
" num_blocks=1,\n"
" num_rows=10,\n"
" schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)}\n"
")"
)
np.testing.assert_equal(ds.take(2), [np.array([0]), np.array([1])])

22 changes: 14 additions & 8 deletions python/ray/data/tests/test_dataset_parquet.py
@@ -446,15 +446,21 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
assert ds.schema() is not None
input_files = ds.input_files()
assert len(input_files) == 2, input_files
assert (
str(ds) == "Dataset(\n\tnum_blocks=2,\n\tnum_rows=6,"
"\n\tschema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>}\n)"
assert str(ds) == (
"Dataset(\n"
" num_blocks=2,\n"
" num_rows=6,\n"
" schema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>}\n"
")"
), ds
assert (
repr(ds) == "Dataset(\n\tnum_blocks=2,\n\tnum_rows=6,"
"\n\tschema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>}\n)"
assert repr(ds) == (
"Dataset(\n"
" num_blocks=2,\n"
" num_rows=6,\n"
" schema={two: string, "
"one: dictionary<values=int32, indices=int32, ordered=0>}\n"
")"
), ds
check_num_computed(ds, 1, 1)
