diff --git a/python/ray/data/_internal/output_buffer.py b/python/ray/data/_internal/output_buffer.py
index 5dd1e2139a81..bfefd8bb18aa 100644
--- a/python/ray/data/_internal/output_buffer.py
+++ b/python/ray/data/_internal/output_buffer.py
@@ -2,6 +2,7 @@
 
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data.block import Block, BlockAccessor, DataBatch
+from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR
 
 
 class BlockOutputBuffer:
@@ -72,15 +73,27 @@ def next(self) -> Block:
         block_to_yield = self._buffer.build()
         block_remainder = None
         block = BlockAccessor.for_block(block_to_yield)
-        if block.size_bytes() > self._target_max_block_size:
+        if (
+            block.size_bytes()
+            >= MAX_SAFE_BLOCK_SIZE_FACTOR * self._target_max_block_size
+        ):
+            # Slice a block to respect the target max block size. We only do
+            # this if we are at least 50% above the target block size, because
+            # this ensures that the last block produced will be at least half
+            # the target block size.
             num_bytes_per_row = block.size_bytes() // block.num_rows()
             target_num_rows = self._target_max_block_size // num_bytes_per_row
             target_num_rows = max(1, target_num_rows)
-            num_rows = min(target_num_rows, block.num_rows())
-            # Use copy=True to avoid holding the entire block in memory.
-            block_to_yield = block.slice(0, num_rows, copy=True)
-            block_remainder = block.slice(num_rows, block.num_rows(), copy=True)
+            # TODO(swang): If the buffer is finalized, try to create even
+            # blocks?
+
+            if target_num_rows < block.num_rows():
+                # Use copy=True to avoid holding the entire block in memory.
+                block_to_yield = block.slice(0, target_num_rows, copy=True)
+                block_remainder = block.slice(
+                    target_num_rows, block.num_rows(), copy=True
+                )
 
         self._buffer = DelegatingBlockBuilder()
         if block_remainder is not None:
diff --git a/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py b/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
index 499985f82c35..ccb948faa9f3 100644
--- a/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
+++ b/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
@@ -7,6 +7,7 @@
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
+from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR
 
 logger = DatasetLogger(__name__)
 
@@ -62,7 +63,10 @@ def map(
             del mapped_block
         block = builder.build()
         block = BlockAccessor.for_block(block)
-        if block.size_bytes() >= 1.5 * target_shuffle_max_block_size:
+        if (
+            block.size_bytes()
+            > MAX_SAFE_BLOCK_SIZE_FACTOR * target_shuffle_max_block_size
+        ):
             logger.get_logger().warn(
                 "Input block to map task has size "
                 f"{block.size_bytes() // (1024 * 1024)}MiB, which exceeds "
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index dc5ad8f8538b..95cd906f4f3f 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -33,6 +33,11 @@
 # We choose 1GiB as 4x less than the typical memory:core ratio (4:1).
 DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZE = 1024 * 1024 * 1024
 
+# We will attempt to slice blocks whose size exceeds this factor *
+# target_max_block_size. We will warn the user if slicing fails and we produce
+# blocks larger than this threshold.
+MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5
+
 # Dataset will avoid creating blocks smaller than this size in bytes on read.
 # This takes precedence over DEFAULT_MIN_PARALLELISM.
 DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024
diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py
index a943066e397e..bc0c7ee53a9d 100644
--- a/python/ray/data/tests/test_dynamic_block_split.py
+++ b/python/ray/data/tests/test_dynamic_block_split.py
@@ -1,5 +1,6 @@
 import os
 import time
+from dataclasses import astuple, dataclass
 
 import numpy as np
 import pandas as pd
@@ -8,6 +9,7 @@
 import ray
 from ray.data import Dataset
+from ray.data._internal.arrow_block import ArrowBlockAccessor
 from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data.block import BlockMetadata
 from ray.data.datasource import Datasource
@@ -20,37 +22,75 @@
 class RandomBytesDatasource(Datasource):
     def create_reader(self, **read_args):
         return RandomBytesReader(
-            read_args["num_blocks_per_task"],
-            read_args["block_size"],
-            read_args.get("use_bytes", True),
+            read_args["num_batches_per_task"],
+            read_args["row_size"],
+            num_rows_per_batch=read_args.get("num_rows_per_batch", None),
+            use_bytes=read_args.get("use_bytes", True),
+            use_arrow=read_args.get("use_arrow", False),
         )
 
 
 class RandomBytesReader(Reader):
-    def __init__(self, num_blocks_per_task: int, block_size: int, use_bytes=True):
-        self.num_blocks_per_task = num_blocks_per_task
-        self.block_size = block_size
+    def __init__(
+        self,
+        num_batches_per_task: int,
+        row_size: int,
+        num_rows_per_batch=None,
+        use_bytes=True,
+        use_arrow=False,
+    ):
+        self.num_batches_per_task = num_batches_per_task
+        self.row_size = row_size
+        if num_rows_per_batch is None:
+            num_rows_per_batch = 1
+        self.num_rows_per_batch = num_rows_per_batch
         self.use_bytes = use_bytes
+        self.use_arrow = use_arrow
 
     def estimate_inmemory_data_size(self):
         return None
 
     def get_read_tasks(self, parallelism: int):
         def _blocks_generator():
-            for _ in range(self.num_blocks_per_task):
+            for _ in range(self.num_batches_per_task):
                 if self.use_bytes:
-                    yield pd.DataFrame({"one": [np.random.bytes(self.block_size)]})
+                    # NOTE(swang): Each np object has some metadata bytes, so
+                    # actual size can be much more than num_rows_per_batch * row_size
+                    # if row_size is small.
+                    yield pd.DataFrame(
+                        {
+                            "one": [
+                                np.random.bytes(self.row_size)
+                                for _ in range(self.num_rows_per_batch)
+                            ]
+                        }
+                    )
+                elif self.use_arrow:
+                    batch = {
+                        "one": np.ones(
+                            (self.num_rows_per_batch, self.row_size), dtype=np.uint8
+                        )
+                    }
+                    block = ArrowBlockAccessor.numpy_to_block(batch)
+                    yield block
                 else:
                     yield pd.DataFrame(
-                        {"one": [np.array2string(np.ones(self.block_size, dtype=int))]}
+                        {
+                            "one": [
+                                np.array2string(np.ones(self.row_size, dtype=int))
+                                for _ in range(self.num_rows_per_batch)
+                            ]
+                        }
                     )
 
         return parallelism * [
             ReadTask(
                 lambda: _blocks_generator(),
                 BlockMetadata(
-                    num_rows=self.num_blocks_per_task,
-                    size_bytes=self.num_blocks_per_task * self.block_size,
+                    num_rows=self.num_batches_per_task * self.num_rows_per_batch,
+                    size_bytes=self.num_batches_per_task
+                    * self.num_rows_per_batch
+                    * self.row_size,
                     schema=None,
                     input_files=None,
                     exec_stats=None,
@@ -122,8 +162,8 @@ def test_dataset(
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=num_tasks,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
+        num_batches_per_task=num_blocks_per_task,
+        row_size=block_size,
     )
     # Note the following calls to ds will not fully execute it.
     assert ds.schema() is not None
@@ -137,7 +177,9 @@ def test_dataset(
     assert map_ds.num_blocks() == num_tasks * num_blocks_per_task
     # Blocks smaller than requested batch size will get coalesced.
     map_ds = ds.map_batches(
-        lambda x: {}, batch_size=num_blocks_per_task * num_tasks, compute=compute
+        lambda x: {},
+        batch_size=num_blocks_per_task * num_tasks,
+        compute=compute,
     )
     map_ds = map_ds.materialize()
     assert map_ds.num_blocks() == 1
@@ -184,8 +226,8 @@ def test_filter(ray_start_regular_shared, target_max_block_size):
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=1,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
+        num_batches_per_task=num_blocks_per_task,
+        row_size=block_size,
     )
 
     ds = ds.filter(lambda _: True)
@@ -209,8 +251,8 @@ def test_lazy_block_list(shutdown_only, target_max_block_size):
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=num_tasks,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
+        num_batches_per_task=num_blocks_per_task,
+        row_size=block_size,
     )
     ds.schema()
@@ -315,11 +357,11 @@ def foo(batch):
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=1,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
+        num_batches_per_task=num_blocks_per_task,
+        row_size=block_size,
     )
 
     ds = ds.map_batches(foo, batch_size=None)
 
     assert ds.count() == num_blocks_per_task
@@ -333,8 +375,8 @@ def _test_write_large_data(
     ds = ray.data.read_datasource(
         RandomBytesDatasource(),
         parallelism=1,
-        num_blocks_per_task=num_blocks_per_task,
-        block_size=block_size,
+        num_batches_per_task=num_blocks_per_task,
+        row_size=block_size,
         use_bytes=use_bytes,
     )
 
@@ -408,6 +450,100 @@ def test_write_large_data_webdataset(shutdown_only, tmp_path):
     )
 
 
+@dataclass
+class TestCase:
+    target_max_block_size: int
+    batch_size: int
+    num_batches: int
+    expected_num_blocks: int
+
+
+TEST_CASES = [
+    # Don't create blocks smaller than 50% of the target block size.
+    TestCase(
+        target_max_block_size=1024,
+        batch_size=int(1024 * 1.125),
+        num_batches=1,
+        expected_num_blocks=1,
+    ),
+    # Split blocks larger than 150% of the target block size.
+    TestCase(
+        target_max_block_size=1024,
+        batch_size=int(1024 * 1.8),
+        num_batches=1,
+        expected_num_blocks=2,
+    ),
+    # A huge batch will get split into multiple blocks.
+    TestCase(
+        target_max_block_size=1024,
+        batch_size=int(1024 * 10.125),
+        num_batches=1,
+        expected_num_blocks=11,
+    ),
+    # Different batch sizes but the same total size should produce a similar
+    # number of blocks.
+    TestCase(
+        target_max_block_size=1024,
+        batch_size=int(1024 * 1.5),
+        num_batches=4,
+        expected_num_blocks=6,
+    ),
+    TestCase(
+        target_max_block_size=1024,
+        batch_size=int(1024 * 0.75),
+        num_batches=8,
+        expected_num_blocks=6,
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    "target_max_block_size,batch_size,num_batches,expected_num_blocks",
+    [astuple(test) for test in TEST_CASES],
+)
+def test_block_slicing(
+    ray_start_regular_shared,
+    restore_data_context,
+    target_max_block_size,
+    batch_size,
+    num_batches,
+    expected_num_blocks,
+):
+    ctx = ray.data.context.DataContext.get_current()
+    ctx.target_max_block_size = target_max_block_size
+
+    # Row sizes smaller than this seem to add significant amounts of per-row
+    # metadata overhead.
+    row_size = 128
+    num_rows_per_batch = int(batch_size / row_size)
+    num_tasks = 1
+
+    ds = ray.data.read_datasource(
+        RandomBytesDatasource(),
+        parallelism=num_tasks,
+        num_batches_per_task=num_batches,
+        num_rows_per_batch=num_rows_per_batch,
+        row_size=row_size,
+        use_bytes=False,
+        use_arrow=True,
+    ).materialize()
+    assert ds.num_blocks() == expected_num_blocks
+
+    block_sizes = []
+    num_rows = 0
+    for batch in ds.iter_batches(batch_size=None, batch_format="numpy"):
+        block_sizes.append(batch["one"].size)
+        num_rows += len(batch["one"])
+    assert num_rows == num_rows_per_batch * num_batches
+    for size in block_sizes:
+        # Blocks are not too big.
+        assert (
+            size <= target_max_block_size * ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR
+        )
+        # Blocks are not too small.
+        assert size >= target_max_block_size / 2
+
+
 if __name__ == "__main__":
     import sys
diff --git a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py
index 9b944255ff08..de61a7769aed 100644
--- a/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py
+++ b/release/nightly_tests/dataset/iter_tensor_batches_benchmark.py
@@ -59,7 +59,12 @@ def to_tf(
     return ds
 
 
-def run_iter_tensor_batches_benchmark(benchmark: Benchmark, data_size_gb: int):
+def run_iter_tensor_batches_benchmark(
+    benchmark: Benchmark, data_size_gb: int, block_size_mb: int
+):
+    ctx = ray.data.context.DataContext.get_current()
+    ctx.target_max_block_size = block_size_mb * 1024 * 1024
+
     ds = ray.data.read_images(
         f"s3://anonymous@air-example-data-2/{data_size_gb}G-image-data-synthetic-raw"
     )
@@ -95,19 +100,11 @@ def add_label(batch):
-    # Test with varying batch sizes for iter_torch_batches() and to_tf().
+    # Test with varying batch sizes for iter_torch_batches().
     for batch_size in batch_sizes:
         benchmark.run_materialize_ds(
-            f"iter-torch-batches-{batch_size}",
+            f"iter-torch-batches-{batch_size}-block-size-{block_size_mb}",
             iter_torch_batches,
             ds=ds,
             batch_size=batch_size,
         )
-        benchmark.run_materialize_ds(
-            f"to-tf-{batch_size}",
-            to_tf,
-            ds=ds,
-            feature_columns="image",
-            label_columns="label",
-            batch_size=batch_size,
-        )
 
     prefetch_batches = [0, 1, 4]
     # Test with varying prefetching for iter_torch_batches()
@@ -158,11 +155,17 @@ def add_label(batch):
         default=1,
         help="The data size to use for the dataset.",
     )
+    parser.add_argument(
+        "--block-size-mb",
+        type=int,
+        default=128,
+        help="The target max block size to use for the dataset, in MiB.",
+    )
 
     args = parser.parse_args()
 
     benchmark = Benchmark("iter-tensor-batches")
 
-    run_iter_tensor_batches_benchmark(benchmark, args.data_size_gb)
+    run_iter_tensor_batches_benchmark(benchmark, args.data_size_gb, args.block_size_mb)
 
     benchmark.write_result()
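
To make the slicing rule concrete, below is a rough standalone sketch (plain Python, not part of the patch) that approximates repeatedly applying the patched BlockOutputBuffer.next() logic to one oversized block. The helper name split_sizes and the example numbers are illustrative assumptions only, and the sketch ignores the per-row metadata overhead that real Arrow blocks carry (which is why the test cases above measure block sizes rather than computing them exactly).

# Sketch only: approximates the patched BlockOutputBuffer.next() slicing loop.
MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5  # Mirrors the new constant in ray/data/context.py.


def split_sizes(size_bytes: int, num_rows: int, target_max_block_size: int) -> list:
    """Return the byte sizes of the blocks yielded for one oversized block."""
    sizes = []
    # Slice only while the block is at least 50% over the target, which
    # guarantees that the final remainder is at least half the target size.
    while size_bytes >= MAX_SAFE_BLOCK_SIZE_FACTOR * target_max_block_size:
        num_bytes_per_row = size_bytes // num_rows
        target_num_rows = max(1, target_max_block_size // num_bytes_per_row)
        sizes.append(target_num_rows * num_bytes_per_row)
        size_bytes -= target_num_rows * num_bytes_per_row
        num_rows -= target_num_rows
    sizes.append(size_bytes)  # Remainder block, at least half the target.
    return sizes


# Mirrors the "1.8x target" test case above: 14 rows of 128 bytes against a
# 1024-byte target split into two blocks, matching expected_num_blocks=2.
assert split_sizes(14 * 128, 14, 1024) == [1024, 768]

By the same arithmetic, the 1.125x case stays a single block because it never reaches the 1.5x threshold. The new benchmark flag can be exercised with, e.g., python iter_tensor_batches_benchmark.py --data-size-gb 1 --block-size-mb 64.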