[Datasets] Add support for dynamic block splitting to actor pool. (ray-project#31715)

This PR adds support for dynamic block splitting to the actor pool in the new execution backend.
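As background, the Ray core primitive this change builds on is the dynamic-returns generator: invoking a method with `num_returns="dynamic"` returns a single `ObjectRef` that resolves to an `ObjectRefGenerator` over the refs of the individually yielded outputs. A minimal standalone sketch (the `Worker` actor is hypothetical, not code from this PR):

```python
import ray

@ray.remote
class Worker:
    def process(self, n: int):
        # A generator method: Ray stores each yielded value as its own object.
        for i in range(n):
            yield list(range(i * 3, i * 3 + 3))  # stand-in for a data block

worker = Worker.remote()
# One ObjectRef comes back; resolving it yields an ObjectRefGenerator over
# the per-output refs.
gen_ref = worker.process.options(num_returns="dynamic").remote(3)
for block_ref in ray.get(gen_ref):
    print(ray.get(block_ref))
```

This is the pattern `ActorPoolSubmitter.submit` adopts below, replacing the fixed `num_returns=2` call that forced all outputs to be coalesced into a single block.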
clarkzinzow authored Jan 18, 2023
1 parent e0b30bf commit 87cbb62
Showing 6 changed files with 87 additions and 95 deletions.
python/ray/data/_internal/execution/operators/actor_pool_submitter.py
@@ -1,10 +1,13 @@
-from typing import Dict, Any, Iterator, Callable, List, Tuple
+from typing import Dict, Any, Iterator, Callable, List, Union
 import ray
-from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats
+from ray.data.block import Block, BlockMetadata
 from ray.data.context import DEFAULT_SCHEDULING_STRATEGY, DatasetContext
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
-from ray.data._internal.execution.operators.map_task_submitter import MapTaskSubmitter
+from ray.data._internal.execution.operators.map_task_submitter import (
+    MapTaskSubmitter,
+    _map_task,
+)
 from ray.types import ObjectRef
 from ray._raylet import ObjectRefGenerator


 class ActorPoolSubmitter(MapTaskSubmitter):
@@ -41,17 +44,17 @@ def start(self):

     def submit(
         self, input_blocks: List[ObjectRef[Block]]
-    ) -> Tuple[ObjectRef[Block], ObjectRef[BlockMetadata]]:
+    ) -> ObjectRef[ObjectRefGenerator]:
         # Pick an actor from the pool.
         actor = self._actor_pool.pick_actor()
         # Submit the map task.
-        block, block_metadata = actor.submit.options(num_returns=2).remote(
+        ref = actor.submit.options(num_returns="dynamic").remote(
             self._transform_fn_ref, *input_blocks
         )
-        self._active_actors[block] = actor
-        return block, block_metadata
+        self._active_actors[ref] = actor
+        return ref

-    def task_done(self, ref: ObjectRef[Block]):
+    def task_done(self, ref: ObjectRef[ObjectRefGenerator]):
         # Return the actor that was running the task to the pool.
         actor = self._active_actors.pop(ref)
         self._actor_pool.return_actor(actor)
@@ -87,19 +90,8 @@ def ready(self):

     def submit(
         self, fn: Callable[[Iterator[Block]], Iterator[Block]], *blocks: Block
-    ) -> Tuple[Block, BlockMetadata]:
-        # Coalesce all fn output blocks.
-        # TODO(Clark): Remove this coalescing once dynamic block splitting is supported
-        # for actors.
-        # yield from _map_task(fn, *blocks)
-        stats = BlockExecStats.builder()
-        builder = DelegatingBlockBuilder()
-        for block in fn(iter(blocks)):
-            builder.add_block(block)
-        block = builder.build()
-        block_metadata = BlockAccessor.for_block(block).get_metadata([], None)
-        block_metadata.exec_stats = stats.build()
-        return block, block_metadata
+    ) -> Iterator[Union[Block, List[BlockMetadata]]]:
+        yield from _map_task(fn, *blocks)


 class ActorPool:
43 changes: 10 additions & 33 deletions python/ray/data/_internal/execution/operators/map_operator_state.py
@@ -1,8 +1,8 @@
 from dataclasses import dataclass
-from typing import Callable, Optional, List, Dict, Any, Union, Tuple, Iterator
+from typing import Callable, Optional, List, Dict, Any, Iterator

 import ray
-from ray.data.block import Block, BlockMetadata
+from ray.data.block import Block
 from ray.data.context import DatasetContext
 from ray.data._internal.compute import (
     ComputeStrategy,
@@ -64,7 +64,7 @@ def __init__(
         self._block_bundle: Optional[RefBundle] = None

         # Execution state.
-        self._tasks: Dict[ObjectRef[Union[ObjectRefGenerator, Block]], _TaskState] = {}
+        self._tasks: Dict[ObjectRef[ObjectRefGenerator], _TaskState] = {}
         self._tasks_by_output_order: Dict[int, _TaskState] = {}
         self._next_task_index: int = 0
         self._next_output_index: int = 0
@@ -109,21 +109,14 @@ def inputs_done(self) -> None:
             self._block_bundle = None
         self._task_submitter.task_submission_done()

-    def work_completed(self, ref: ObjectRef[Union[ObjectRefGenerator, Block]]) -> None:
+    def work_completed(self, ref: ObjectRef[ObjectRefGenerator]) -> None:
         self._task_submitter.task_done(ref)
         task: _TaskState = self._tasks.pop(ref)
-        if task.block_metadata_ref is not None:
-            # Non-dynamic block splitting path.
-            # TODO(Clark): Remove this special case once dynamic block splitting is
-            # supported for actors.
-            block_refs = [ref]
-            block_metas = [ray.get(task.block_metadata_ref)]
-        else:
-            # Dynamic block splitting path.
-            all_refs = list(ray.get(ref))
-            del ref
-            block_refs = all_refs[:-1]
-            block_metas = ray.get(all_refs[-1])
+        # Dynamic block splitting path.
+        all_refs = list(ray.get(ref))
+        del ref
+        block_refs = all_refs[:-1]
+        block_metas = ray.get(all_refs[-1])
         assert len(block_metas) == len(block_refs), (block_refs, block_metas)
         for ref in block_refs:
             trace_allocation(ref, "map_operator_work_completed")
@@ -187,19 +180,8 @@ def _create_task(self, bundle: RefBundle) -> None:
         # TODO fix for Ray client: https://github.com/ray-project/ray/issues/30458
         if not DatasetContext.get_current().block_splitting_enabled:
             raise NotImplementedError("New backend requires block splitting")
-        ref: Union[
-            ObjectRef[ObjectRefGenerator],
-            Tuple[ObjectRef[Block], ObjectRef[BlockMetadata]],
-        ] = self._task_submitter.submit(input_blocks)
+        ref: ObjectRef[ObjectRefGenerator] = self._task_submitter.submit(input_blocks)
         task = _TaskState(bundle)
-        if isinstance(ref, tuple):
-            # Task submitter returned a block ref and block metadata ref tuple; we make
-            # the block ref the canonical task ref, and store the block metadata ref for
-            # future resolution, when the task completes.
-            # TODO(Clark): Remove this special case once dynamic block splitting is
-            # supported for actors.
-            ref, block_metadata_ref = ref
-            task.block_metadata_ref = block_metadata_ref
         self._tasks[ref] = task
         self._tasks_by_output_order[self._next_task_index] = task
         self._next_task_index += 1
@@ -215,12 +197,7 @@ class _TaskState:
     Attributes:
         inputs: The input ref bundle.
         output: The output ref bundle that is set when the task completes.
-        block_metadata_ref: A future for the block metadata; this will only be set for
-            the ActorPoolTaskSubmitter, which doesn't yet support dynamic block
-            splitting.
     """

     inputs: RefBundle
     output: Optional[RefBundle] = None
-    # TODO(Clark): Remove this once dynamic block splitting is supported for actors.
-    block_metadata_ref: Optional[ObjectRef[BlockMetadata]] = None
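To illustrate the resolution step above: a hedged sketch (the `unpack_map_task_output` helper is mine, not from this PR) of how `work_completed` unpacks a task's generator ref, relying on the `_map_task` convention that the last yielded object is the metadata list:

```python
import ray

def unpack_map_task_output(gen_ref):
    # Resolve the outer ref into the list of per-output ObjectRefs.
    all_refs = list(ray.get(gen_ref))
    # Every ref but the last points to an output block.
    block_refs = all_refs[:-1]
    # The final ref holds the List[BlockMetadata] for those blocks.
    block_metas = ray.get(all_refs[-1])
    assert len(block_metas) == len(block_refs)
    return block_refs, block_metas
```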
python/ray/data/_internal/execution/operators/map_task_submitter.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
-from typing import List, Union, Tuple
-from ray.data.block import Block, BlockMetadata
+from typing import List, Union, Tuple, Callable, Iterator
+from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats
 from ray.types import ObjectRef
 from ray._raylet import ObjectRefGenerator

@@ -58,3 +58,30 @@ def shutdown(self, task_refs: List[ObjectRef[Union[ObjectRefGenerator, Block]]])
         task_refs: The output refs for all of the tasks submitted by this submitter.
         """
         raise NotImplementedError
+
+
+def _map_task(
+    fn: Callable[[Iterator[Block]], Iterator[Block]],
+    *blocks: Block,
+) -> Iterator[Union[Block, List[BlockMetadata]]]:
+    """Remote function for a single operator task.
+
+    Args:
+        fn: The callable that takes Iterator[Block] as input and returns
+            Iterator[Block] as output.
+        blocks: The concrete block values from the task ref bundle.
+
+    Returns:
+        A generator of blocks, followed by the list of BlockMetadata for the blocks
+        as the last generator return.
+    """
+    output_metadata = []
+    stats = BlockExecStats.builder()
+    for b_out in fn(iter(blocks)):
+        # TODO(Clark): Add input file propagation from input blocks.
+        m_out = BlockAccessor.for_block(b_out).get_metadata([], None)
+        m_out.exec_stats = stats.build()
+        output_metadata.append(m_out)
+        yield b_out
+        stats = BlockExecStats.builder()
+    yield output_metadata
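A local usage sketch of the protocol above (the `double` transform is a toy assumption; plain Python lists serve as simple blocks here): every yielded item except the last is an output block, and the final item is the matching metadata list.

```python
def double(blocks):
    # A trivial Iterator[Block] -> Iterator[Block] transform.
    for block in blocks:
        yield [x * 2 for x in block]

# Blocks stream out first; the metadata list arrives last.
*out_blocks, out_metadata = list(_map_task(double, [1, 2, 3], [4, 5]))
assert out_blocks == [[2, 4, 6], [8, 10]]
assert len(out_metadata) == len(out_blocks)
```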
python/ray/data/_internal/execution/operators/task_pool_submitter.py
@@ -1,8 +1,11 @@
 from typing import Dict, Any, Iterator, Callable, Union, List

 import ray
-from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats
-from ray.data._internal.execution.operators.map_task_submitter import MapTaskSubmitter
+from ray.data.block import Block
+from ray.data._internal.execution.operators.map_task_submitter import (
+    MapTaskSubmitter,
+    _map_task,
+)
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.types import ObjectRef
 from ray._raylet import ObjectRefGenerator
@@ -48,30 +51,3 @@ def shutdown(self, task_refs: List[ObjectRef[Union[ObjectRefGenerator, Block]]])
                 # a different error, or cancellation failed. In all cases, we
                 # swallow the exception.
                 pass
-
-
-def _map_task(
-    fn: Callable[[Iterator[Block]], Iterator[Block]],
-    *blocks: Block,
-) -> Iterator[Union[Block, List[BlockMetadata]]]:
-    """Remote function for a single operator task.
-
-    Args:
-        fn: The callable that takes Iterator[Block] as input and returns
-            Iterator[Block] as output.
-        blocks: The concrete block values from the task ref bundle.
-
-    Returns:
-        A generator of blocks, followed by the list of BlockMetadata for the blocks
-        as the last generator return.
-    """
-    output_metadata = []
-    stats = BlockExecStats.builder()
-    for b_out in fn(iter(blocks)):
-        # TODO(Clark): Add input file propagation from input blocks.
-        m_out = BlockAccessor.for_block(b_out).get_metadata([], None)
-        m_out.exec_stats = stats.build()
-        output_metadata.append(m_out)
-        yield b_out
-        stats = BlockExecStats.builder()
-    yield output_metadata
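The surviving context above shows only the tail of `shutdown`'s cancellation handling; a hedged sketch of the overall pattern (the `drain` helper is hypothetical):

```python
import ray

def drain(task_refs):
    # Ask Ray to cancel every outstanding task.
    for ref in task_refs:
        ray.cancel(ref)
    # Block until each task reaches a terminal state.
    for ref in task_refs:
        try:
            ray.get(ref)
        except ray.exceptions.RayError:
            # Cancellation succeeded, the task had already failed with a
            # different error, or cancellation failed; swallow all three.
            pass
```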
32 changes: 28 additions & 4 deletions python/ray/data/tests/test_dynamic_block_split.py
@@ -5,6 +5,7 @@
 import ray
 from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data.block import BlockMetadata
+from ray.data.context import DatasetContext
 from ray.data.datasource import Datasource
 from ray.data.datasource.datasource import ReadTask, Reader

@@ -62,8 +63,29 @@ def test_disable_in_ray_client(ray_start_cluster_enabled):
     assert not DatasetContext.get_current().block_splitting_enabled


+@pytest.mark.parametrize(
+    "compute",
+    [
+        "tasks",
+        # TODO(Clark): Remove skip for old execution backend once the old execution
+        # backend is removed.
+        pytest.param(
+            "actors",
+            marks=pytest.mark.skipif(
+                not DatasetContext.get_current().new_execution_backend,
+                reason=(
+                    "Dynamic block splitting for the actor compute strategy is only "
+                    "enabled for the new execution backend."
+                ),
+            ),
+        ),
+    ],
+)
 def test_dataset(
-    ray_start_regular_shared, enable_dynamic_block_splitting, target_max_block_size
+    ray_start_regular_shared,
+    enable_dynamic_block_splitting,
+    target_max_block_size,
+    compute,
 ):
     # Test 10 blocks from 10 tasks, each block is 1024 bytes.
     num_blocks = 10
@@ -81,13 +103,15 @@ def test_dataset(
     assert ds.num_blocks() == num_tasks
     assert ds.size_bytes() >= 0.7 * block_size * num_blocks * num_tasks

-    map_ds = ds.map_batches(lambda x: x)
+    map_ds = ds.map_batches(lambda x: x, compute=compute)
     map_ds.fully_executed()
     assert map_ds.num_blocks() == num_tasks
-    map_ds = ds.map_batches(lambda x: x, batch_size=num_blocks * num_tasks)
+    map_ds = ds.map_batches(
+        lambda x: x, batch_size=num_blocks * num_tasks, compute=compute
+    )
     map_ds.fully_executed()
     assert map_ds.num_blocks() == 1
-    map_ds = ds.map(lambda x: x)
+    map_ds = ds.map(lambda x: x, compute=compute)
     map_ds.fully_executed()
     assert map_ds.num_blocks() == num_blocks * num_tasks

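From the user's side, the behavior under test can be sketched as follows (a rough example assuming the Ray Datasets APIs of this era; exact block counts depend on data size and `target_max_block_size`):

```python
import ray
from ray.data.context import DatasetContext

# Force small blocks so oversized map outputs get split dynamically.
DatasetContext.get_current().target_max_block_size = 1024

ds = ray.data.range(10_000)
# With this PR, compute="actors" can now emit multiple blocks per task,
# matching the existing behavior of compute="tasks".
mapped = ds.map_batches(lambda batch: batch, compute="actors")
mapped.fully_executed()
print(mapped.num_blocks())
```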
6 changes: 1 addition & 5 deletions python/ray/data/tests/test_operators.py
@@ -166,11 +166,7 @@ def _check_batch(block_iter: Iterable[Block]) -> Iterable[Block]:
     op.notify_work_completed(work)

     # Check we return transformed bundles in order.
-    if use_actors:
-        # TODO(Clark): Remove this once dynamic block splitting is supported for actors.
-        assert _take_outputs(op) == [list(range(5)), list(range(5, 10))]
-    else:
-        assert _take_outputs(op) == [[i] for i in range(10)]
+    assert _take_outputs(op) == [[i] for i in range(10)]
     assert op.completed()


