[Datasets] Bundle blocks smaller than batch size in map_batches tasks. (ray-project#28648)

When blocks are smaller than the batch size in map_batches(), the actual size of the batches provided to the UDF can be much smaller (by an order of magnitude) than the specified batch size, resulting in very poor batch-mapping throughput when the UDF's performance is sensitive to batch size (such as in batch inference on GPUs).

This PR optimistically sends multiple blocks to a single mapper task up to (but not past) the provided batch size, mitigating the case in which block size << batch size without needing a shuffle step.
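A minimal sketch of the bundling idea, not the PR's actual implementation (names are illustrative): blocks are greedily accumulated into a bundle until adding the next block would overshoot the target batch size, and each bundle then feeds one mapper task.

```python
from typing import List, Tuple

def bundle_blocks(block_sizes: List[int], batch_size: int) -> List[Tuple[int, ...]]:
    """Greedily group block indices into bundles of at most batch_size rows."""
    bundles, current, current_rows = [], [], 0
    for idx, num_rows in enumerate(block_sizes):
        if current and current_rows + num_rows > batch_size:
            # Adding this block would push the bundle past the batch size:
            # seal the current bundle and start a new one.
            bundles.append(tuple(current))
            current, current_rows = [], 0
        current.append(idx)
        current_rows += num_rows
    if current:
        bundles.append(tuple(current))
    return bundles

# Ten 4-row blocks with batch_size=16: bundles of at most 4 blocks each.
print(bundle_blocks([4] * 10, batch_size=16))  # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
```

A single block larger than the batch size still travels alone; the bundling only packs blocks upward toward the batch size, it never splits them.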
clarkzinzow authored Oct 6, 2022
1 parent e142be0 commit b5687a1
Showing 12 changed files with 434 additions and 126 deletions.
@@ -1090,7 +1090,7 @@
"source": [
"import ray.data\n",
"\n",
"ds = ray.data.from_items([x.numpy() for x, y in test_data])"
"ds = ray.data.from_items([x.numpy() for x, y in test_data], parallelism=8)"
]
},
{
@@ -1111,12 +1111,12 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Map Progress (2 actors 1 pending): 100%|██████████| 200/200 [00:02<00:00, 70.01it/s]\n"
"Map Progress (2 actors 1 pending): 100%|██████████| 8/8 [00:02<00:00, 70.01it/s]\n"
]
}
],
"source": [
"results = batch_predictor.predict(ds, min_scoring_workers=2)"
"results = batch_predictor.predict(ds, batch_size=32, min_scoring_workers=2)"
]
},
{
@@ -1222,13 +1222,14 @@
"name": "stderr",
"output_type": "stream",
"text": [
"Map_Batches: 100%|██████████| 200/200 [00:02<00:00, 80.05it/s]\n"
"Map_Batches: 100%|██████████| 8/8 [00:02<00:00, 80.05it/s]\n"
]
}
],
"source": [
"predicted_classes = results.map_batches(\n",
" lambda batch: [classes[pred.argmax(0)] for pred in batch[\"predictions\"]], \n",
" batch_size=32,\n",
" batch_format=\"pandas\")"
]
},
@@ -1237,33 +1238,14 @@
"id": "cb7040db",
"metadata": {},
"source": [
"To compare this with the actual labels, let's create a Ray dataset for these and zip it together with the predicted classes:"
"To see how well our prediction did, let's zip the predicted labels together with some of the actual labels to compare them:"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "207e13b9",
"metadata": {},
"outputs": [],
"source": [
"real_classes = ray.data.from_items([classes[y] for x, y in test_data])\n",
"merged = predicted_classes.zip(real_classes)"
]
},
{
"cell_type": "markdown",
"id": "18b1012c",
"metadata": {},
"source": [
"Let's examine our results:"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "2b2decc6",
"metadata": {},
"outputs": [
{
"name": "stdout",
@@ -1293,7 +1275,9 @@
}
],
"source": [
"merged.show()"
"real_classes = [classes[y] for x, y in test_data]\n",
"for predicted, real in zip(predicted_classes.take(), real_classes):\n",
" print((predicted, real))"
]
},
{
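The notebook edits above all trace back to the block count. A quick check of the assumption, using the same from_items call the notebook now makes (illustrative; assumes the Ray Datasets API as of this commit):

```python
import ray

# With parallelism=8, the 200 test items are packed into 8 blocks, which is
# why the progress bars above now report 8/8 instead of 200/200.
ds = ray.data.from_items(list(range(200)), parallelism=8)
assert ds.num_blocks() == 8
```

With only 8 blocks backing the dataset, the explicit batch_size=32 lets predict() and map_batches() bundle blocks into reasonably sized batches rather than degenerating into tiny per-block ones.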
doc/source/ray-air/examples/torch_image_batch_pretrained.py (4 changes: 2 additions & 2 deletions)
@@ -23,7 +23,7 @@ def preprocess(df: pd.DataFrame) -> pd.DataFrame:
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
)
df["image"] = [preprocess(x).numpy() for x in df["image"]]
df.loc[:, "image"] = [preprocess(x).numpy() for x in df["image"]]
return df


@@ -39,4 +39,4 @@ def preprocess(df: pd.DataFrame) -> pd.DataFrame:
ckpt = TorchCheckpoint.from_model(model=model, preprocessor=preprocessor)

predictor = BatchPredictor.from_checkpoint(ckpt, TorchPredictor)
predictor.predict(dataset, feature_columns=["image"])
predictor.predict(dataset, feature_columns=["image"], batch_size=80)
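The df.loc[:, "image"] change is a pandas hygiene fix that matters once batches may be zero-copy views of shared blocks: plain column assignment rebinds the column object and can trip pandas' SettingWithCopyWarning on a view, while .loc spells out the in-place intent. A rough standalone sketch of the two forms (not the PR's code):

```python
import pandas as pd

df = pd.DataFrame({"image": [1, 2, 3]})

# Rebinding form: replaces the "image" column object wholesale.
df["image"] = [x * 2 for x in df["image"]]

# The form the PR switches to: assigns into the existing column via .loc,
# which behaves more predictably when `df` may be a view of a shared block.
df.loc[:, "image"] = [x * 2 for x in df["image"]]
```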
python/ray/air/tests/test_dataset_config.py (6 changes: 3 additions & 3 deletions)
@@ -255,13 +255,13 @@ def checker(shard, results):
assert results[0] == results[1], results
stats = shard.stats()
assert str(shard) == "DatasetPipeline(num_windows=inf, num_stages=1)", shard
assert "Stage 1 read->map_batches: 5/5 blocks executed " in stats, stats
assert "Stage 1 read->map_batches: 1/1 blocks executed " in stats, stats

def rand(x):
return [random.random() for _ in range(len(x))]

prep = BatchMapper(rand)
ds = ray.data.range_table(5)
ds = ray.data.range_table(5, parallelism=1)
test = TestStream(
checker,
preprocessor=prep,
@@ -287,7 +287,7 @@ def checker(shard, results):
stats = shard.stats()
assert str(shard) == "DatasetPipeline(num_windows=inf, num_stages=1)", shard
assert (
"Stage 1 read->randomize_block_order->map_batches: 5/5 blocks executed "
"Stage 1 read->randomize_block_order->map_batches: 1/1 blocks executed "
in stats
), stats

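The updated assertions follow directly from the block count: with parallelism=1, all five rows land in a single block, so the stage stats read 1/1 rather than 5/5. A quick illustration (assuming the Ray Datasets API as of this commit):

```python
import ray

# One block backs the whole dataset, hence "1/1 blocks executed" in stats.
ds = ray.data.range_table(5, parallelism=1)
assert ds.num_blocks() == 1
```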
python/ray/data/_internal/batcher.py (31 changes: 26 additions & 5 deletions)
@@ -49,11 +49,20 @@ class Batcher(BatcherInterface):
# zero-copy views, we accept what should be a small performance hit for better
# readability.

def __init__(self, batch_size: Optional[int]):
def __init__(self, batch_size: Optional[int], ensure_copy: bool = False):
"""
Construct a batcher that yields batches of batch_size rows.
Args:
batch_size: The size of batches to yield.
ensure_copy: Whether batches are always copied from the underlying base
blocks (not zero-copy views).
"""
self._batch_size = batch_size
self._buffer = []
self._buffer_size = 0
self._done_adding = False
self._ensure_copy = ensure_copy

def add(self, block: Block):
"""Add a block to the block buffer.
@@ -94,6 +103,10 @@ def next_batch(self) -> Block:
if self._batch_size is None:
assert len(self._buffer) == 1
block = self._buffer[0]
if self._ensure_copy:
# Copy the block if we need to ensure a fresh batch copy.
block = BlockAccessor.for_block(block)
block = block.slice(0, block.num_rows(), copy=True)
self._buffer = []
self._buffer_size = 0
return block
@@ -110,20 +123,28 @@ def next_batch(self) -> Block:
# We need this entire block to fill out a batch.
# We need to call `accessor.slice()` to ensure
# that the subsequent blocks' types are the same.
output.add_block(accessor.slice(0, accessor.num_rows()))
output.add_block(accessor.slice(0, accessor.num_rows(), copy=False))
needed -= accessor.num_rows()
else:
# We only need part of the block to fill out a batch.
output.add_block(accessor.slice(0, needed))
output.add_block(accessor.slice(0, needed, copy=False))
# Add the rest of the block to the leftovers.
leftover.append(accessor.slice(needed, accessor.num_rows()))
leftover.append(accessor.slice(needed, accessor.num_rows(), copy=False))
needed = 0

# Move the leftovers into the block buffer so they're the first
# blocks consumed on the next batch extraction.
self._buffer = leftover
self._buffer_size -= self._batch_size
return output.build()
batch = output.build()
if self._ensure_copy:
# Need to ensure that the batch is a fresh copy.
batch = BlockAccessor.for_block(batch)
# TODO(Clark): This copy will often be unnecessary, e.g. for pandas
# DataFrame batches that have required concatenation to construct, which
# always requires a copy. We should elide this copy in those cases.
batch = batch.slice(0, batch.num_rows(), copy=True)
return batch


class ShufflingBatcher(BatcherInterface):
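The new ensure_copy flag exists because bundled batches may now be zero-copy views of buffered blocks: a UDF that mutates its batch must not corrupt the underlying block. A standalone pandas analogy for the view-versus-copy distinction (not the Batcher API itself):

```python
import pandas as pd

block = pd.DataFrame({"x": list(range(8))})

# copy=False analogue: a slice that may share memory with the block.
view = block.iloc[0:4]
assert view["x"].tolist() == [0, 1, 2, 3]

# copy=True analogue, i.e. what ensure_copy=True guarantees: a fresh batch,
# so mutating it cannot leak back into the buffered block.
batch = block.iloc[0:4].copy()
batch["x"] = -1
assert block["x"].tolist() == list(range(8))  # the block is untouched
```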
(The remaining 8 changed files are not shown here.)
