[Data] Remove DataContext variables for legacy execution backend / stages optimizer (ray-project#42780)

Removes the following `DataContext` attributes related to the legacy execution backend and stages optimizer (a short migration sketch follows the list):
- optimize_fuse_stages
- optimize_fuse_read_stages
- optimize_fuse_shuffle_stages
- optimize_reorder_stages
- optimizer_enabled
- new_execution_backend
- use_streaming_executor
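
These paths were already on by default; after this change the streaming executor and the logical-plan optimizer are the only code path, so the `RAY_DATA_NEW_EXECUTION_BACKEND`, `RAY_DATA_USE_STREAMING_EXECUTOR`, and `RAY_DATA_NEW_EXECUTION_OPTIMIZER` environment variables no longer have any effect. A minimal migration sketch for downstream code (hypothetical user code, not taken from this repository):

```python
import ray.data

ctx = ray.data.DataContext.get_current()

# Before this change, user code could toggle the legacy paths, e.g.:
#   ctx.use_streaming_executor = True
#   ctx.optimizer_enabled = True
# After this change the attributes are removed entirely (not deprecated),
# so such toggles should simply be deleted. Since DataContext no longer
# sets these instance attributes, reading them would raise AttributeError:
assert not hasattr(ctx, "use_streaming_executor")
assert not hasattr(ctx, "optimizer_enabled")
```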

Signed-off-by: Scott Lee <[email protected]>
scottjlee authored Jan 29, 2024
1 parent 87cd741 commit 3173487
Showing 20 changed files with 236 additions and 700 deletions.
6 changes: 1 addition & 5 deletions .buildkite/data.rayci.yml
@@ -36,7 +36,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}"
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name data6build
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: data6build

@@ -51,7 +50,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}"
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name data14build
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: data14build

@@ -64,7 +62,6 @@
     commands:
       - bazel run //ci/ray_ci:test_in_docker -- //python/ray/data/... //python/ray/air/... data
         --workers 2 --worker-id {{matrix.worker_id}} --parallelism-per-worker 3
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
         --python-version {{matrix.python}}
     depends_on: databuild-multipy
@@ -85,7 +82,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}"
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name datanbuild
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: datanbuild

@@ -163,7 +159,7 @@ steps:
     soft_fail: true
     commands:
       - bazel run //ci/ray_ci:test_in_docker -- //... data --run-flaky-tests
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1 --parallelism-per-worker 3
+        --parallelism-per-worker 3
         --build-name data14build
         --except-tags gpu_only,gpu
     depends_on: data14build
2 changes: 0 additions & 2 deletions python/ray/data/_internal/compute.py
@@ -70,7 +70,6 @@ def _apply(
         fn_constructor_args: Optional[Iterable[Any]] = None,
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
-        assert not DataContext.get_current().new_execution_backend, "Legacy backend off"
         assert fn_constructor_args is None and fn_constructor_kwargs is None
         if fn_args is None:
             fn_args = tuple()
@@ -244,7 +243,6 @@ def _apply(
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
         """Note: this is not part of the Dataset public API."""
-        assert not DataContext.get_current().new_execution_backend, "Legacy backend off"
         if fn_args is None:
             fn_args = tuple()
         if fn_kwargs is None:
1 change: 0 additions & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
@@ -183,7 +183,6 @@ def _get_execution_dag(


 def _get_initial_stats_from_plan(plan: ExecutionPlan) -> DatasetStats:
-    assert DataContext.get_current().optimizer_enabled
     if plan._snapshot_blocks is not None and not plan._snapshot_blocks.is_cleared():
         return plan._snapshot_stats
     # For Datasets created from "read_xxx", `plan._in_blocks` is a LazyBlockList,
8 changes: 1 addition & 7 deletions python/ray/data/_internal/plan.py
@@ -477,7 +477,7 @@ def execute_to_iterator(
         # Always used the saved context for execution.
         ctx = self._context

-        if not ctx.use_streaming_executor or self.has_computed_output():
+        if self.has_computed_output():
             return (
                 self.execute(
                     allow_clear_input_blocks, force_read
@@ -711,12 +711,6 @@ def has_computed_output(self) -> bool:
             and self._snapshot_operator == self._logical_plan.dag
         )

-    def _run_with_new_execution_backend(self) -> bool:
-        """Whether this plan should run with new backend.
-        By default, the new execution backend is now fully enabled
-        unless configured otherwise by the user."""
-        return self._context.new_execution_backend
-
     def require_preserve_order(self) -> bool:
         """Whether this plan requires to preserve order."""
         from ray.data._internal.logical.operators.all_to_all_operator import Sort
49 changes: 0 additions & 49 deletions python/ray/data/context.py
@@ -51,18 +51,6 @@
 # TODO (kfstorm): Remove this once stable.
 DEFAULT_ENABLE_PANDAS_BLOCK = True

-# Whether to enable stage-fusion optimizations for dataset pipelines.
-DEFAULT_OPTIMIZE_FUSE_STAGES = True
-
-# Whether to enable stage-reorder optimizations for dataset pipelines.
-DEFAULT_OPTIMIZE_REORDER_STAGES = True
-
-# Whether to furthermore fuse read stages.
-DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True
-
-# Whether to furthermore fuse prior map tasks with shuffle stages.
-DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True
-
 # Minimum amount of parallelism to auto-detect for a dataset. Note that the min
 # block size config takes precedence over this.
 DEFAULT_MIN_PARALLELISM = 200
@@ -91,17 +79,6 @@
 # Whether to use Polars for tabular dataset sorts, groupbys, and aggregations.
 DEFAULT_USE_POLARS = False

-# Whether to use the new executor backend.
-DEFAULT_NEW_EXECUTION_BACKEND = bool(
-    int(os.environ.get("RAY_DATA_NEW_EXECUTION_BACKEND", "1"))
-)
-
-# Whether to use the streaming executor. This only has an effect if the new execution
-# backend is enabled.
-DEFAULT_USE_STREAMING_EXECUTOR = bool(
-    int(os.environ.get("RAY_DATA_USE_STREAMING_EXECUTOR", "1"))
-)
-
 # Whether to use the runtime object store memory metrics for scheduling.
 DEFAULT_USE_RUNTIME_METRICS_SCHEDULING = bool(
     int(os.environ.get("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "0"))
@@ -125,11 +102,6 @@
 # If disabled, users can still manually print stats with Dataset.stats().
 DEFAULT_AUTO_LOG_STATS = False

-# Whether to enable optimizer.
-DEFAULT_OPTIMIZER_ENABLED = bool(
-    int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "1"))
-)
-
 # Set this env var to enable distributed tqdm (experimental).
 DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1")))

@@ -181,26 +153,19 @@ def __init__(
         target_min_block_size: int,
         streaming_read_buffer_size: int,
         enable_pandas_block: bool,
-        optimize_fuse_stages: bool,
-        optimize_fuse_read_stages: bool,
-        optimize_fuse_shuffle_stages: bool,
-        optimize_reorder_stages: bool,
         actor_prefetcher_enabled: bool,
         use_push_based_shuffle: bool,
         pipeline_push_based_shuffle_reduce_tasks: bool,
         scheduling_strategy: SchedulingStrategyT,
         scheduling_strategy_large_args: SchedulingStrategyT,
         large_args_threshold: int,
         use_polars: bool,
-        new_execution_backend: bool,
-        use_streaming_executor: bool,
         eager_free: bool,
         decoding_size_estimation: bool,
         min_parallelism: bool,
         enable_tensor_extension_casting: bool,
         enable_auto_log_stats: bool,
         trace_allocations: bool,
-        optimizer_enabled: bool,
         execution_options: "ExecutionOptions",
         use_ray_tqdm: bool,
         enable_progress_bars: bool,
@@ -215,10 +180,6 @@ def __init__(
         self.target_min_block_size = target_min_block_size
         self.streaming_read_buffer_size = streaming_read_buffer_size
         self.enable_pandas_block = enable_pandas_block
-        self.optimize_fuse_stages = optimize_fuse_stages
-        self.optimize_fuse_read_stages = optimize_fuse_read_stages
-        self.optimize_fuse_shuffle_stages = optimize_fuse_shuffle_stages
-        self.optimize_reorder_stages = optimize_reorder_stages
         self.actor_prefetcher_enabled = actor_prefetcher_enabled
         self.use_push_based_shuffle = use_push_based_shuffle
         self.pipeline_push_based_shuffle_reduce_tasks = (
@@ -228,15 +189,12 @@
         self.scheduling_strategy_large_args = scheduling_strategy_large_args
         self.large_args_threshold = large_args_threshold
         self.use_polars = use_polars
-        self.new_execution_backend = new_execution_backend
-        self.use_streaming_executor = use_streaming_executor
         self.eager_free = eager_free
         self.decoding_size_estimation = decoding_size_estimation
         self.min_parallelism = min_parallelism
         self.enable_tensor_extension_casting = enable_tensor_extension_casting
         self.enable_auto_log_stats = enable_auto_log_stats
         self.trace_allocations = trace_allocations
-        self.optimizer_enabled = optimizer_enabled
         # TODO: expose execution options in Dataset public APIs.
         self.execution_options = execution_options
         self.use_ray_tqdm = use_ray_tqdm
@@ -287,10 +245,6 @@ def get_current() -> "DataContext":
             target_min_block_size=DEFAULT_TARGET_MIN_BLOCK_SIZE,
             streaming_read_buffer_size=DEFAULT_STREAMING_READ_BUFFER_SIZE,
             enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
-            optimize_fuse_stages=DEFAULT_OPTIMIZE_FUSE_STAGES,
-            optimize_fuse_read_stages=DEFAULT_OPTIMIZE_FUSE_READ_STAGES,
-            optimize_fuse_shuffle_stages=DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES,
-            optimize_reorder_stages=DEFAULT_OPTIMIZE_REORDER_STAGES,
             actor_prefetcher_enabled=DEFAULT_ACTOR_PREFETCHER_ENABLED,
             use_push_based_shuffle=DEFAULT_USE_PUSH_BASED_SHUFFLE,
             # NOTE(swang): We have to pipeline reduce tasks right now
@@ -303,8 +257,6 @@ def get_current() -> "DataContext":
             ),
             large_args_threshold=DEFAULT_LARGE_ARGS_THRESHOLD,
             use_polars=DEFAULT_USE_POLARS,
-            new_execution_backend=DEFAULT_NEW_EXECUTION_BACKEND,
-            use_streaming_executor=DEFAULT_USE_STREAMING_EXECUTOR,
             eager_free=DEFAULT_EAGER_FREE,
             decoding_size_estimation=DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED,
             min_parallelism=DEFAULT_MIN_PARALLELISM,
@@ -313,7 +265,6 @@ def get_current() -> "DataContext":
             ),
             enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
             trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
-            optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED,
             execution_options=ray.data.ExecutionOptions(),
             use_ray_tqdm=DEFAULT_USE_RAY_TQDM,
             enable_progress_bars=DEFAULT_ENABLE_PROGRESS_BARS,
4 changes: 0 additions & 4 deletions python/ray/data/read_api.py
@@ -36,7 +36,6 @@
 from ray.data._internal.util import (
     _autodetect_parallelism,
     _lazy_import_pyarrow_dataset,
-    _warn_on_high_parallelism,
     get_table_block_metadata,
     ndarray_to_block,
     pandas_df_to_arrow_block,
@@ -363,9 +362,6 @@ def read_datasource(
     # removing LazyBlockList code path.
     read_tasks = datasource_or_legacy_reader.get_read_tasks(requested_parallelism)

-    if not ctx.use_streaming_executor:
-        _warn_on_high_parallelism(requested_parallelism, len(read_tasks))
-
     read_op_name = f"Read{datasource.get_name()}"

     block_list = LazyBlockList(
24 changes: 0 additions & 24 deletions python/ray/data/tests/conftest.py
@@ -339,30 +339,6 @@ def target_max_block_size(request):
     ctx.target_max_block_size = original


-@pytest.fixture
-def enable_optimizer():
-    ctx = ray.data.context.DataContext.get_current()
-    original_backend = ctx.new_execution_backend
-    original_optimizer = ctx.optimizer_enabled
-    ctx.new_execution_backend = True
-    ctx.optimizer_enabled = True
-    yield
-    ctx.new_execution_backend = original_backend
-    ctx.optimizer_enabled = original_optimizer
-
-
-@pytest.fixture
-def enable_streaming_executor():
-    ctx = ray.data.context.DataContext.get_current()
-    original_backend = ctx.new_execution_backend
-    use_streaming_executor = ctx.use_streaming_executor
-    ctx.new_execution_backend = True
-    ctx.use_streaming_executor = True
-    yield
-    ctx.new_execution_backend = original_backend
-    ctx.use_streaming_executor = use_streaming_executor
-
-
 # ===== Pandas dataset formats =====
 @pytest.fixture(scope="function")
 def ds_pandas_single_column_format(ray_start_regular_shared):
19 changes: 5 additions & 14 deletions python/ray/data/tests/test_all_to_all.py
@@ -1132,20 +1132,11 @@ def test_random_block_order(ray_start_regular_shared, restore_data_context):
     assert results == expected

     # Test LazyBlockList.randomize_block_order.
-    context = DataContext.get_current()
-    try:
-        original_optimize_fuse_read_stages = context.optimize_fuse_read_stages
-        context.optimize_fuse_read_stages = False
-
-        lazy_blocklist_ds = ray.data.range(12, parallelism=4)
-        lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0)
-        lazy_blocklist_results = lazy_blocklist_ds.take()
-        lazy_blocklist_expected = named_values(
-            "id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11]
-        )
-        assert lazy_blocklist_results == lazy_blocklist_expected
-    finally:
-        context.optimize_fuse_read_stages = original_optimize_fuse_read_stages
+    lazy_blocklist_ds = ray.data.range(12, parallelism=4)
+    lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0)
+    lazy_blocklist_results = lazy_blocklist_ds.take()
+    lazy_blocklist_expected = named_values("id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11])
+    assert lazy_blocklist_results == lazy_blocklist_expected


 # NOTE: All tests above share a Ray cluster, while the tests below do not. These
66 changes: 22 additions & 44 deletions python/ray/data/tests/test_consumption.py
@@ -418,28 +418,25 @@ def test_schema_repr(ray_start_regular_shared):
     assert repr(ds.schema()) == expected_repr


-def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
-    ds = ray.data.range(100, parallelism=20)
+def _check_none_computed(ds):
+    # In streaming executor, ds.take() will not invoke partial execution
+    # in LazyBlocklist.
+    assert ds._plan.execute()._num_computed() == 0

-    def check_num_computed(expected):
-        if ray.data.context.DataContext.get_current().use_streaming_executor:
-            # In streaing executor, ds.take() will not invoke partial execution
-            # in LazyBlocklist.
-            assert ds._plan.execute()._num_computed() == 0
-        else:
-            assert ds._plan.execute()._num_computed() == expected

-    check_num_computed(0)
+def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
+    ds = ray.data.range(100, parallelism=20)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(10)) == list(range(10))
-    check_num_computed(2)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(20)) == list(range(20))
-    check_num_computed(4)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(30)) == list(range(30))
-    check_num_computed(8)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(50)) == list(range(50))
-    check_num_computed(16)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(100)) == list(range(100))
-    check_num_computed(20)
+    _check_none_computed(ds)


 def test_dataset_repr(ray_start_regular_shared):
@@ -1160,12 +1157,9 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared):
     ds = ray.data.range(32, parallelism=8)
     expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8]
     for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks):
-        if ray.data.context.DataContext.get_current().use_streaming_executor:
-            # In streaming execution of ds.iter_batches(), there is no partial
-            # execution so _num_computed() in LazyBlocklist is 0.
-            assert ds._plan.execute()._num_computed() == 0
-        else:
-            assert ds._plan.execute()._num_computed() == expected
+        # In streaming execution of ds.iter_batches(), there is no partial
+        # execution so _num_computed() in LazyBlocklist is 0.
+        _check_none_computed(ds)


 def test_union(ray_start_regular_shared):
@@ -1800,29 +1794,13 @@ def test_warning_execute_with_no_cpu(ray_start_cluster):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)

-    logger = DatasetLogger("ray.data._internal.plan").get_logger()
-    with patch.object(
-        logger,
-        "warning",
-        side_effect=LoggerWarningCalled,
-    ) as mock_logger:
-        try:
-            ds = ray.data.range(10)
-            ds = ds.map_batches(lambda x: x)
-            ds.take()
-        except Exception as e:
-            if ray.data.context.DataContext.get_current().use_streaming_executor:
-                assert isinstance(e, ValueError)
-                assert "exceeds the execution limits ExecutionResources(cpu=0.0" in str(
-                    e
-                )
-            else:
-                assert isinstance(e, LoggerWarningCalled)
-                logger_args, logger_kwargs = mock_logger.call_args
-                assert (
-                    "Warning: The Ray cluster currently does not have "
-                    in logger_args[0]
-                )
+    try:
+        ds = ray.data.range(10)
+        ds = ds.map_batches(lambda x: x)
+        ds.take()
+    except Exception as e:
+        assert isinstance(e, ValueError)
+        assert "exceeds the execution limits ExecutionResources(cpu=0.0" in str(e)


 def test_nowarning_execute_with_cpu(ray_start_cluster):
