Skip to content

Commit

Permalink
[data] [docs] Improve documentation for strict mode (ray-project#34876)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored May 1, 2023
1 parent 947bcee commit 7138da5
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 13 deletions.
51 changes: 51 additions & 0 deletions doc/source/data/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,57 @@ Ray Data doesn't perform query optimization, so some manual performance
tuning may be necessary depending on your use case and data scale. Please see our
:ref:`performance tuning guide <data_performance_tips>` for more information.

What is strict mode?
====================

Starting in Ray 2.5, Ray Data runs in "strict mode" by default: it always requires
data schemas and drops support for standalone Python objects. In addition to
unification and simplicity benefits, this aligns the Ray Data API more closely with
industry-standard distributed data APIs like Apache Spark, and with emerging standards
for machine learning datasets like HuggingFace.

Migrating to strict mode
~~~~~~~~~~~~~~~~~~~~~~~~

You can disable strict mode temporarily by setting the environment variable
``RAY_DATA_STRICT_MODE=0`` on all cluster processes. Future releases will remove
the ability to disable strict mode.

Migrating existing code is straightforward. There are two common changes you may need
to make to your code to be compatible:

1. Pass the ``batch_format="pandas"`` argument to ``map_batches`` or ``iter_batches``,
if your code assumes pandas is the default batch format.
2. Instead of returning standalone objects or numpy arrays from ``map`` or ``map_batches``,
return a dictionary that names the field. E.g., change function code from ``return object()`` to
``return {"my_obj": object()}``, and ``return [1, 2, 3]`` to ``return {"my_values": [1, 2, 3]}``.

List of strict mode changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~

In more detail, support for standalone Python objects is dropped. This means that
instead of directly storing, e.g., a Python ``Tuple[str, int]`` instance in Ray Data,
you must either give each field a name (e.g., ``{foo: str, bar: int}``), or
use a named object-type field (e.g., ``{foo: object}``). In addition, the default
batch format changes from ``default`` (pandas) to ``numpy``. This means that most users
just need to be aware of ``Dict[str, Any]`` (non-batched data records) and
``Dict[str, np.ndarray]`` (batched data) types when working with Ray Data.

**Full list of changes**:

* All read APIs return structured data, never standalone Python objects.
* Standalone Python objects are prohibited from being returned from ``map`` / ``map_batches``.
* Standalone NumPy arrays are prohibited from being returned from ``map`` / ``map_batches``.
* There is no more special interpretation of single-column schema containing just ``__value__`` as a column.
* The default batch format is ``numpy`` instead of ``default`` (pandas).
* ``schema()`` returns a unified Schema class instead of ``Union[pyarrow.lib.Schema, type]``.

**Datasource behavior changes**:

* ``range_tensor``: create ``data`` column instead of ``__value__``.
* ``from_numpy`` / ``from_numpy_refs`` : create ``data`` column instead of using ``__value__``.
* ``from_items``: create ``item`` column instead of using Python objects.
* ``range``: create ``id`` column instead of using Python objects.

How can I contribute to Ray Data?
=====================================

Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

# Test target for tests/test_nonstrict_mode.py, which covers Ray Data with
# strict mode turned off.
py_test(
name = "test_nonstrict_mode",
size = "small",
srcs = ["tests/test_nonstrict_mode.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_sql",
size = "small",
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def shutdown(self):
global _num_shutdown

with self._shutdown_lock:
logger.get_logger().info(f"Shutting down {self}.")
if self._shutdown:
return
logger.get_logger().info(f"Shutting down {self}.")
_num_shutdown += 1
self._shutdown = True
# Give the scheduling loop some time to finish processing.
Expand Down
5 changes: 1 addition & 4 deletions python/ray/data/_internal/planner/plan_from_numpy_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ def get_input_data() -> List[RefBundle]:
ndarray_to_block_remote = cached_remote_fn(ndarray_to_block, num_returns=2)

ctx = ray.data.DataContext.get_current()
res = [
ndarray_to_block_remote.remote(arr_ref, ctx.strict_mode)
for arr_ref in op._ndarrays
]
res = [ndarray_to_block_remote.remote(arr_ref, ctx) for arr_ref in op._ndarrays]
blocks, metadata = map(list, zip(*res))
metadata = ray.get(metadata)
ref_bundles: List[RefBundle] = [
Expand Down
6 changes: 4 additions & 2 deletions python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,13 @@ def pandas_df_to_arrow_block(df: "pandas.DataFrame") -> "Block":
)


def ndarray_to_block(ndarray: np.ndarray, strict_mode: bool) -> "Block":
def ndarray_to_block(ndarray: np.ndarray, ctx: DataContext) -> "Block":
from ray.data.block import BlockAccessor, BlockExecStats

DataContext._set_current(ctx)

stats = BlockExecStats.builder()
if strict_mode:
if ctx.strict_mode:
block = BlockAccessor.batch_to_block({"data": ndarray})
else:
block = BlockAccessor.batch_to_block(ndarray)
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
"objects are no longer supported, and the default batch format changes to `numpy` "
"from `pandas`. To disable strict mode temporarily, set the environment variable "
"RAY_DATA_STRICT_MODE=0 on all cluster processes. Strict mode will not be "
"possible to disable in future releases." + colorama.Style.RESET_ALL
"possible to disable in future releases.\n\n"
"Learn more here: https://docs.ray.io/en/master/data/faq.html#what-is-strict-mode"
+ colorama.Style.RESET_ALL
)


Expand Down
3 changes: 2 additions & 1 deletion python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
import numpy as np

ctx = DataContext.get_current()
open_stream_args = self._open_stream_args
reader_args = self._reader_args
partitioning = self._partitioning
Expand All @@ -446,9 +447,9 @@ def read_files(
read_paths: List[str],
fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper],
) -> Iterable[Block]:
DataContext._set_current(ctx)
logger.debug(f"Reading {len(read_paths)} files.")
fs = _unwrap_s3_serialization_workaround(filesystem)
ctx = DataContext.get_current()
output_buffer = BlockOutputBuffer(
block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
)
Expand Down
5 changes: 1 addition & 4 deletions python/ray/data/read_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1674,10 +1674,7 @@ def from_numpy_refs(
ctx = DataContext.get_current()
ndarray_to_block_remote = cached_remote_fn(ndarray_to_block, num_returns=2)

res = [
ndarray_to_block_remote.remote(ndarray, strict_mode=ctx.strict_mode)
for ndarray in ndarrays
]
res = [ndarray_to_block_remote.remote(ndarray, ctx) for ndarray in ndarrays]
blocks, metadata = map(list, zip(*res))
metadata = ray.get(metadata)

Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def enable_strict_mode():
ctx.strict_mode = False


@pytest.fixture(scope="module")
def enable_nonstrict_mode():
    """Turn Ray Data strict mode off for the requesting test module.

    The flag is stored on the current ``DataContext``. The value that was in
    effect before the fixture ran is remembered and put back on teardown,
    rather than being unconditionally forced to ``True``, so the fixture
    leaves the context exactly as it found it.
    """
    ctx = ray.data.DataContext.get_current()
    previous_strict_mode = ctx.strict_mode
    ctx.strict_mode = False
    yield
    # Teardown: restore whatever the caller had configured beforehand.
    ctx.strict_mode = previous_strict_mode


@pytest.fixture(scope="function")
def aws_credentials():
import os
Expand Down
200 changes: 200 additions & 0 deletions python/ray/data/tests/test_nonstrict_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import numpy as np
import pandas as pd
from collections import UserDict
import pytest

import ray
from ray.data.tests.conftest import * # noqa
from ray.tests.conftest import * # noqa


def test_nonstrict_read_schemas(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check what the first row of each read API looks like in non-strict mode."""

    def first_row(dataset):
        # Fetch rows with ``take()`` and keep only the leading one.
        return dataset.take()[0]

    # Synthetic ranges and in-memory items.
    assert first_row(ray.data.range(1)) == 0
    assert first_row(ray.data.range_table(1)) == {"value": 0}
    assert first_row(ray.data.range_tensor(1)) == np.array([0])
    assert first_row(ray.data.from_items([1])) == 1

    # NOTE(review): every Python value is an instance of ``object``, so this
    # check can only fail if building or reading the dataset raises.
    assert isinstance(first_row(ray.data.from_items([object()])), object)

    # NumPy-backed sources: rows come back as bare ndarrays.
    from_file = ray.data.read_numpy("example://mnist_subset.npy")
    assert isinstance(first_row(from_file), np.ndarray)

    from_array = ray.data.from_numpy(np.ones((100, 10)))
    assert isinstance(first_row(from_array), np.ndarray)

    from_ref = ray.data.from_numpy_refs(ray.put(np.ones((100, 10))))
    assert isinstance(first_row(from_ref), np.ndarray)

    # Binary files: rows come back as raw bytes.
    binary = ray.data.read_binary_files("example://image-datasets/simple")
    assert isinstance(first_row(binary), bytes)

    # Image and text readers: rows expose a named field.
    images = ray.data.read_images("example://image-datasets/simple")
    assert "image" in first_row(images)

    text = ray.data.read_text("example://sms_spam_collection_subset.txt")
    assert "text" in first_row(text)


def test_nonstrict_map_output(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Exercise the UDF return types accepted by map / map_batches in non-strict mode."""
    ds = ray.data.range(1)

    def run_map(fn, **options):
        # Apply a per-row UDF and force execution.
        ds.map(fn, **options).materialize()

    def run_map_batches(fn, **options):
        # Apply a per-batch UDF and force execution.
        ds.map_batches(fn, **options).materialize()

    # Scalars, plain dicts and dict-like objects from ``map``.
    run_map(lambda _row: 0, max_retries=0)
    run_map(lambda _row: {"id": 0})
    run_map(lambda _row: UserDict({"id": 0}))

    # Single-element arrays from ``map_batches``.
    run_map_batches(lambda _batch: np.array([0]), max_retries=0)
    run_map_batches(lambda _batch: {"id": np.array([0])})
    run_map_batches(lambda _batch: UserDict({"id": np.array([0])}))

    # Tensors from ``map``.
    run_map(lambda _row: np.ones(10), max_retries=0)
    run_map(lambda _row: {"x": np.ones(10)})
    run_map(lambda _row: UserDict({"x": np.ones(10)}))

    # Tensors from ``map_batches``.
    run_map_batches(lambda _batch: np.ones(10), max_retries=0)
    run_map_batches(lambda _batch: {"x": np.ones(10)})
    run_map_batches(lambda _batch: UserDict({"x": np.ones(10)}))

    # Not allowed in normal mode either.
    with pytest.raises(ValueError):
        run_map_batches(lambda _batch: object(), max_retries=0)
    with pytest.raises(ValueError):
        run_map_batches(lambda _batch: {"x": object()}, max_retries=0)

    # Objects wrapped in an ndarray are accepted as batch columns.
    run_map_batches(lambda _batch: {"x": np.array([object()])})
    run_map_batches(lambda _batch: UserDict({"x": np.array([object()])}))

    # Arbitrary objects from ``map``.
    run_map(lambda _row: object(), max_retries=0)
    run_map(lambda _row: {"x": object()})
    run_map(lambda _row: UserDict({"x": object()}))


def test_nonstrict_convert_map_output(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check how list, string and object batch columns are handled by map_batches."""
    numbers = [0, 1, 2, 3]
    int_ds = ray.data.range(1).map_batches(lambda _batch: {"id": [0, 1, 2, 3]})
    int_ds = int_ds.materialize()
    assert int_ds.take_batch()["id"].tolist() == numbers

    with pytest.raises(ValueError):
        # Strings not converted into array.
        pending = ray.data.range(1).map_batches(
            lambda _batch: {"id": "string"}, max_retries=0
        )
        pending.materialize()

    class UserObj:
        # Any two instances compare equal, so the round-tripped object still
        # matches a freshly constructed one in the assertion below.
        def __eq__(self, other):
            return isinstance(other, UserObj)

    mixed_ds = ray.data.range(1).map_batches(
        lambda _batch: {"id": [0, 1, 2, UserObj()]}
    )
    mixed_ds = mixed_ds.materialize()
    assert mixed_ds.take_batch()["id"].tolist() == [0, 1, 2, UserObj()]


def test_nonstrict_default_batch_format(
    ray_start_10_cpus_shared, enable_nonstrict_mode
):
    """Verify that batches default to pandas DataFrames in non-strict mode."""

    @ray.remote
    class Queue:
        # Single-slot mailbox actor: ``put`` stores a new item and hands back
        # the one that was stored before it.
        def __init__(self):
            self.item = None

        def put(self, item):
            previous, self.item = self.item, item
            return previous

    table = ray.data.range_table(1)
    mailbox = Queue.remote()

    # Both direct batch accessors should yield DataFrames.
    first_batch = next(table.iter_batches())
    assert isinstance(first_batch, pd.DataFrame)
    assert isinstance(table.take_batch(), pd.DataFrame)

    def f(x):
        # Ship the batch the UDF received to the actor so the driver can
        # inspect its type afterwards.
        ray.get(mailbox.put.remote(x))
        return x

    table.map_batches(f).materialize()

    # Swapping in ``None`` retrieves the batch captured inside the UDF.
    batch = ray.get(mailbox.put.remote(None))
    assert isinstance(batch, pd.DataFrame), batch


def test_nonstrict_tensor_support(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check that bare ndarrays survive from_items, map and map_batches."""
    unit = np.ones(10)

    original = ray.data.from_items([np.ones(10), np.ones(10)])
    assert np.array_equal(original.take()[0], unit)

    # Doubling each row.
    doubled = original.map(lambda row: row * 2)
    assert np.array_equal(doubled.take()[0], 2 * unit)

    # Doubling again, this time per batch.
    quadrupled = doubled.map_batches(lambda batch: batch * 2)
    assert np.array_equal(quadrupled.take()[0], 4 * unit)


def test_nonstrict_value_repr(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check rows and batches of a dataset whose only column is ``__value__``."""
    expected = 4 * np.ones(10)

    # Double the values once per batch and once per row.
    transformed = (
        ray.data.from_items([{"__value__": np.ones(10)}])
        .map_batches(lambda batch: {"__value__": batch * 2})
        .map(lambda row: {"__value__": row * 2})
    )

    # Both the first row and the first batch element should be the bare array.
    assert np.array_equal(transformed.take()[0], expected)
    assert np.array_equal(transformed.take_batch()[0], expected)


def test_nonstrict_compute(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Run an identity map under each supported ``compute`` setting."""

    def compute_settings():
        # Yield lazily so each strategy object is only built right before the
        # pipeline that uses it runs.
        yield "actors"
        yield ray.data.ActorPoolStrategy(1, 1)
        yield "tasks"

    for setting in compute_settings():
        ray.data.range(10).map(lambda x: x, compute=setting).show()


def test_nonstrict_schema(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check which schema type ``schema()`` reports for different datasets."""
    import pyarrow
    from ray.data._internal.pandas_block import PandasBlockSchema

    arrow_schema = pyarrow.lib.Schema

    # Dict rows with scalar or list fields report an Arrow schema.
    scalar_rows = ray.data.from_items([{"x": 2}])
    assert isinstance(scalar_rows.schema(), arrow_schema)

    list_rows = ray.data.from_items([{"x": 2, "y": [1, 2]}])
    assert isinstance(list_rows.schema(), arrow_schema)

    # A row containing an arbitrary object makes schema() return a type.
    object_rows = ray.data.from_items([{"x": 2, "y": object(), "z": [1, 2]}])
    assert isinstance(object_rows.schema(), type)

    # NumPy input reports an Arrow schema.
    tensor_ds = ray.data.from_numpy(np.ones((100, 10)))
    assert isinstance(tensor_ds.schema(), arrow_schema)

    # After a pandas-format map_batches the schema is a PandasBlockSchema.
    pandas_ds = tensor_ds.map_batches(lambda x: x, batch_format="pandas")
    assert isinstance(pandas_ds.schema(), PandasBlockSchema)


def test_nouse_raw_dicts(ray_start_10_cpus_shared, enable_nonstrict_mode):
    """Check that ``as_pydict()`` on a row returns exactly a builtin ``dict``."""
    table_row = ray.data.range_table(10).take(1)[0]
    assert type(table_row.as_pydict()) is dict

    item_row = ray.data.from_items([{"x": 1}]).take(1)[0]
    assert type(item_row.as_pydict()) is dict

    def checker(x):
        # The same must hold for rows handed to a ``map`` UDF.
        converted = x.as_pydict()
        assert type(converted) is dict
        return x

    ray.data.range_table(10).map(checker).show()


def test_nonstrict_require_batch_size_for_gpu(enable_nonstrict_mode):
    """Calling ``map_batches`` with ``num_gpus=1`` and no ``batch_size`` must not raise.

    The test replaces any running Ray runtime with one that advertises a GPU
    resource. That runtime is shut down again afterwards, even if the
    assertion path fails, so it does not leak into whatever runs next.
    """
    # Start from a clean slate so the resource configuration below takes effect.
    ray.shutdown()
    ray.init(num_cpus=4, num_gpus=1)
    try:
        ds = ray.data.range(1)
        ds.map_batches(lambda x: x, num_gpus=1)
    finally:
        ray.shutdown()


if __name__ == "__main__":
    # Allow running this test module directly as a script: invoke pytest on
    # this file in verbose mode and use its result as the process exit status.
    import sys

    sys.exit(pytest.main(["-v", __file__]))

0 comments on commit 7138da5

Please sign in to comment.