Skip to content

Commit

Permalink
[Datasets] Always preserve order for the BulkExecutor. (ray-project#3…
Browse files Browse the repository at this point in the history
…2437)

This PR always preserves order for the bulk executor. We may revisit this in the future, at which point we'd update all of the tests that rely on order preservation.

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
  • Loading branch information
clarkzinzow authored Feb 14, 2023
1 parent 91940e3 commit 71dfd20
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
2 changes: 2 additions & 0 deletions python/ray/data/_internal/execution/bulk_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class BulkExecutor(Executor):
"""

def __init__(self, options: ExecutionOptions):
# Bulk executor always preserves order.
options.preserve_order = True
super().__init__(options)
self._stats: Optional[DatasetStats] = DatasetStats(stages={}, parent=None)
self._executed = False
Expand Down
3 changes: 2 additions & 1 deletion python/ray/data/_internal/execution/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ExecutionOptions:
# node (node driving the execution).
locality_with_output: bool = False

# Set this to preserve the ordering between blocks processed by operators.
# Set this to preserve the ordering between blocks processed by operators under the
# streaming executor. The bulk executor always preserves order.
preserve_order: bool = False

# Whether to enable locality-aware task dispatch to actors (on by default).
Expand Down
13 changes: 12 additions & 1 deletion python/ray/data/tests/test_bulk_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,18 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]:
return output


@pytest.mark.parametrize("preserve_order", [False, True])
@pytest.mark.parametrize(
"preserve_order",
[
True,
pytest.param(
False,
marks=pytest.mark.skip(
reason="Bulk executor currently always preserves order"
),
),
],
)
def test_multi_stage_execution(ray_start_10_cpus_shared, preserve_order):
executor = BulkExecutor(ExecutionOptions(preserve_order=preserve_order))
inputs = make_ref_bundles([[x] for x in range(20)])
Expand Down

0 comments on commit 71dfd20

Please sign in to comment.