diff --git a/.buildkite/data.rayci.yml b/.buildkite/data.rayci.yml
index e92214b14a37d..607b0036b3c33 100644
--- a/.buildkite/data.rayci.yml
+++ b/.buildkite/data.rayci.yml
@@ -36,7 +36,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}" 
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name data6build
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: data6build
 
@@ -51,7 +50,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}" 
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name data14build
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: data14build
 
@@ -64,7 +62,6 @@ steps:
     commands:
       - bazel run //ci/ray_ci:test_in_docker -- //python/ray/data/... //python/ray/air/... data
         --workers 2 --worker-id {{matrix.worker_id}} --parallelism-per-worker 3
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
         --python-version {{matrix.python}}
     depends_on: databuild-multipy
@@ -85,7 +82,6 @@ steps:
         --workers "$${BUILDKITE_PARALLEL_JOB_COUNT}" 
         --worker-id "$${BUILDKITE_PARALLEL_JOB}" --parallelism-per-worker 3
         --build-name datanbuild
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1
         --except-tags data_integration,doctest
     depends_on: datanbuild
 
@@ -163,7 +159,7 @@ steps:
     soft_fail: true
     commands:
       - bazel run //ci/ray_ci:test_in_docker -- //... data --run-flaky-tests 
-        --test-env RAY_DATA_USE_STREAMING_EXECUTOR=1 --parallelism-per-worker 3
+        --parallelism-per-worker 3
         --build-name data14build
         --except-tags gpu_only,gpu
     depends_on: data14build
diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py
index 62a18d496103d..8c96688925980 100644
--- a/python/ray/data/_internal/compute.py
+++ b/python/ray/data/_internal/compute.py
@@ -70,7 +70,6 @@ def _apply(
         fn_constructor_args: Optional[Iterable[Any]] = None,
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
-        assert not DataContext.get_current().new_execution_backend, "Legacy backend off"
         assert fn_constructor_args is None and fn_constructor_kwargs is None
         if fn_args is None:
             fn_args = tuple()
@@ -244,7 +243,6 @@ def _apply(
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
     ) -> BlockList:
         """Note: this is not part of the Dataset public API."""
-        assert not DataContext.get_current().new_execution_backend, "Legacy backend off"
         if fn_args is None:
             fn_args = tuple()
         if fn_kwargs is None:
diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py
index 7241a4afd82ed..c216617eeb0f1 100644
--- a/python/ray/data/_internal/execution/legacy_compat.py
+++ b/python/ray/data/_internal/execution/legacy_compat.py
@@ -183,7 +183,6 @@ def _get_execution_dag(
 
 
 def _get_initial_stats_from_plan(plan: ExecutionPlan) -> DatasetStats:
-    assert DataContext.get_current().optimizer_enabled
     if plan._snapshot_blocks is not None and not plan._snapshot_blocks.is_cleared():
         return plan._snapshot_stats
     # For Datasets created from "read_xxx", `plan._in_blocks` is a LazyBlockList,
diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py
index 949dfb89e036b..c3eb438274c05 100644
--- a/python/ray/data/_internal/plan.py
+++ b/python/ray/data/_internal/plan.py
@@ -477,7 +477,9 @@ def execute_to_iterator(
-        # Always used the saved context for execution.
+        # Always use the saved context for execution.
         ctx = self._context
 
-        if not ctx.use_streaming_executor or self.has_computed_output():
+        if self.has_computed_output():
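+            # The output of this plan is already computed; return an iterator
+            # over the cached blocks instead of re-executing.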
             return (
                 self.execute(
                     allow_clear_input_blocks, force_read
@@ -711,12 +711,6 @@ def has_computed_output(self) -> bool:
             and self._snapshot_operator == self._logical_plan.dag
         )
 
-    def _run_with_new_execution_backend(self) -> bool:
-        """Whether this plan should run with new backend.
-        By default, the new execution backend is now fully enabled
-        unless configured otherwise by the user."""
-        return self._context.new_execution_backend
-
     def require_preserve_order(self) -> bool:
         """Whether this plan requires to preserve order."""
         from ray.data._internal.logical.operators.all_to_all_operator import Sort
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index b7bfe0b116f4a..0b222e0a74fcb 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -51,18 +51,6 @@
 # TODO (kfstorm): Remove this once stable.
 DEFAULT_ENABLE_PANDAS_BLOCK = True
 
-# Whether to enable stage-fusion optimizations for dataset pipelines.
-DEFAULT_OPTIMIZE_FUSE_STAGES = True
-
-# Whether to enable stage-reorder optimizations for dataset pipelines.
-DEFAULT_OPTIMIZE_REORDER_STAGES = True
-
-# Whether to furthermore fuse read stages.
-DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True
-
-# Whether to furthermore fuse prior map tasks with shuffle stages.
-DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True
-
 # Minimum amount of parallelism to auto-detect for a dataset. Note that the min
 # block size config takes precedence over this.
 DEFAULT_MIN_PARALLELISM = 200
@@ -91,17 +79,6 @@
 # Whether to use Polars for tabular dataset sorts, groupbys, and aggregations.
 DEFAULT_USE_POLARS = False
 
-# Whether to use the new executor backend.
-DEFAULT_NEW_EXECUTION_BACKEND = bool(
-    int(os.environ.get("RAY_DATA_NEW_EXECUTION_BACKEND", "1"))
-)
-
-# Whether to use the streaming executor. This only has an effect if the new execution
-# backend is enabled.
-DEFAULT_USE_STREAMING_EXECUTOR = bool(
-    int(os.environ.get("RAY_DATA_USE_STREAMING_EXECUTOR", "1"))
-)
-
 # Whether to use the runtime object store memory metrics for scheduling.
 DEFAULT_USE_RUNTIME_METRICS_SCHEDULING = bool(
     int(os.environ.get("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "0"))
@@ -125,11 +102,6 @@
 # If disabled, users can still manually print stats with Dataset.stats().
 DEFAULT_AUTO_LOG_STATS = False
 
-# Whether to enable optimizer.
-DEFAULT_OPTIMIZER_ENABLED = bool(
-    int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "1"))
-)
-
 # Set this env var to enable distributed tqdm (experimental).
 DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1")))
 
@@ -181,10 +153,6 @@ def __init__(
         target_min_block_size: int,
         streaming_read_buffer_size: int,
         enable_pandas_block: bool,
-        optimize_fuse_stages: bool,
-        optimize_fuse_read_stages: bool,
-        optimize_fuse_shuffle_stages: bool,
-        optimize_reorder_stages: bool,
         actor_prefetcher_enabled: bool,
         use_push_based_shuffle: bool,
         pipeline_push_based_shuffle_reduce_tasks: bool,
@@ -192,15 +160,12 @@ def __init__(
         scheduling_strategy_large_args: SchedulingStrategyT,
         large_args_threshold: int,
         use_polars: bool,
-        new_execution_backend: bool,
-        use_streaming_executor: bool,
         eager_free: bool,
         decoding_size_estimation: bool,
-        min_parallelism: bool,
+        min_parallelism: int,
         enable_tensor_extension_casting: bool,
         enable_auto_log_stats: bool,
         trace_allocations: bool,
-        optimizer_enabled: bool,
         execution_options: "ExecutionOptions",
         use_ray_tqdm: bool,
         enable_progress_bars: bool,
@@ -215,10 +180,6 @@ def __init__(
         self.target_min_block_size = target_min_block_size
         self.streaming_read_buffer_size = streaming_read_buffer_size
         self.enable_pandas_block = enable_pandas_block
-        self.optimize_fuse_stages = optimize_fuse_stages
-        self.optimize_fuse_read_stages = optimize_fuse_read_stages
-        self.optimize_fuse_shuffle_stages = optimize_fuse_shuffle_stages
-        self.optimize_reorder_stages = optimize_reorder_stages
         self.actor_prefetcher_enabled = actor_prefetcher_enabled
         self.use_push_based_shuffle = use_push_based_shuffle
         self.pipeline_push_based_shuffle_reduce_tasks = (
@@ -228,15 +189,12 @@ def __init__(
         self.scheduling_strategy_large_args = scheduling_strategy_large_args
         self.large_args_threshold = large_args_threshold
         self.use_polars = use_polars
-        self.new_execution_backend = new_execution_backend
-        self.use_streaming_executor = use_streaming_executor
         self.eager_free = eager_free
         self.decoding_size_estimation = decoding_size_estimation
         self.min_parallelism = min_parallelism
         self.enable_tensor_extension_casting = enable_tensor_extension_casting
         self.enable_auto_log_stats = enable_auto_log_stats
         self.trace_allocations = trace_allocations
-        self.optimizer_enabled = optimizer_enabled
         # TODO: expose execution options in Dataset public APIs.
         self.execution_options = execution_options
         self.use_ray_tqdm = use_ray_tqdm
@@ -287,10 +245,6 @@ def get_current() -> "DataContext":
                     target_min_block_size=DEFAULT_TARGET_MIN_BLOCK_SIZE,
                     streaming_read_buffer_size=DEFAULT_STREAMING_READ_BUFFER_SIZE,
                     enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
-                    optimize_fuse_stages=DEFAULT_OPTIMIZE_FUSE_STAGES,
-                    optimize_fuse_read_stages=DEFAULT_OPTIMIZE_FUSE_READ_STAGES,
-                    optimize_fuse_shuffle_stages=DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES,
-                    optimize_reorder_stages=DEFAULT_OPTIMIZE_REORDER_STAGES,
                     actor_prefetcher_enabled=DEFAULT_ACTOR_PREFETCHER_ENABLED,
                     use_push_based_shuffle=DEFAULT_USE_PUSH_BASED_SHUFFLE,
                     # NOTE(swang): We have to pipeline reduce tasks right now
@@ -303,8 +257,6 @@ def get_current() -> "DataContext":
                     ),
                     large_args_threshold=DEFAULT_LARGE_ARGS_THRESHOLD,
                     use_polars=DEFAULT_USE_POLARS,
-                    new_execution_backend=DEFAULT_NEW_EXECUTION_BACKEND,
-                    use_streaming_executor=DEFAULT_USE_STREAMING_EXECUTOR,
                     eager_free=DEFAULT_EAGER_FREE,
                     decoding_size_estimation=DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED,
                     min_parallelism=DEFAULT_MIN_PARALLELISM,
@@ -313,7 +265,6 @@ def get_current() -> "DataContext":
                     ),
                     enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
                     trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
-                    optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED,
                     execution_options=ray.data.ExecutionOptions(),
                     use_ray_tqdm=DEFAULT_USE_RAY_TQDM,
                     enable_progress_bars=DEFAULT_ENABLE_PROGRESS_BARS,
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 6035b235a70cb..463e9c5164758 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -36,7 +36,6 @@
 from ray.data._internal.util import (
     _autodetect_parallelism,
     _lazy_import_pyarrow_dataset,
-    _warn_on_high_parallelism,
     get_table_block_metadata,
     ndarray_to_block,
     pandas_df_to_arrow_block,
@@ -363,9 +362,6 @@ def read_datasource(
     # removing LazyBlockList code path.
     read_tasks = datasource_or_legacy_reader.get_read_tasks(requested_parallelism)
 
-    if not ctx.use_streaming_executor:
-        _warn_on_high_parallelism(requested_parallelism, len(read_tasks))
-
     read_op_name = f"Read{datasource.get_name()}"
 
     block_list = LazyBlockList(
diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py
index 5655c64a50492..5d85b2df4d567 100644
--- a/python/ray/data/tests/conftest.py
+++ b/python/ray/data/tests/conftest.py
@@ -339,30 +339,6 @@ def target_max_block_size(request):
     ctx.target_max_block_size = original
 
 
-@pytest.fixture
-def enable_optimizer():
-    ctx = ray.data.context.DataContext.get_current()
-    original_backend = ctx.new_execution_backend
-    original_optimizer = ctx.optimizer_enabled
-    ctx.new_execution_backend = True
-    ctx.optimizer_enabled = True
-    yield
-    ctx.new_execution_backend = original_backend
-    ctx.optimizer_enabled = original_optimizer
-
-
-@pytest.fixture
-def enable_streaming_executor():
-    ctx = ray.data.context.DataContext.get_current()
-    original_backend = ctx.new_execution_backend
-    use_streaming_executor = ctx.use_streaming_executor
-    ctx.new_execution_backend = True
-    ctx.use_streaming_executor = True
-    yield
-    ctx.new_execution_backend = original_backend
-    ctx.use_streaming_executor = use_streaming_executor
-
-
 # ===== Pandas dataset formats =====
 @pytest.fixture(scope="function")
 def ds_pandas_single_column_format(ray_start_regular_shared):
diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py
index d413b815c65b9..c3951c3ed69d6 100644
--- a/python/ray/data/tests/test_all_to_all.py
+++ b/python/ray/data/tests/test_all_to_all.py
@@ -1132,20 +1132,11 @@ def test_random_block_order(ray_start_regular_shared, restore_data_context):
     assert results == expected
 
     # Test LazyBlockList.randomize_block_order.
-    context = DataContext.get_current()
-    try:
-        original_optimize_fuse_read_stages = context.optimize_fuse_read_stages
-        context.optimize_fuse_read_stages = False
-
-        lazy_blocklist_ds = ray.data.range(12, parallelism=4)
-        lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0)
-        lazy_blocklist_results = lazy_blocklist_ds.take()
-        lazy_blocklist_expected = named_values(
-            "id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11]
-        )
-        assert lazy_blocklist_results == lazy_blocklist_expected
-    finally:
-        context.optimize_fuse_read_stages = original_optimize_fuse_read_stages
+    lazy_blocklist_ds = ray.data.range(12, parallelism=4)
+    lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0)
+    lazy_blocklist_results = lazy_blocklist_ds.take()
+    lazy_blocklist_expected = named_values("id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11])
+    assert lazy_blocklist_results == lazy_blocklist_expected
 
 
 # NOTE: All tests above share a Ray cluster, while the tests below do not. These
diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py
index d1927f67d68df..959f5501456dc 100644
--- a/python/ray/data/tests/test_consumption.py
+++ b/python/ray/data/tests/test_consumption.py
@@ -418,28 +418,25 @@ def test_schema_repr(ray_start_regular_shared):
     assert repr(ds.schema()) == expected_repr
 
 
-def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
-    ds = ray.data.range(100, parallelism=20)
+def _check_none_computed(ds):
+    # In the streaming executor, ds.take() does not invoke partial execution
+    # in LazyBlockList.
+    assert ds._plan.execute()._num_computed() == 0
 
-    def check_num_computed(expected):
-        if ray.data.context.DataContext.get_current().use_streaming_executor:
-            # In streaing executor, ds.take() will not invoke partial execution
-            # in LazyBlocklist.
-            assert ds._plan.execute()._num_computed() == 0
-        else:
-            assert ds._plan.execute()._num_computed() == expected
 
-    check_num_computed(0)
+def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
+    ds = ray.data.range(100, parallelism=20)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(10)) == list(range(10))
-    check_num_computed(2)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(20)) == list(range(20))
-    check_num_computed(4)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(30)) == list(range(30))
-    check_num_computed(8)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(50)) == list(range(50))
-    check_num_computed(16)
+    _check_none_computed(ds)
     assert extract_values("id", ds.take(100)) == list(range(100))
-    check_num_computed(20)
+    _check_none_computed(ds)
 
 
 def test_dataset_repr(ray_start_regular_shared):
@@ -1160,12 +1157,9 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared):
     ds = ray.data.range(32, parallelism=8)
     expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8]
     for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks):
-        if ray.data.context.DataContext.get_current().use_streaming_executor:
-            # In streaming execution of ds.iter_batches(), there is no partial
-            # execution so _num_computed() in LazyBlocklist is 0.
-            assert ds._plan.execute()._num_computed() == 0
-        else:
-            assert ds._plan.execute()._num_computed() == expected
+        # In streaming execution of ds.iter_batches(), there is no partial
+        # execution, so _num_computed() in LazyBlockList is 0.
+        _check_none_computed(ds)
 
 
 def test_union(ray_start_regular_shared):
@@ -1800,29 +1794,13 @@ def test_warning_execute_with_no_cpu(ray_start_cluster):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)
 
-    logger = DatasetLogger("ray.data._internal.plan").get_logger()
-    with patch.object(
-        logger,
-        "warning",
-        side_effect=LoggerWarningCalled,
-    ) as mock_logger:
-        try:
-            ds = ray.data.range(10)
-            ds = ds.map_batches(lambda x: x)
-            ds.take()
-        except Exception as e:
-            if ray.data.context.DataContext.get_current().use_streaming_executor:
-                assert isinstance(e, ValueError)
-                assert "exceeds the execution limits ExecutionResources(cpu=0.0" in str(
-                    e
-                )
-            else:
-                assert isinstance(e, LoggerWarningCalled)
-                logger_args, logger_kwargs = mock_logger.call_args
-                assert (
-                    "Warning: The Ray cluster currently does not have "
-                    in logger_args[0]
-                )
+    with pytest.raises(
+        ValueError,
+        match=r"exceeds the execution limits ExecutionResources\(cpu=0.0",
+    ):
+        ds = ray.data.range(10)
+        ds = ds.map_batches(lambda x: x)
+        ds.take()
 
 
 def test_nowarning_execute_with_cpu(ray_start_cluster):
diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py
index 88e93a6390c1c..2547d6e9322b8 100644
--- a/python/ray/data/tests/test_execution_optimizer.py
+++ b/python/ray/data/tests/test_execution_optimizer.py
@@ -91,7 +91,7 @@ def _check_valid_plan_and_result(
         assert op in ds.stats(), f"Operator {op} not found: {ds.stats()}"
 
 
-def test_read_operator(ray_start_regular_shared, enable_optimizer):
+def test_read_operator(ray_start_regular_shared):
     planner = Planner()
     op = get_parquet_read_logical_op()
     plan = LogicalPlan(op)
@@ -107,7 +107,7 @@ def test_read_operator(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_split_blocks_operator(ray_start_regular_shared, enable_optimizer):
+def test_split_blocks_operator(ray_start_regular_shared):
     planner = Planner()
     op = get_parquet_read_logical_op(parallelism=10)
     logical_plan = LogicalPlan(op)
@@ -141,7 +141,7 @@ def test_split_blocks_operator(ray_start_regular_shared, enable_optimizer):
     assert up_physical_op.name == "ReadParquet->SplitBlocks(10)"
 
 
-def test_from_operators(ray_start_regular_shared, enable_optimizer):
+def test_from_operators(ray_start_regular_shared):
     op_classes = [
         FromArrow,
         FromItems,
@@ -159,7 +159,7 @@ def test_from_operators(ray_start_regular_shared, enable_optimizer):
         assert len(physical_op.input_dependencies) == 0
 
 
-def test_from_items_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_items_e2e(ray_start_regular_shared):
     data = ["Hello", "World"]
     ds = ray.data.from_items(data)
     assert ds.take_all() == named_values("item", data), ds
@@ -170,7 +170,7 @@ def test_from_items_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromItems"])
 
 
-def test_map_operator_udf_name(ray_start_regular_shared, enable_optimizer):
+def test_map_operator_udf_name(ray_start_regular_shared):
     # Test the name of the Map operator with different types of UDF.
     def normal_function(x):
         return x
@@ -214,7 +214,7 @@ def method(self, x):
         assert op.name == f"Map({expected_name})"
 
 
-def test_map_batches_operator(ray_start_regular_shared, enable_optimizer):
+def test_map_batches_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = MapBatches(
@@ -230,14 +230,14 @@ def test_map_batches_operator(ray_start_regular_shared, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)
 
 
-def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer):
+def test_map_batches_e2e(ray_start_regular_shared):
     ds = ray.data.range(5)
     ds = ds.map_batches(column_udf("id", lambda x: x))
     assert extract_values("id", ds.take_all()) == list(range(5)), ds
     _check_usage_record(["ReadRange", "MapBatches"])
 
 
-def test_map_rows_operator(ray_start_regular_shared, enable_optimizer):
+def test_map_rows_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = MapRows(
@@ -253,14 +253,14 @@ def test_map_rows_operator(ray_start_regular_shared, enable_optimizer):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)
 
 
-def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer):
+def test_map_rows_e2e(ray_start_regular_shared):
     ds = ray.data.range(5)
     ds = ds.map(column_udf("id", lambda x: x + 1))
     assert extract_values("id", ds.take_all()) == [1, 2, 3, 4, 5], ds
     _check_usage_record(["ReadRange", "Map"])
 
 
-def test_filter_operator(ray_start_regular_shared, enable_optimizer):
+def test_filter_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = Filter(
@@ -280,14 +280,14 @@ def test_filter_operator(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_filter_e2e(ray_start_regular_shared, enable_optimizer):
+def test_filter_e2e(ray_start_regular_shared):
     ds = ray.data.range(5)
     ds = ds.filter(fn=lambda x: x["id"] % 2 == 0)
     assert extract_values("id", ds.take_all()) == [0, 2, 4], ds
     _check_usage_record(["ReadRange", "Filter"])
 
 
-def test_flat_map(ray_start_regular_shared, enable_optimizer):
+def test_flat_map(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = FlatMap(
@@ -307,14 +307,14 @@ def test_flat_map(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_flat_map_e2e(ray_start_regular_shared, enable_optimizer):
+def test_flat_map_e2e(ray_start_regular_shared):
     ds = ray.data.range(2)
     ds = ds.flat_map(fn=lambda x: [{"id": x["id"]}, {"id": x["id"]}])
     assert extract_values("id", ds.take_all()) == [0, 0, 1, 1], ds
     _check_usage_record(["ReadRange", "FlatMap"])
 
 
-def test_column_ops_e2e(ray_start_regular_shared, enable_optimizer):
+def test_column_ops_e2e(ray_start_regular_shared):
     ds = ray.data.range(2)
     ds = ds.add_column(fn=lambda df: df.iloc[:, 0], col="new_col")
     assert ds.take_all() == [{"id": 0, "new_col": 0}, {"id": 1, "new_col": 1}], ds
@@ -329,7 +329,7 @@ def test_column_ops_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["ReadRange", "MapBatches"])
 
 
-def test_random_sample_e2e(ray_start_regular_shared, enable_optimizer):
+def test_random_sample_e2e(ray_start_regular_shared):
     import math
 
     def ensure_sample_size_close(dataset, sample_percent=0.5):
@@ -350,7 +350,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5):
     _check_usage_record(["ReadRange", "MapBatches"])
 
 
-def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer):
+def test_random_shuffle_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = RandomShuffle(
@@ -370,9 +370,7 @@ def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_random_shuffle_e2e(
-    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
-):
+def test_random_shuffle_e2e(ray_start_regular_shared, use_push_based_shuffle):
     ds = ray.data.range(12, parallelism=4)
     r1 = extract_values("id", ds.random_shuffle(seed=0).take_all())
     r2 = extract_values("id", ds.random_shuffle(seed=1024).take_all())
@@ -386,7 +384,7 @@ def test_random_shuffle_e2e(
     "shuffle",
     [True, False],
 )
-def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffle):
+def test_repartition_operator(ray_start_regular_shared, shuffle):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = Repartition(read_op, num_outputs=5, shuffle=shuffle)
@@ -413,9 +411,7 @@ def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffl
     "shuffle",
     [True, False],
 )
-def test_repartition_e2e(
-    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle, shuffle
-):
+def test_repartition_e2e(ray_start_regular_shared, use_push_based_shuffle, shuffle):
     def _check_repartition_usage_and_stats(ds):
         _check_usage_record(["ReadRange", "Repartition"])
         ds_stats: DatasetStats = ds._plan.stats()
@@ -462,7 +458,7 @@ def _check_repartition_usage_and_stats(ds):
 
 
 @pytest.mark.parametrize("preserve_order", (True, False))
-def test_union_operator(ray_start_regular_shared, enable_optimizer, preserve_order):
+def test_union_operator(ray_start_regular_shared, preserve_order):
     planner = Planner()
     read_parquet_op1 = get_parquet_read_logical_op()
     read_parquet_op2 = get_parquet_read_logical_op()
@@ -488,7 +484,7 @@ def test_union_operator(ray_start_regular_shared, enable_optimizer, preserve_ord
 
 
 @pytest.mark.parametrize("preserve_order", (True, False))
-def test_union_e2e(ray_start_regular_shared, enable_optimizer, preserve_order):
+def test_union_e2e(ray_start_regular_shared, preserve_order):
     execution_options = ExecutionOptions(preserve_order=preserve_order)
     ctx = ray.data.DataContext.get_current()
     ctx.execution_options = execution_options
@@ -534,7 +530,7 @@ def test_union_e2e(ray_start_regular_shared, enable_optimizer, preserve_order):
         assert ds2.take_all() == (ds2_result + ds_result * 2)
 
 
-def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer):
+def test_read_map_batches_operator_fusion(ray_start_regular_shared):
     # Test that Read is fused with MapBatches.
     planner = Planner()
     read_op = get_parquet_read_logical_op(parallelism=1)
@@ -560,7 +556,7 @@ def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optim
     )
 
 
-def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer):
+def test_read_map_chain_operator_fusion(ray_start_regular_shared):
     # Test that a chain of different map operators are fused.
     planner = Planner()
     read_op = get_parquet_read_logical_op(parallelism=1)
@@ -588,7 +584,7 @@ def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimiz
 
 
 def test_read_map_batches_operator_fusion_compatible_remote_args(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
-    # Test that map operators are stilled fused when remote args are compatible.
+    # Test that map operators are still fused when remote args are compatible.
     compatiple_remote_args_pairs = [
@@ -638,7 +634,7 @@ def test_read_map_batches_operator_fusion_compatible_remote_args(
 
 
 def test_read_map_batches_operator_fusion_incompatible_remote_args(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     # Test that map operators won't get fused if the remote args are incompatible.
     incompatiple_remote_args_pairs = [
@@ -681,7 +677,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args(
 
 
 def test_read_map_batches_operator_fusion_compute_tasks_to_actors(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     # Test that a task-based map operator is fused into an actor-based map operator when
     # the former comes before the latter.
@@ -702,7 +698,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors(
 
 
 def test_read_map_batches_operator_fusion_compute_read_to_actors(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     # Test that reads fuse into an actor-based map operator.
     planner = Planner()
@@ -721,7 +717,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors(
 
 
 def test_read_map_batches_operator_fusion_incompatible_compute(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     # Test that map operators are not fused when compute strategies are incompatible.
     planner = Planner()
@@ -743,9 +739,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute(
     assert upstream_physical_op.name == "ReadParquet->MapBatches(<lambda>)"
 
 
-def test_read_map_batches_operator_fusion_min_rows_per_block(
-    ray_start_regular_shared, enable_optimizer
-):
+def test_read_map_batches_operator_fusion_min_rows_per_block(ray_start_regular_shared):
     # Test that fusion of map operators merges their block sizes in the expected way
     # (taking the max).
     planner = Planner()
@@ -777,7 +771,7 @@ def test_read_map_batches_operator_fusion_min_rows_per_block(
 
 
 def test_read_map_batches_operator_fusion_with_randomize_blocks_operator(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     # Note: We currently do not fuse MapBatches->RandomizeBlocks.
     # This test is to ensure that we don't accidentally fuse them.
@@ -798,7 +792,7 @@ def fn(batch):
 
 
 def test_read_map_batches_operator_fusion_with_random_shuffle_operator(
-    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
+    ray_start_regular_shared, use_push_based_shuffle
 ):
     # Note: we currently only support fusing MapOperator->AllToAllOperator.
     def fn(batch):
@@ -863,7 +857,7 @@ def fn(_):
 
 @pytest.mark.parametrize("shuffle", (True, False))
 def test_read_map_batches_operator_fusion_with_repartition_operator(
-    ray_start_regular_shared, enable_optimizer, shuffle, use_push_based_shuffle
+    ray_start_regular_shared, shuffle, use_push_based_shuffle
 ):
     def fn(batch):
         return {"id": [x + 1 for x in batch["id"]]}
@@ -884,9 +878,7 @@ def fn(batch):
     _check_usage_record(["ReadRange", "MapBatches", "Repartition"])
 
 
-def test_read_map_batches_operator_fusion_with_sort_operator(
-    ray_start_regular_shared, enable_optimizer
-):
+def test_read_map_batches_operator_fusion_with_sort_operator(ray_start_regular_shared):
     # Note: We currently do not fuse MapBatches->Sort.
     # This test is to ensure that we don't accidentally fuse them, until
     # we implement it later.
@@ -906,7 +898,7 @@ def fn(batch):
 
 
 def test_read_map_batches_operator_fusion_with_aggregate_operator(
-    ray_start_regular_shared, enable_optimizer
+    ray_start_regular_shared,
 ):
     from ray.data.aggregate import AggregateFn
 
@@ -935,7 +927,7 @@ def fn(batch):
     _check_usage_record(["ReadRange", "MapBatches", "Aggregate"])
 
 
-def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_optimizer):
+def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared):
     ds = ray.data.range(10, parallelism=2)
     ds = ds.filter(lambda x: x["id"] % 2 == 0)
     ds = ds.map(column_udf("id", lambda x: x + 1))
@@ -963,14 +957,14 @@ def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_opt
     _check_usage_record(["ReadRange", "Filter", "Map", "MapBatches", "FlatMap"])
 
 
-def test_write_fusion(ray_start_regular_shared, enable_optimizer, tmp_path):
+def test_write_fusion(ray_start_regular_shared, tmp_path):
     ds = ray.data.range(10, parallelism=2)
     ds.write_csv(tmp_path)
     assert "ReadRange->Write" in ds._write_ds.stats()
     _check_usage_record(["ReadRange", "WriteCSV"])
 
 
-def test_write_operator(ray_start_regular_shared, enable_optimizer, tmp_path):
+def test_write_operator(ray_start_regular_shared, tmp_path):
     planner = Planner()
     datasink = _ParquetDatasink(tmp_path)
     read_op = get_parquet_read_logical_op()
@@ -987,7 +981,7 @@ def test_write_operator(ray_start_regular_shared, enable_optimizer, tmp_path):
     assert isinstance(physical_op.input_dependencies[0], MapOperator)
 
 
-def test_sort_operator(ray_start_regular_shared, enable_optimizer):
+def test_sort_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = Sort(
@@ -1007,9 +1003,7 @@ def test_sort_operator(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_sort_e2e(
-    ray_start_regular_shared, enable_optimizer, use_push_based_shuffle, tmp_path
-):
+def test_sort_e2e(ray_start_regular_shared, use_push_based_shuffle, tmp_path):
     ds = ray.data.range(100, parallelism=4)
     ds = ds.random_shuffle()
     ds = ds.sort("id")
@@ -1030,10 +1024,7 @@ def test_sort_e2e(
     assert [d["one"] for d in r2] == list(reversed(range(100)))
 
 
-def test_sort_validate_keys(
-    ray_start_regular_shared,
-    enable_optimizer,
-):
+def test_sort_validate_keys(ray_start_regular_shared):
     ds = ray.data.range(10)
     assert extract_values("id", ds.sort("id").take_all()) == list(range(10))
 
@@ -1065,7 +1056,7 @@ def test_sort_validate_keys(
         ds_named.sort(invalid_col_name).take_all()
 
 
-def test_aggregate_operator(ray_start_regular_shared, enable_optimizer):
+def test_aggregate_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = Aggregate(
@@ -1086,11 +1077,7 @@ def test_aggregate_operator(ray_start_regular_shared, enable_optimizer):
     )
 
 
-def test_aggregate_e2e(
-    ray_start_regular_shared,
-    enable_optimizer,
-    use_push_based_shuffle,
-):
+def test_aggregate_e2e(ray_start_regular_shared, use_push_based_shuffle):
     ds = ray.data.range(100, parallelism=4)
     ds = ds.groupby("id").count()
     assert ds.count() == 100
@@ -1099,10 +1086,7 @@ def test_aggregate_e2e(
     _check_usage_record(["ReadRange", "Aggregate"])
 
 
-def test_aggregate_validate_keys(
-    ray_start_regular_shared,
-    enable_optimizer,
-):
+def test_aggregate_validate_keys(ray_start_regular_shared):
     ds = ray.data.range(10)
     invalid_col_name = "invalid_column"
     with pytest.raises(
@@ -1139,7 +1123,7 @@ def test_aggregate_validate_keys(
         ds_named.groupby(invalid_col_name).count()
 
 
-def test_zip_operator(ray_start_regular_shared, enable_optimizer):
+def test_zip_operator(ray_start_regular_shared):
     planner = Planner()
     read_op1 = get_parquet_read_logical_op()
     read_op2 = get_parquet_read_logical_op()
@@ -1163,7 +1147,7 @@ def test_zip_operator(ray_start_regular_shared, enable_optimizer):
     "num_blocks1,num_blocks2",
     list(itertools.combinations_with_replacement(range(1, 12), 2)),
 )
-def test_zip_e2e(ray_start_regular_shared, enable_optimizer, num_blocks1, num_blocks2):
+def test_zip_e2e(ray_start_regular_shared, num_blocks1, num_blocks2):
     n = 12
     ds1 = ray.data.range(n, parallelism=num_blocks1)
     ds2 = ray.data.range(n, parallelism=num_blocks2).map(
@@ -1174,7 +1158,7 @@ def test_zip_e2e(ray_start_regular_shared, enable_optimizer, num_blocks1, num_bl
     _check_usage_record(["ReadRange", "Zip"])
 
 
-def test_from_dask_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_dask_e2e(ray_start_regular_shared):
     import dask.dataframe as dd
 
     df = pd.DataFrame({"one": list(range(100)), "two": list(range(100))})
@@ -1192,7 +1176,7 @@ def test_from_dask_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromPandas"])
 
 
-def test_from_modin_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_modin_e2e(ray_start_regular_shared):
     import modin.pandas as mopd
 
     df = pd.DataFrame(
@@ -1215,9 +1199,7 @@ def test_from_modin_e2e(ray_start_regular_shared, enable_optimizer):
 
 
 @pytest.mark.parametrize("enable_pandas_block", [False, True])
-def test_from_pandas_refs_e2e(
-    ray_start_regular_shared, enable_optimizer, enable_pandas_block
-):
+def test_from_pandas_refs_e2e(ray_start_regular_shared, enable_pandas_block):
     ctx = ray.data.context.DataContext.get_current()
     old_enable_pandas_block = ctx.enable_pandas_block
     ctx.enable_pandas_block = enable_pandas_block
@@ -1255,7 +1237,7 @@ def test_from_pandas_refs_e2e(
         ctx.enable_pandas_block = old_enable_pandas_block
 
 
-def test_from_numpy_refs_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_numpy_refs_e2e(ray_start_regular_shared):
     import numpy as np
 
     arr1 = np.expand_dims(np.arange(0, 4), axis=1)
@@ -1288,7 +1270,7 @@ def test_from_numpy_refs_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromNumpy"])
 
 
-def test_from_arrow_refs_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_arrow_refs_e2e(ray_start_regular_shared):
     import pyarrow as pa
 
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
@@ -1316,7 +1298,7 @@ def test_from_arrow_refs_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromArrow"])
 
 
-def test_from_huggingface_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_huggingface_e2e(ray_start_regular_shared):
     import datasets
 
     from ray.data.tests.test_huggingface import hfds_assert_equals
@@ -1359,7 +1341,7 @@ def test_from_huggingface_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromArrow"])
 
 
-def test_from_tf_e2e(ray_start_regular_shared, enable_optimizer):
+def test_from_tf_e2e(ray_start_regular_shared):
     import tensorflow as tf
     import tensorflow_datasets as tfds
 
@@ -1384,7 +1366,7 @@ def test_from_tf_e2e(ray_start_regular_shared, enable_optimizer):
     _check_usage_record(["FromItems"])
 
 
-def test_from_torch_e2e(ray_start_regular_shared, enable_optimizer, tmp_path):
+def test_from_torch_e2e(ray_start_regular_shared, tmp_path):
     import torchvision
 
     torch_dataset = torchvision.datasets.MNIST(tmp_path, download=True)
@@ -1407,7 +1389,7 @@ def test_from_torch_e2e(ray_start_regular_shared, enable_optimizer, tmp_path):
     reason="Limit pushdown currently disabled, see "
     "https://github.com/ray-project/ray/issues/36295"
 )
-def test_limit_pushdown(ray_start_regular_shared, enable_optimizer):
+def test_limit_pushdown(ray_start_regular_shared):
     def f1(x):
         return x
 
@@ -1489,8 +1471,6 @@ def f2(x):
 
 def test_execute_to_legacy_block_list(
     ray_start_regular_shared,
-    enable_optimizer,
-    enable_streaming_executor,
 ):
     ds = ray.data.range(10)
     # Stats not initialized until `ds.iter_rows()` is called
@@ -1506,8 +1486,6 @@ def test_execute_to_legacy_block_list(
 
 def test_execute_to_legacy_block_iterator(
     ray_start_regular_shared,
-    enable_optimizer,
-    enable_streaming_executor,
 ):
     ds = ray.data.range(10)
     assert ds._plan._snapshot_stats is None
@@ -1521,8 +1499,6 @@ def test_execute_to_legacy_block_iterator(
 
 def test_streaming_executor(
     ray_start_regular_shared,
-    enable_optimizer,
-    enable_streaming_executor,
 ):
     ds = ray.data.range(100, parallelism=4)
     ds = ds.map_batches(lambda x: x)
@@ -1541,8 +1517,6 @@ def test_streaming_executor(
 
 def test_schema_partial_execution(
     ray_start_regular_shared,
-    enable_optimizer,
-    enable_streaming_executor,
 ):
     fields = [
         ("sepal.length", pa.float64()),
@@ -1576,9 +1550,7 @@ def check_transform_fns(op, expected_types):
 
 
 @pytest.mark.skip("Needs zero-copy optimization for read->map_batches.")
-def test_zero_copy_fusion_eliminate_build_output_blocks(
-    ray_start_regular_shared, enable_optimizer
-):
+def test_zero_copy_fusion_eliminate_build_output_blocks(ray_start_regular_shared):
     # Test the EliminateBuildOutputBlocks optimization rule.
     planner = Planner()
     read_op = get_parquet_read_logical_op()
diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py
index 97d0fc12d1b16..a3dc65e921460 100644
--- a/python/ray/data/tests/test_map.py
+++ b/python/ray/data/tests/test_map.py
@@ -1,7 +1,6 @@
 import itertools
 import math
 import os
-import signal
 import threading
 import time
 from typing import Iterator
@@ -13,8 +12,6 @@
 import pytest
 
 import ray
-from ray._private.test_utils import wait_for_condition
-from ray.data.block import BlockAccessor
 from ray.data.context import DataContext
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.util import column_udf, column_udf_class, extract_values
@@ -426,10 +423,6 @@ def test_map_batches_extra_args(shutdown_only, tmp_path):
 
     def put(x):
-        # We only support automatic deref in the legacy backend.
-        if DataContext.get_current().new_execution_backend:
-            return x
-        else:
-            return ray.put(x)
+        return x
 
     # Test input validation
     ds = ray.data.range(5)
@@ -951,37 +945,8 @@ def test_random_sample_checks(ray_start_regular_shared):
 
 # NOTE: All tests above share a Ray cluster, while the tests below do not. These
 # tests should only be carefully reordered to retain this invariant!
-
-
-def test_actor_pool_strategy_apply_interrupt(shutdown_only):
-    """Test that _apply kills the actor pool if an interrupt is raised."""
-    ray.shutdown()
-
-    ray.init(include_dashboard=False, num_cpus=1)
-
-    cpus = ray.available_resources()["CPU"]
-    ds = ray.data.range(5, parallelism=5)
-    aps = ray.data.ActorPoolStrategy(max_size=5)
-    blocks = ds._plan.execute()
-
-    # Start some actors, the first one sends a SIGINT, emulating a KeyboardInterrupt
-    def test_func(block):
-        for i, _ in enumerate(BlockAccessor.for_block(block).iter_rows()):
-            if i == 0:
-                os.kill(os.getpid(), signal.SIGINT)
-            else:
-                time.sleep(1000)
-                return block
-
-    # No need to test ActorPoolStrategy in new execution backend.
-    if not DataContext.get_current().new_execution_backend:
-        with pytest.raises(ray.exceptions.RayTaskError):
-            aps._apply(test_func, {}, blocks, False)
-
-    # Check that all actors have been killed by counting the available CPUs
-    wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) == cpus))
 
 
 def test_actor_pool_strategy_default_num_actors(shutdown_only):
     import time
 
@@ -991,25 +954,15 @@ def __call__(self, x):
             return x
 
     num_cpus = 5
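+    # Ensure no Ray instance is already running so that ray.init() below
+    # starts a fresh cluster with exactly `num_cpus` CPUs.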
+    ray.shutdown()
     ray.init(num_cpus=num_cpus)
     compute_strategy = ray.data.ActorPoolStrategy()
     ray.data.range(10, parallelism=10).map_batches(
         UDFClass, compute=compute_strategy, batch_size=1
     ).materialize()
 
-    # The new execution backend is not using the ActorPoolStrategy under
-    # the hood, so the expectation here applies only to the old backend.
-    # TODO(https://github.com/ray-project/ray/issues/31723): we should check
-    # the num of workers once we have autoscaling in new execution backend.
-    if not DataContext.get_current().new_execution_backend:
-        expected_max_num_workers = math.ceil(
-            num_cpus * (1 / compute_strategy.ready_to_total_workers_ratio)
-        )
-        assert (
-            compute_strategy.num_workers >= num_cpus
-            and compute_strategy.num_workers <= expected_max_num_workers
-        ), "Number of actors is out of the expected bound"
-
 
 def test_actor_pool_strategy_bundles_to_max_actors(shutdown_only):
     """Tests that blocks are bundled up to the specified max number of actors."""
@@ -1025,11 +976,6 @@ def __call__(self, x):
         .materialize()
     )
 
-    # TODO(https://github.com/ray-project/ray/issues/31723): implement the feature
-    # of capping bundle size by actor pool size, and then re-enable this test.
-    if not DataContext.get_current().new_execution_backend:
-        assert f"{max_size} blocks" in ds.stats()
-
     # Check batch size is still respected.
     ds = (
         ray.data.range(10, parallelism=10)
diff --git a/python/ray/data/tests/test_mars.py b/python/ray/data/tests/test_mars.py
index 5c47d9f7081f4..99838c6b78865 100644
--- a/python/ray/data/tests/test_mars.py
+++ b/python/ray/data/tests/test_mars.py
@@ -48,7 +48,7 @@ def test_mars(ray_start_regular):
     cluster.stop()
 
 
-def test_from_mars_e2e(ray_start_regular, enable_optimizer):
+def test_from_mars_e2e(ray_start_regular):
     import pandas as pd
 
     cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)
diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py
index ff2ee6883530b..3a986035c219d 100644
--- a/python/ray/data/tests/test_optimize.py
+++ b/python/ray/data/tests/test_optimize.py
@@ -9,7 +9,6 @@
 from ray._private.internal_api import memory_summary
 from ray.data import Dataset
 from ray.data.block import BlockMetadata
-from ray.data.context import DataContext
 from ray.data.datasource import Datasource, ReadTask
 from ray.data.datasource.csv_datasource import CSVDatasource
 from ray.data.tests.util import column_udf, extract_values
@@ -97,9 +96,6 @@ def prepare_read(
 
 
 def test_memory_release(shutdown_only):
-    context = DataContext.get_current()
-    # Ensure that stage fusion is enabled.
-    context.optimize_fuse_stages = True
     info = ray.init(num_cpus=1, object_store_memory=1500e6)
     ds = ray.data.range(10)
 
@@ -219,11 +215,6 @@ def test_spread_hint_inherit(ray_start_regular_shared):
 
 
 def test_optimize_reorder(ray_start_regular_shared):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-    context.optimize_fuse_read_stages = True
-    context.optimize_reorder_stages = True
-
     ds = ray.data.range(10).randomize_block_order().map_batches(dummy_map).materialize()
     print("Stats", ds.stats())
     expect_stages(
@@ -247,11 +238,6 @@ def test_optimize_reorder(ray_start_regular_shared):
 
 
 def test_write_fusion(ray_start_regular_shared, tmp_path):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-    context.optimize_fuse_read_stages = True
-    context.optimize_fuse_shuffle_stages = True
-
     path = os.path.join(tmp_path, "out")
     ds = ray.data.range(100).map_batches(lambda x: x)
     ds.write_csv(path)
diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index dd2924209488a..da41f8a170b8e 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -31,15 +31,12 @@
 from ray.tests.conftest import *  # noqa
 
 
-def check_num_computed(ds, expected, streaming_expected) -> None:
+def check_num_computed(ds, streaming_expected) -> None:
-    # When streaming executor is on, the _num_computed() is affected only
-    # by the ds.schema() which will still partial read the blocks, but will
-    # not affected by operations like take() as it's executed via streaming
-    # executor.
+    # Under the streaming executor, _num_computed() is affected only by
+    # ds.schema(), which still partially reads the blocks. It is not affected
+    # by operations like take(), since those are executed via the streaming
+    # executor.
-    if not ray.data.context.DataContext.get_current().use_streaming_executor:
-        assert ds._plan.execute()._num_computed() == expected
-    else:
-        assert ds._plan.execute()._num_computed() == streaming_expected
+    assert ds._plan.execute()._num_computed() == streaming_expected
 
 
 def test_include_paths(ray_start_regular_shared, tmp_path):
@@ -145,13 +142,13 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
     ds = ray.data.read_parquet(data_path, filesystem=fs)
 
     # Test metadata-only parquet ops.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     # Schema information is available from Parquet metadata, so
     # we do not need to compute the first block.
     assert ds.schema() is not None
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     input_files = ds.input_files()
     assert len(input_files) == 2, input_files
     assert "test1.parquet" in str(input_files)
@@ -164,11 +161,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
         repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
         "schema={one: int64, two: string})"
     ), ds
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take_all()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [
         [1, "a"],
         [2, "b"],
@@ -235,7 +232,7 @@ def prefetch_file_metadata(self, fragments, **ray_remote_args):
     )
 
     # Expect to lazily compute all metadata correctly.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     assert ds.schema() is not None
@@ -251,11 +248,11 @@ def prefetch_file_metadata(self, fragments, **ray_remote_args):
         repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
         "schema={one: int64, two: string})"
     ), ds
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
     assert sorted(values) == [
         [1, "a"],
         [2, "b"],
@@ -362,7 +359,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
     assert ds._meta_count() is None
 
     # Expect to lazily compute all metadata correctly.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     assert ds.schema() is not None
@@ -378,11 +375,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
         repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
         "schema={one: int64, two: string})"
     ), ds
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
     assert sorted(values) == [
         [1, "a"],
         [2, "b"],
@@ -403,7 +400,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path):
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [
         [1, "a"],
         [2, "b"],
@@ -452,7 +449,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
     assert ds._meta_count() is None
 
     # Expect to lazily compute all metadata correctly.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     assert ds.schema() is not None
@@ -468,11 +465,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path
         repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
         "schema={one: int64, two: string})"
     ), ds
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 2)
+    check_num_computed(ds, 2)
     assert sorted(values) == [
         [1, "a"],
         [2, "b"],
@@ -511,16 +508,16 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
     ds = ray.data.read_parquet(data_path, filesystem=fs)
 
     # Test metadata-only parquet ops.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     # Schema information and input files are available from Parquet metadata,
     # so we do not need to compute the first block.
     assert ds.schema() is not None
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     input_files = ds.input_files()
     assert len(input_files) == 2, input_files
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert str(ds) == (
         "Dataset(\n"
         "   num_blocks=2,\n"
@@ -537,7 +534,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
         "one: dictionary<values=int32, indices=int32, ordered=0>}\n"
         ")"
     ), ds
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
@@ -549,13 +546,13 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
         [3, "f"],
         [3, "g"],
     ]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
 
     # Test column selection.
     ds = ray.data.read_parquet(data_path, columns=["one"], filesystem=fs)
     values = [s["one"] for s in ds.take()]
     assert sorted(values) == [1, 1, 1, 3, 3, 3]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
 
 
 def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path):
@@ -574,7 +571,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path
     )
 
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 1, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [[1, "a"], [1, "a"]]
     assert ds.count() == 2
 
@@ -585,7 +582,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path
     )
 
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [[1, "a"], [1, "a"]]
     assert ds.count() == 2
 
@@ -625,7 +622,7 @@ def test_parquet_read_partitioned_with_columns(ray_start_regular_shared, fs, dat
     )
     assert ds.columns() == ["y", "z"]
     values = [[s["y"], s["z"]] for s in ds.take()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [
         ["a", 0.1],
         ["a", 0.3],
@@ -687,7 +684,7 @@ def test_parquet_read_partitioned_with_partition_filter(
 
     assert ds.columns() == ["x", "y", "z"]
     values = [[s["x"], s["y"], s["z"]] for s in ds.take()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [[0, "a", 0.1]]
 
 
@@ -711,12 +708,12 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
     )
 
     # Test metadata-only parquet ops.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == 6
     assert ds.size_bytes() > 0
     # Schema information and input files are available from Parquet metadata,
     # so we do not need to compute the first block.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.schema() is not None
     input_files = ds.input_files()
     assert len(input_files) == 2, input_files
@@ -728,11 +725,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path):
         repr(ds) == "Dataset(num_blocks=2, num_rows=6, "
         "schema={two: string, one: int32})"
     ), ds
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
 
     # Forces a data read.
     values = [[s["one"], s["two"]] for s in ds.take()]
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == [
         [1, "a"],
         [1, "b"],
@@ -761,7 +758,7 @@ def _block_udf(block: pa.Table):
     ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf)
 
     ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
-    check_num_computed(ds, 1, 0)
+    check_num_computed(ds, 0)
     np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)
 
     # 2 blocks/read tasks
@@ -769,7 +766,7 @@ def _block_udf(block: pa.Table):
     ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf)
 
     ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1)
 
     # 2 blocks/read tasks, 1 empty block
@@ -782,7 +779,7 @@ def _block_udf(block: pa.Table):
     )
 
     ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()])
-    check_num_computed(ds, 2, 0)
+    check_num_computed(ds, 0)
     np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1)
 
 
@@ -812,7 +809,7 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat
     ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism)
 
     # Test metadata-only parquet ops.
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
     assert ds.count() == num_dfs * 3
     assert ds.size_bytes() > 0
     # Schema information and input files are available from Parquet metadata,
@@ -820,11 +817,11 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat
     assert ds.schema() is not None
     input_files = ds.input_files()
     assert len(input_files) == num_dfs, input_files
-    check_num_computed(ds, 0, 0)
+    check_num_computed(ds, 0)
 
     # Forces a data read.
     values = [s["one"] for s in ds.take(limit=3 * num_dfs)]
-    check_num_computed(ds, parallelism, 0)
+    check_num_computed(ds, 0)
     assert sorted(values) == list(range(3 * num_dfs))
 
 
diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py
index 4dec9fa1adb70..7be374fe7c5a3 100644
--- a/python/ray/data/tests/test_randomize_block_order.py
+++ b/python/ray/data/tests/test_randomize_block_order.py
@@ -19,7 +19,7 @@
 from ray.data.tests.util import extract_values
 
 
-def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer):
+def test_randomize_blocks_operator(ray_start_regular_shared):
     planner = Planner()
     read_op = get_parquet_read_logical_op()
     op = RandomizeBlocks(
@@ -112,7 +112,7 @@ def test_randomize_block_order_after_repartition():
     assert operator_count == 6
 
 
-def test_randomize_blocks_e2e(ray_start_regular_shared, enable_optimizer):
+def test_randomize_blocks_e2e(ray_start_regular_shared):
     ds = ray.data.range(12, parallelism=4)
     ds = ds.randomize_block_order(seed=0)
     assert extract_values("id", ds.take_all()) == [
@@ -131,7 +131,7 @@ def test_randomize_blocks_e2e(ray_start_regular_shared, enable_optimizer):
     ], ds
 
 
-def test_randomize_blocks_rule_e2e(ray_start_regular_shared, enable_optimizer):
+def test_randomize_blocks_rule_e2e(ray_start_regular_shared):
     def dummy_map(x):
         return x
 
diff --git a/python/ray/data/tests/test_raydp.py b/python/ray/data/tests/test_raydp.py
index adbb6fa17c78d..84576aedaed95 100644
--- a/python/ray/data/tests/test_raydp.py
+++ b/python/ray/data/tests/test_raydp.py
@@ -42,7 +42,7 @@ def test_raydp_to_spark(spark):
     assert values == rows
 
 
-def test_from_spark_e2e(enable_optimizer, spark):
+def test_from_spark_e2e(spark):
     spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["one", "two"])
 
     rows = [(r.one, r.two) for r in spark_df.take(3)]
diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py
index 9c04b82ff1468..1e827ca0de8c4 100644
--- a/python/ray/data/tests/test_split.py
+++ b/python/ray/data/tests/test_split.py
@@ -79,9 +79,7 @@ def count(s):
         ([2, 5], 1),  # Single split.
     ],
 )
-def test_equal_split_balanced(
-    ray_start_regular_shared, enable_optimizer, block_sizes, num_splits
-):
+def test_equal_split_balanced(ray_start_regular_shared, block_sizes, num_splits):
     _test_equal_split_balanced(block_sizes, num_splits)
 
 
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index a1772d5ab2617..1ce4de4905a8c 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -20,7 +20,6 @@
 )
 from ray.data._internal.util import create_dataset_tag
 from ray.data.block import BlockMetadata
-from ray.data.context import DataContext
 from ray.data.tests.util import column_udf
 from ray.tests.conftest import *  # noqa
 
@@ -240,26 +239,12 @@ def test_large_args_scheduling_strategy(ray_start_regular_shared):
 
 
 def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-
-    if context.new_execution_backend:
-        if context.use_streaming_executor:
-            logger = DatasetLogger(
-                "ray.data._internal.execution.streaming_executor"
-            ).get_logger(
-                log_to_stdout=enable_auto_log_stats,
-            )
-        else:
-            logger = DatasetLogger(
-                "ray.data._internal.execution.bulk_executor"
-            ).get_logger(
-                log_to_stdout=enable_auto_log_stats,
-            )
-    else:
-        logger = DatasetLogger("ray.data._internal.plan").get_logger(
-            log_to_stdout=enable_auto_log_stats,
-        )
+    logger = DatasetLogger(
+        "ray.data._internal.execution.streaming_executor"
+    ).get_logger(
+        log_to_stdout=enable_auto_log_stats,
+    )
+
     with patch.object(logger, "info") as mock_logger:
         ds = ray.data.range(1000, parallelism=10)
         ds = ds.map_batches(dummy_map_batches).materialize()
@@ -267,208 +252,72 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats):
         if enable_auto_log_stats:
             logger_args, logger_kwargs = mock_logger.call_args
 
-            if context.new_execution_backend:
-                assert (
-                    canonicalize(logger_args[0])
-                    == f"""Operator N ReadRange->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-"""
-                )
-            else:
-                assert (
-                    canonicalize(logger_args[0])
-                    == """Operator N Read->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-"""
-                )
+            assert canonicalize(logger_args[0]) == (
+                f"Operator N ReadRange->MapBatches(dummy_map_batches): "
+                f"{EXECUTION_STRING}\n"
+                f"* Remote wall time: T min, T max, T mean, T total\n"
+                f"* Remote cpu time: T min, T max, T mean, T total\n"
+                f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+                f"* Output num rows per block: N min, N max, N mean, N total\n"
+                f"* Output size bytes per block: N min, N max, N mean, N total\n"
+                f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+                f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+                f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+            )
 
         ds = ds.map(dummy_map_batches).materialize()
         if enable_auto_log_stats:
             logger_args, logger_kwargs = mock_logger.call_args
 
-            if context.new_execution_backend:
-                assert (
-                    canonicalize(logger_args[0])
-                    == f"""Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-"""
-                )
-            else:
-                assert (
-                    canonicalize(logger_args[0])
-                    == """Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-"""
-                )
+            assert canonicalize(logger_args[0]) == (
+                f"Operator N Map(dummy_map_batches): {EXECUTION_STRING}\n"
+                f"* Remote wall time: T min, T max, T mean, T total\n"
+                f"* Remote cpu time: T min, T max, T mean, T total\n"
+                f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+                f"* Output num rows per block: N min, N max, N mean, N total\n"
+                f"* Output size bytes per block: N min, N max, N mean, N total\n"
+                f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+                f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+                f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+            )
+
     for batch in ds.iter_batches():
         pass
     stats = canonicalize(ds.materialize().stats())
 
-    if context.new_execution_backend:
-        if context.use_streaming_executor:
-            assert (
-                stats
-                == f"""Operator N ReadRange->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Dataset iterator time breakdown:
-* Total time user code is blocked: T
-* Total time in user code: T
-* Total time overall: T
-* Num blocks local: Z
-* Num blocks remote: Z
-* Num blocks unknown location: N
-* Batch iteration time breakdown (summed across prefetch threads):
-    * In ray.get(): T min, T max, T avg, T total
-    * In batch creation: T min, T max, T avg, T total
-    * In batch formatting: T min, T max, T avg, T total
-"""
-            )
-        else:
-            assert (
-                stats
-                == f"""Operator N ReadRange->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Dataset iterator time breakdown:
-* In ray.wait(): T
-* In ray.get(): T
-* Num blocks local: Z
-* Num blocks remote: Z
-* Num blocks unknown location: N
-* In next_batch(): T
-* In format_batch(): T
-* In user code: T
-* Total time: T
-"""
-            )
-    else:
-        if context.use_streaming_executor:
-            assert (
-                stats
-                == f"""Operator N ReadRange->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Dataset iterator time breakdown:
-* Total time user code is blocked: T
-* Total time in user code: T
-* Total time overall: T
-* Num blocks local: Z
-* Num blocks remote: Z
-* Num blocks unknown location: N
-* Batch iteration time breakdown (summed across prefetch threads):
-    * In ray.get(): T min, T max, T avg, T total
-    * In batch creation: T min, T max, T avg, T total
-    * In batch formatting: T min, T max, T avg, T total
-"""
-            )
-        else:
-            assert (
-                stats
-                == """Operator N Read->MapBatches(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-
-Operator N Map(dummy_map_batches): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-
-Dataset iterator time breakdown:
-* In ray.wait(): T
-* In ray.get(): T
-* In next_batch(): T
-* In format_batch(): T
-* In user code: T
-* Total time: T
-"""
-            )
+    assert stats == (
+        f"Operator N ReadRange->MapBatches(dummy_map_batches): {EXECUTION_STRING}\n"
+        f"* Remote wall time: T min, T max, T mean, T total\n"
+        f"* Remote cpu time: T min, T max, T mean, T total\n"
+        f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+        f"* Output num rows per block: N min, N max, N mean, N total\n"
+        f"* Output size bytes per block: N min, N max, N mean, N total\n"
+        f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+        f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+        f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+        f"\n"
+        f"Operator N Map(dummy_map_batches): {EXECUTION_STRING}\n"
+        f"* Remote wall time: T min, T max, T mean, T total\n"
+        f"* Remote cpu time: T min, T max, T mean, T total\n"
+        f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+        f"* Output num rows per block: N min, N max, N mean, N total\n"
+        f"* Output size bytes per block: N min, N max, N mean, N total\n"
+        f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+        f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+        f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+        f"\n"
+        f"Dataset iterator time breakdown:\n"
+        f"* Total time user code is blocked: T\n"
+        f"* Total time in user code: T\n"
+        f"* Total time overall: T\n"
+        f"* Num blocks local: Z\n"
+        f"* Num blocks remote: Z\n"
+        f"* Num blocks unknown location: N\n"
+        f"* Batch iteration time breakdown (summed across prefetch threads):\n"
+        f"    * In ray.get(): T min, T max, T avg, T total\n"
+        f"    * In batch creation: T min, T max, T avg, T total\n"
+        f"    * In batch formatting: T min, T max, T avg, T total\n"
+    )
 
 
 def test_dataset__repr__(ray_start_regular_shared):
@@ -647,8 +496,6 @@ def check_stats2():
 
 
 def test_dataset_stats_shuffle(ray_start_regular_shared):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
     ds = ray.data.range(1000, parallelism=10)
     ds = ds.random_shuffle().repartition(1, shuffle=True)
     stats = canonicalize(ds.materialize().stats())
@@ -735,119 +582,63 @@ def test_dataset_stats_from_items(ray_start_regular_shared):
 
 
 def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
     ds = ray.data.range(1000, parallelism=10)
     ds.write_parquet(str(tmp_path))
     ds = ray.data.read_parquet(str(tmp_path)).map(lambda x: x)
     stats = canonicalize(ds.materialize().stats())
-    if context.new_execution_backend:
-        assert (
-            stats
-            == f"""Operator N ReadParquet->Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-"""
-        )
-    else:
-        assert (
-            stats
-            == """Operator N Read->Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-"""
-        )
+    assert stats == (
+        f"Operator N ReadParquet->Map(<lambda>): {EXECUTION_STRING}\n"
+        f"* Remote wall time: T min, T max, T mean, T total\n"
+        f"* Remote cpu time: T min, T max, T mean, T total\n"
+        f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+        f"* Output num rows per block: N min, N max, N mean, N total\n"
+        f"* Output size bytes per block: N min, N max, N mean, N total\n"
+        f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+        f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+        f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+    )
 
 
 def test_dataset_split_stats(ray_start_regular_shared, tmp_path):
-    context = DataContext.get_current()
     ds = ray.data.range(100, parallelism=10).map(column_udf("id", lambda x: x + 1))
     dses = ds.split_at_indices([49])
     dses = [ds.map(column_udf("id", lambda x: x + 1)) for ds in dses]
     for ds_ in dses:
         stats = canonicalize(ds_.materialize().stats())
 
-        if context.new_execution_backend:
-            assert (
-                stats
-                == f"""Operator N ReadRange->Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-
-Operator N Split: {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-
-Operator N Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-* Extra metrics: {STANDARD_EXTRA_METRICS}
-"""
-            )
-        else:
-            assert (
-                stats
-                == """Operator N Read->Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-
-Operator N Split: {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-
-Operator N Map(<lambda>): {EXECUTION_STRING}
-* Remote wall time: T min, T max, T mean, T total
-* Remote cpu time: T min, T max, T mean, T total
-* Peak heap memory usage (MiB): N min, N max, N mean
-* Output num rows per block: N min, N max, N mean, N total
-* Output size bytes per block: N min, N max, N mean, N total
-* Output rows per task: N min, N max, N mean, N tasks used
-* Tasks per node: N min, N max, N mean; N nodes used
-"""
-            )
+        assert stats == (
+            f"Operator N ReadRange->Map(<lambda>): {EXECUTION_STRING}\n"
+            f"* Remote wall time: T min, T max, T mean, T total\n"
+            f"* Remote cpu time: T min, T max, T mean, T total\n"
+            f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+            f"* Output num rows per block: N min, N max, N mean, N total\n"
+            f"* Output size bytes per block: N min, N max, N mean, N total\n"
+            f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+            f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+            f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+            f"\n"
+            f"Operator N Split: {EXECUTION_STRING}\n"
+            f"* Remote wall time: T min, T max, T mean, T total\n"
+            f"* Remote cpu time: T min, T max, T mean, T total\n"
+            f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+            f"* Output num rows per block: N min, N max, N mean, N total\n"
+            f"* Output size bytes per block: N min, N max, N mean, N total\n"
+            f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+            f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+            f"\n"
+            f"Operator N Map(<lambda>): {EXECUTION_STRING}\n"
+            f"* Remote wall time: T min, T max, T mean, T total\n"
+            f"* Remote cpu time: T min, T max, T mean, T total\n"
+            f"* Peak heap memory usage (MiB): N min, N max, N mean\n"
+            f"* Output num rows per block: N min, N max, N mean, N total\n"
+            f"* Output size bytes per block: N min, N max, N mean, N total\n"
+            f"* Output rows per task: N min, N max, N mean, N tasks used\n"
+            f"* Tasks per node: N min, N max, N mean; N nodes used\n"
+            f"* Extra metrics: {STANDARD_EXTRA_METRICS}\n"
+        )
 
 
 def test_calculate_blocks_stats(ray_start_regular_shared, op_two_block):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-
     block_params, block_meta_list = op_two_block
     stats = DatasetStats(
         metadata={"Read": block_meta_list},
@@ -890,9 +681,6 @@ def test_calculate_blocks_stats(ray_start_regular_shared, op_two_block):
 
 
 def test_summarize_blocks(ray_start_regular_shared, op_two_block):
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-
     block_params, block_meta_list = op_two_block
     stats = DatasetStats(
         metadata={"Read": block_meta_list},
@@ -984,9 +772,6 @@ def test_get_total_stats(ray_start_regular_shared, op_two_block):
     `DatasetStats.get_max_wall_time()`,
     `DatasetStats.get_total_cpu_time()`,
     `DatasetStats.get_max_heap_memory()`."""
-    context = DataContext.get_current()
-    context.optimize_fuse_stages = True
-
     block_params, block_meta_list = op_two_block
     stats = DatasetStats(
         metadata={"Read": block_meta_list},
@@ -1010,9 +795,6 @@ def test_get_total_stats(ray_start_regular_shared, op_two_block):
     "See: https://github.com/ray-project/ray/pull/40173"
 )
 def test_streaming_stats_full(ray_start_regular_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     ds = ray.data.range(5, parallelism=5).map(column_udf("id", lambda x: x + 1))
     ds.take_all()
     stats = canonicalize(ds.stats())
diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py
index 499b367c56b83..ef67e8d74ee05 100644
--- a/python/ray/data/tests/test_streaming_integration.py
+++ b/python/ray/data/tests/test_streaming_integration.py
@@ -346,9 +346,6 @@ def consume(self, it, signal_actor, split_index):
     "remove DAG validation for now; see https://github.com/ray-project/ray/pull/37829"
 )
 def test_e2e_option_propagation(ray_start_10_cpus_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     def run():
         ray.data.range(5, parallelism=5).map(
             lambda x: x, compute=ray.data.ActorPoolStrategy(size=2)
@@ -372,7 +369,6 @@ def _test_hook(fn, args, strategy):
             tasks.append(strategy)
 
     remote_function._task_launch_hook = _test_hook
-    DataContext.get_current().use_streaming_executor = True
     DataContext.get_current().execution_options.preserve_order = True
     DataContext.get_current().large_args_threshold = 0
 
@@ -406,7 +402,6 @@ def func(x):
         ray.get(counter.inc.remote())
         return x
 
-    DataContext.get_current().use_streaming_executor = True
     DataContext.get_current().execution_options.preserve_order = True
 
     # Only take the first item from the iterator.
@@ -444,7 +439,6 @@ def func(x):
         return x
 
     ctx = DataContext.get_current()
-    ctx.use_streaming_executor = True
     ctx.execution_options.resource_limits.object_store_memory = 10000
 
     # Only take the first item from the iterator.
@@ -472,7 +466,6 @@ def test_e2e_liveness_with_output_backpressure_edge_case(
 ):
     # At least one operator is ensured to be running, if the output becomes idle.
     ctx = DataContext.get_current()
-    ctx.use_streaming_executor = True
     ctx.execution_options.preserve_order = True
     ctx.execution_options.resource_limits.object_store_memory = 1
     ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2)
@@ -482,9 +475,6 @@ def test_e2e_liveness_with_output_backpressure_edge_case(
 
 
 def test_e2e_autoscaling_up(ray_start_10_cpus_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     @ray.remote(max_concurrency=10)
     class Barrier:
         def __init__(self, n, delay=0):
@@ -557,9 +547,6 @@ def __call__(self, x):
 
 
 def test_e2e_autoscaling_down(ray_start_10_cpus_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     class UDFClass:
         def __call__(self, x):
             time.sleep(1)
@@ -576,9 +563,6 @@ def __call__(self, x):
 
 
 def test_can_pickle(ray_start_10_cpus_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     ds = ray.data.range(1000000)
     it = iter(ds.iter_batches())
     next(it)
@@ -589,9 +573,6 @@ def test_can_pickle(ray_start_10_cpus_shared, restore_data_context):
 
 
 def test_streaming_fault_tolerance(ray_start_10_cpus_shared, restore_data_context):
-    DataContext.get_current().new_execution_backend = True
-    DataContext.get_current().use_streaming_executor = True
-
     class RandomExit:
         def __call__(self, x):
             import os
diff --git a/release/nightly_tests/dataset/operator_fusion_benchmark.py b/release/nightly_tests/dataset/operator_fusion_benchmark.py
index 002950890e6d7..eb04bc64be609 100644
--- a/release/nightly_tests/dataset/operator_fusion_benchmark.py
+++ b/release/nightly_tests/dataset/operator_fusion_benchmark.py
@@ -127,7 +127,6 @@ def _summarize_results(results: List[Dict[str, float]]) -> Dict[str, float]:
         ),
     )
     parser.add_argument("--target-max-block-size", type=int, default=None)
-    parser.add_argument("--disable-optimizer", action="store_true", default=False)
     parser.add_argument("--num-trials", type=int, default=1)
     args = parser.parse_args()
 
@@ -154,10 +153,6 @@ def _summarize_results(results: List[Dict[str, float]]) -> Dict[str, float]:
 
     ctx = DataContext.get_current()
     ctx.target_max_block_size = target_max_block_size
-    if args.disable_optimizer:
-        ctx.optimizer_enabled = False
-    else:
-        ctx.optimizer_enabled = True
     results = []
     for trial in range(args.num_trials):
         print(f"\n\nRunning trial {trial}\n")