[data] Avoid slicing too-small blocks (ray-project#40840)
Block slicing may create blocks that are very small. This PR ensures that block slicing does not produce blocks smaller than 50% of the target block size.
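As a rough illustration of the rule (a minimal sketch, not the actual BlockOutputBuffer code; the helper name is hypothetical):

MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5  # mirrors the new constant in ray.data.context

def should_slice(block_size_bytes: int, target_max_block_size: int) -> bool:
    # Slice only when the buffered block is at least 1.5x the target, so the
    # trailing remainder can never be smaller than half the target.
    return block_size_bytes >= MAX_SAFE_BLOCK_SIZE_FACTOR * target_max_block_size

# With a 1 KiB target: a 1.4 KiB block is yielded whole, while a 1.8 KiB block
# is split into roughly a 1 KiB slice plus a 0.8 KiB remainder.
assert not should_slice(int(1024 * 1.4), 1024)
assert should_slice(int(1024 * 1.8), 1024)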

---------

Signed-off-by: Stephanie Wang <[email protected]>
stephanie-wang authored Nov 3, 2023
1 parent 77cf6cc commit f22137d
Showing 5 changed files with 201 additions and 40 deletions.
23 changes: 18 additions & 5 deletions python/ray/data/_internal/output_buffer.py
@@ -2,6 +2,7 @@

from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data.block import Block, BlockAccessor, DataBatch
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR


class BlockOutputBuffer:
@@ -72,15 +73,27 @@ def next(self) -> Block:
block_to_yield = self._buffer.build()
block_remainder = None
block = BlockAccessor.for_block(block_to_yield)
if block.size_bytes() > self._target_max_block_size:
if (
block.size_bytes()
>= MAX_SAFE_BLOCK_SIZE_FACTOR * self._target_max_block_size
):
# Slice a block to respect the target max block size. We only do
# this if we are more than 50% above the target block size, because
# this ensures that the last block produced will be at least half
# the block size.
num_bytes_per_row = block.size_bytes() // block.num_rows()
target_num_rows = self._target_max_block_size // num_bytes_per_row
target_num_rows = max(1, target_num_rows)

num_rows = min(target_num_rows, block.num_rows())
# Use copy=True to avoid holding the entire block in memory.
block_to_yield = block.slice(0, num_rows, copy=True)
block_remainder = block.slice(num_rows, block.num_rows(), copy=True)
# TODO(swang): If the buffer is finalized, try to create even
# blocks?

if target_num_rows < block.num_rows():
# Use copy=True to avoid holding the entire block in memory.
block_to_yield = block.slice(0, target_num_rows, copy=True)
block_remainder = block.slice(
target_num_rows, block.num_rows(), copy=True
)

self._buffer = DelegatingBlockBuilder()
if block_remainder is not None:
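Consolidating the new slicing path above into one runnable summary: it estimates a per-row size and cuts at a row boundary, under the assumption of roughly fixed-size rows (compute_slice is a hypothetical helper, not part of the PR):

def compute_slice(num_rows: int, size_bytes: int, target_max_block_size: int) -> int:
    # Number of rows to yield now; any remaining rows stay in the buffer.
    num_bytes_per_row = size_bytes // num_rows
    target_num_rows = max(1, target_max_block_size // num_bytes_per_row)
    return min(target_num_rows, num_rows)

# Example: a 10-row, 2048-byte block against a 1024-byte target yields 5 rows
# now and leaves 5 rows buffered as the remainder.
assert compute_slice(num_rows=10, size_bytes=2048, target_max_block_size=1024) == 5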
@@ -7,6 +7,7 @@
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR

logger = DatasetLogger(__name__)

@@ -62,7 +63,10 @@ def map(
del mapped_block
block = builder.build()
block = BlockAccessor.for_block(block)
if block.size_bytes() >= 1.5 * target_shuffle_max_block_size:
if (
block.size_bytes()
> MAX_SAFE_BLOCK_SIZE_FACTOR * target_shuffle_max_block_size
):
logger.get_logger().warn(
"Input block to map task has size "
f"{block.size_bytes() // (1024 * 1024)}MiB, which exceeds "
5 changes: 5 additions & 0 deletions python/ray/data/context.py
@@ -33,6 +33,11 @@
# We choose 1GiB as 4x less than the typical memory:core ratio (4:1).
DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZE = 1024 * 1024 * 1024

# We will attempt to slice blocks whose size exceeds this factor *
# target_max_block_size. We will warn the user if slicing fails and we produce
# blocks larger than this threshold.
MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5

# Dataset will avoid creating blocks smaller than this size in bytes on read.
# This takes precedence over DEFAULT_MIN_PARALLELISM.
DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024
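For reference, the target size that this factor multiplies is the per-job target_max_block_size on DataContext, which the test and benchmark below override; a usage sketch (the 32 MiB value is only an example):

from ray.data.context import DataContext

ctx = DataContext.get_current()
# Lowering the target also lowers the 1.5x slicing threshold proportionally.
ctx.target_max_block_size = 32 * 1024 * 1024  # 32 MiB for this job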
182 changes: 159 additions & 23 deletions python/ray/data/tests/test_dynamic_block_split.py
@@ -1,5 +1,6 @@
import os
import time
from dataclasses import astuple, dataclass

import numpy as np
import pandas as pd
@@ -8,6 +9,7 @@

import ray
from ray.data import Dataset
from ray.data._internal.arrow_block import ArrowBlockAccessor
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource
@@ -20,37 +22,75 @@
class RandomBytesDatasource(Datasource):
def create_reader(self, **read_args):
return RandomBytesReader(
read_args["num_blocks_per_task"],
read_args["block_size"],
read_args.get("use_bytes", True),
read_args["num_batches_per_task"],
read_args["row_size"],
num_rows_per_batch=read_args.get("num_rows_per_batch", None),
use_bytes=read_args.get("use_bytes", True),
use_arrow=read_args.get("use_arrow", False),
)


class RandomBytesReader(Reader):
def __init__(self, num_blocks_per_task: int, block_size: int, use_bytes=True):
self.num_blocks_per_task = num_blocks_per_task
self.block_size = block_size
def __init__(
self,
num_batches_per_task: int,
row_size: int,
num_rows_per_batch=None,
use_bytes=True,
use_arrow=False,
):
self.num_batches_per_task = num_batches_per_task
self.row_size = row_size
if num_rows_per_batch is None:
num_rows_per_batch = 1
self.num_rows_per_batch = num_rows_per_batch
self.use_bytes = use_bytes
self.use_arrow = use_arrow

def estimate_inmemory_data_size(self):
return None

def get_read_tasks(self, parallelism: int):
def _blocks_generator():
for _ in range(self.num_blocks_per_task):
for _ in range(self.num_batches_per_task):
if self.use_bytes:
yield pd.DataFrame({"one": [np.random.bytes(self.block_size)]})
# NOTE(swang): Each np object has some metadata bytes, so
# actual size can be much more than num_rows_per_batch * row_size
# if row_size is small.
yield pd.DataFrame(
{
"one": [
np.random.bytes(self.row_size)
for _ in range(self.num_rows_per_batch)
]
}
)
elif self.use_arrow:
batch = {
"one": np.ones(
(self.num_rows_per_batch, self.row_size), dtype=np.uint8
)
}
block = ArrowBlockAccessor.numpy_to_block(batch)
yield block
else:
yield pd.DataFrame(
{"one": [np.array2string(np.ones(self.block_size, dtype=int))]}
{
"one": [
np.array2string(np.ones(self.row_size, dtype=int))
for _ in range(self.num_rows_per_batch)
]
}
)

return parallelism * [
ReadTask(
lambda: _blocks_generator(),
BlockMetadata(
num_rows=self.num_blocks_per_task,
size_bytes=self.num_blocks_per_task * self.block_size,
num_rows=self.num_batches_per_task * self.num_rows_per_batch,
size_bytes=self.num_batches_per_task
* self.num_rows_per_batch
* self.row_size,
schema=None,
input_files=None,
exec_stats=None,
@@ -122,8 +162,8 @@ def test_dataset(
ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=num_tasks,
num_blocks_per_task=num_blocks_per_task,
block_size=block_size,
num_batches_per_task=num_blocks_per_task,
row_size=block_size,
)
# Note the following calls to ds will not fully execute it.
assert ds.schema() is not None
@@ -137,7 +177,9 @@
assert map_ds.num_blocks() == num_tasks * num_blocks_per_task
# Blocks smaller than requested batch size will get coalesced.
map_ds = ds.map_batches(
lambda x: {}, batch_size=num_blocks_per_task * num_tasks, compute=compute
lambda x: {},
batch_size=num_blocks_per_task * num_tasks,
compute=compute,
)
map_ds = map_ds.materialize()
assert map_ds.num_blocks() == 1
@@ -184,8 +226,8 @@ def test_filter(ray_start_regular_shared, target_max_block_size):
ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=1,
num_blocks_per_task=num_blocks_per_task,
block_size=block_size,
num_batches_per_task=num_blocks_per_task,
row_size=block_size,
)

ds = ds.filter(lambda _: True)
@@ -209,8 +251,8 @@ def test_lazy_block_list(shutdown_only, target_max_block_size):
ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=num_tasks,
num_blocks_per_task=num_blocks_per_task,
block_size=block_size,
num_batches_per_task=num_blocks_per_task,
row_size=block_size,
)
ds.schema()

@@ -315,11 +357,11 @@ def foo(batch):
ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=1,
num_blocks_per_task=num_blocks_per_task,
block_size=block_size,
num_batches_per_task=num_blocks_per_task,
row_size=block_size,
)

ds = ds.map_batches(foo, batch_size=None)
assert ds.count() == num_blocks_per_task


@@ -333,8 +375,8 @@ def _test_write_large_data(
ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=1,
num_blocks_per_task=num_blocks_per_task,
block_size=block_size,
num_batches_per_task=num_blocks_per_task,
row_size=block_size,
use_bytes=use_bytes,
)

@@ -408,6 +450,100 @@ def test_write_large_data_webdataset(shutdown_only, tmp_path):
)


@dataclass
class TestCase:
target_max_block_size: int
batch_size: int
num_batches: int
expected_num_blocks: int


TEST_CASES = [
# Don't create blocks smaller than 50%.
TestCase(
target_max_block_size=1024,
batch_size=int(1024 * 1.125),
num_batches=1,
expected_num_blocks=1,
),
# Split blocks larger than 150% the target block size.
TestCase(
target_max_block_size=1024,
batch_size=int(1024 * 1.8),
num_batches=1,
expected_num_blocks=2,
),
# Huge batch will get split into multiple blocks.
TestCase(
target_max_block_size=1024,
batch_size=int(1024 * 10.125),
num_batches=1,
expected_num_blocks=11,
),
# Different batch sizes but same total size should produce a similar number
# of blocks.
TestCase(
target_max_block_size=1024,
batch_size=int(1024 * 1.5),
num_batches=4,
expected_num_blocks=6,
),
TestCase(
target_max_block_size=1024,
batch_size=int(1024 * 0.75),
num_batches=8,
expected_num_blocks=6,
),
]


@pytest.mark.parametrize(
"target_max_block_size,batch_size,num_batches,expected_num_blocks",
[astuple(test) for test in TEST_CASES],
)
def test_block_slicing(
ray_start_regular_shared,
restore_data_context,
target_max_block_size,
batch_size,
num_batches,
expected_num_blocks,
):
ctx = ray.data.context.DataContext.get_current()
ctx.target_max_block_size = target_max_block_size

# Row sizes smaller than this seem to add significant amounts of per-row
# metadata overhead.
row_size = 128
num_rows_per_batch = int(batch_size / row_size)
num_tasks = 1

ds = ray.data.read_datasource(
RandomBytesDatasource(),
parallelism=num_tasks,
num_batches_per_task=num_batches,
num_rows_per_batch=num_rows_per_batch,
row_size=row_size,
use_bytes=False,
use_arrow=True,
).materialize()
assert ds.num_blocks() == expected_num_blocks

block_sizes = []
num_rows = 0
for batch in ds.iter_batches(batch_size=None, batch_format="numpy"):
block_sizes.append(batch["one"].size)
num_rows += len(batch["one"])
assert num_rows == num_rows_per_batch * num_batches
for size in block_sizes:
# Blocks are not too big.
assert (
size <= target_max_block_size * ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR
)
# Blocks are not too small.
assert size >= target_max_block_size / 2


if __name__ == "__main__":
import sys

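A rough way to read some of the TEST_CASES above, ignoring the per-row metadata overhead that the test comments mention (so the sizes are approximate):

target = 1024
factor = 1.5  # MAX_SAFE_BLOCK_SIZE_FACTOR

# Case 1: a 1.125x-target batch is below the 1.5x threshold, so it stays a
# single block rather than a ~1.0x block plus a tiny 0.125x remainder.
assert int(1024 * 1.125) < factor * target

# Cases 4 and 5: the same total amount of data (~6x the target) arrives in
# different batch sizes, and both are expected to settle on about 6 blocks.
assert 4 * int(1024 * 1.5) == 8 * int(1024 * 0.75) == 6 * target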
25 changes: 14 additions & 11 deletions release/nightly_tests/dataset/iter_tensor_batches_benchmark.py
@@ -59,7 +59,12 @@ def to_tf(
return ds


def run_iter_tensor_batches_benchmark(benchmark: Benchmark, data_size_gb: int):
def run_iter_tensor_batches_benchmark(
benchmark: Benchmark, data_size_gb: int, block_size_mb: int
):
ctx = ray.data.context.DataContext.get_current()
ctx.target_max_block_size = block_size_mb * 1024 * 1024

ds = ray.data.read_images(
f"s3://anonymous@air-example-data-2/{data_size_gb}G-image-data-synthetic-raw"
)
@@ -95,19 +100,11 @@ def add_label(batch):
# Test with varying batch sizes for iter_torch_batches() and to_tf().
for batch_size in batch_sizes:
benchmark.run_materialize_ds(
f"iter-torch-batches-{batch_size}",
f"iter-torch-batches-{batch_size}-block-size-{block_size_mb}",
iter_torch_batches,
ds=ds,
batch_size=batch_size,
)
benchmark.run_materialize_ds(
f"to-tf-{batch_size}",
to_tf,
ds=ds,
feature_columns="image",
label_columns="label",
batch_size=batch_size,
)

prefetch_batches = [0, 1, 4]
# Test with varying prefetching for iter_torch_batches()
@@ -158,11 +155,17 @@ def add_label(batch):
default=1,
help="The data size to use for the dataset.",
)
parser.add_argument(
"--block-size-mb",
type=int,
default=128,
help="The data size to use for the dataset.",
)

args = parser.parse_args()

benchmark = Benchmark("iter-tensor-batches")

run_iter_tensor_batches_benchmark(benchmark, args.data_size_gb)
run_iter_tensor_batches_benchmark(benchmark, args.data_size_gb, args.block_size_mb)

benchmark.write_result()
