[workflow] Workflow queue (ray-project#24697)
* implement workflow queue
suquark authored Jul 9, 2022
1 parent d234348 commit b0e913f
Showing 10 changed files with 481 additions and 129 deletions.
21 changes: 18 additions & 3 deletions doc/source/workflows/management.rst
@@ -7,14 +7,15 @@ Each workflow has a unique ``workflow_id``. By default, when you call ``.run()``

If ``.run()`` is called with a previously used workflow id, the workflow will be resumed from the previous execution.
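
For example (the task below is illustrative, and ``workflow.create(...).run()`` stands in for the workflow entry point of your Ray version):

.. code-block:: python

    import ray
    from ray import workflow

    @ray.remote
    def add(a: int, b: int) -> int:
        return a + b

    # Run under an explicit id; calling ``.run()`` again with the same id
    # resumes the workflow from the previous execution.
    workflow.create(add.bind(1, 2)).run(workflow_id="add_example")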

Workflow States
Workflow Status
---------------
A workflow can be in one of several states:
A workflow can be in one of several statuses:

=================== =======================================================================================
Status Description
=================== =======================================================================================
RUNNING The workflow is currently running in the cluster.
PENDING The workflow is queued and waiting to be executed.
FAILED This workflow failed with an application error. It can be resumed from the failed task.
RESUMABLE This workflow failed with a system error. It can be resumed from the failed task.
CANCELED The workflow was canceled. Its result is unavailable, and it cannot be resumed.
@@ -82,12 +83,26 @@ Storage Configuration
Workflows supports two types of storage backends out of the box:

* Local file system: the data is stored locally. This is only for single-node testing. It needs to be an NFS mount to work with multi-node clusters. To use local storage, specify ``ray.init(storage="/path/to/storage_dir")``.
* S3: Production users should use S3 as the storage backend. Enable S3 storage with ``r.init(storage="s3://bucket/path")``.
* S3: Production users should use S3 as the storage backend. Enable S3 storage with ``ray.init(storage="s3://bucket/path")``.

Additional storage backends can be written by subclassing the ``Storage`` class and passing a storage instance to ``ray.init()`` [TODO: note that the Storage API is not currently stable].

If left unspecified, ``/tmp/ray/workflow_data`` will be used for temporary storage. This default setting *will only work for single-node Ray clusters*.
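
For example (the local path and bucket name below are placeholders):

.. code-block:: python

    import ray

    # Single-node testing: keep workflow data on the local file system.
    ray.init(storage="file:///tmp/ray/workflow_data")

    # Production: point Ray at an S3 prefix instead (use one or the other, not both).
    # ray.init(storage="s3://my-bucket/workflow-data")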

Concurrency Control
-------------------
Ray workflow supports concurrency control. You can set the maximum number of running workflows and the maximum number of pending workflows via ``workflow.init()``
before executing any workflow. Calling ``workflow.init()`` again with a different configuration raises an error.

For example, ``workflow.init(max_running_workflows=10, max_pending_workflows=50)`` means there will be at most 10 running workflows and 50 pending workflows.
Submitting a workflow while the number of pending workflows is at its maximum raises ``queue.Full("Workflow queue has been full")``. Getting the output of a pending workflow blocks until the workflow eventually runs to completion.
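
For illustration, a minimal sketch of this queueing behavior (the task is a placeholder, and ``workflow.create(...).run_async()`` stands in for the workflow submission API of your Ray version):

.. code-block:: python

    import queue

    import ray
    from ray import workflow

    @ray.remote
    def hello() -> str:
        return "hello"

    # At most 10 workflows run concurrently; at most 50 more may wait in the queue.
    workflow.init(max_running_workflows=10, max_pending_workflows=50)

    try:
        for i in range(100):
            workflow.create(hello.bind()).run_async(workflow_id=f"wf_{i}")
    except queue.Full:
        # Raised once 10 workflows are running and 50 more are already pending.
        print("The workflow queue is full; wait for running workflows to finish.")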

A pending workflow has the ``PENDING`` status. If a pending workflow is interrupted (e.g., by a cluster failure), it can be resumed.
When resuming interrupted workflows with ``workflow.resume_all()``, workflows that were running have higher priority than workflows that were pending (i.e., the previously pending workflows will likely remain pending).

.. note::

    We currently do not guarantee that resumed pending workflows run in the same order as they originally did.

Handling Dependencies
---------------------
59 changes: 46 additions & 13 deletions python/ray/workflow/api.py
@@ -29,33 +29,65 @@
logger = logging.getLogger(__name__)


_is_workflow_initialized = False


@PublicAPI(stability="beta")
def init() -> None:
def init(
    *,
    max_running_workflows: Optional[int] = None,
    max_pending_workflows: Optional[int] = None,
) -> None:
    """Initialize workflow.
    If Ray is not initialized, we will initialize Ray and
    use ``/tmp/ray/workflow_data`` as the default storage.
    Args:
        max_running_workflows: The maximum number of concurrently running workflows.
            Use -1 as infinity. 'None' means preserving the previous setting or
            initializing the setting with infinity.
        max_pending_workflows: The maximum number of queued workflows.
            Use -1 as infinity. 'None' means preserving the previous setting or
            initializing the setting with infinity.
    """
    usage_lib.record_library_usage("workflow")

    if max_running_workflows is not None:
        if not isinstance(max_running_workflows, int):
            raise TypeError("'max_running_workflows' must be None or an integer.")
        if max_running_workflows < -1 or max_running_workflows == 0:
            raise ValueError(
                "'max_running_workflows' must be a positive integer "
                "or use -1 as infinity."
            )
    if max_pending_workflows is not None:
        if not isinstance(max_pending_workflows, int):
            raise TypeError("'max_pending_workflows' must be None or an integer.")
        if max_pending_workflows < -1:
            raise ValueError(
                "'max_pending_workflows' must be a non-negative integer "
                "or use -1 as infinity."
            )

    if not ray.is_initialized():
        # We should use get_temp_dir_path, but for ray client, we don't
        # have this one. We need a flag to tell whether it's a client
        # or a driver to use the right dir.
        # For now, just use /tmp/ray/workflow_data
        ray.init(storage="file:///tmp/ray/workflow_data")
    workflow_access.init_management_actor()
    workflow_access.init_management_actor(max_running_workflows, max_pending_workflows)
    serialization.init_manager()
    global _is_workflow_initialized
    _is_workflow_initialized = True
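
# A minimal usage sketch of the validation above (illustrative values; assumes a
# fresh Ray session where ``workflow.init`` has not been called yet):
#
#     from ray import workflow
#
#     workflow.init(max_running_workflows=2, max_pending_workflows=10)  # -1 would mean unbounded
#     workflow.init(max_running_workflows=0)     # raises ValueError: must be positive or -1
#     workflow.init(max_pending_workflows="10")  # raises TypeError: must be None or an integer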


def _ensure_workflow_initialized() -> None:
    if not _is_workflow_initialized or not ray.is_initialized():
    # NOTE: Trying to get the actor has a side effect: it initializes Ray with
    # default arguments. This is different in "init()": it assigns a temporary
    # storage. This is why we need to check "ray.is_initialized()" first.
    if not ray.is_initialized():
        init()
    else:
        try:
            workflow_access.get_management_actor()
        except ValueError:
            init()


@PublicAPI(stability="beta")
@@ -232,14 +264,15 @@ def list_all(
        Union[Union[WorkflowStatus, str], Set[Union[WorkflowStatus, str]]]
    ] = None
) -> List[Tuple[str, WorkflowStatus]]:
    """List all workflows matching a given status filter.
    """List all workflows matching a given status filter. When returning "RESUMABLE"
    workflows, the workflows that were running rank before the workflows that were
    pending in the result list.
    Args:
        status: If given, only returns workflow with that status. This can
        status_filter: If given, only returns workflows with that status. This can
            be a single status or set of statuses. The string form of the
            status is also acceptable, i.e.,
            "RUNNING"/"FAILED"/"SUCCESSFUL"/"CANCELED"/"RESUMABLE".
            "RUNNING"/"FAILED"/"SUCCESSFUL"/"CANCELED"/"RESUMABLE"/"PENDING".
    Examples:
        >>> from ray import workflow
        >>> long_running_job = ... # doctest: +SKIP
@@ -283,7 +316,7 @@ def list_all(


@PublicAPI(stability="beta")
def resume_all(include_failed: bool = False) -> Dict[str, ray.ObjectRef]:
def resume_all(include_failed: bool = False) -> List[Tuple[str, ray.ObjectRef]]:
    """Resume all resumable workflow jobs.
    This can be used after cluster restart to resume all tasks.
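
A short sketch of consuming the new return type of ``resume_all`` (now a list of ``(workflow_id, ObjectRef)`` pairs rather than a dict):

import ray
from ray import workflow

# Resume everything resumable, including workflows that failed with an
# application error, then fetch each workflow's output.
for workflow_id, output_ref in workflow.resume_all(include_failed=True):
    print(workflow_id, ray.get(output_ref))
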
8 changes: 7 additions & 1 deletion python/ray/workflow/common.py
@@ -5,7 +5,7 @@
from ray import cloudpickle
from enum import Enum, unique
import hashlib
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, Tuple

from dataclasses import dataclass

@@ -97,6 +97,12 @@ class WorkflowStatus(str, Enum):
    # The workflow failed with a system error, i.e., ray shutdown.
    # It can be resumed.
    RESUMABLE = "RESUMABLE"
    # The workflow is queued and waiting to be executed.
    PENDING = "PENDING"

    @classmethod
    def non_terminating_status(cls) -> "Tuple[WorkflowStatus, ...]":
        return cls.RUNNING, cls.PENDING
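
# A small usage sketch of the helper above, combined with ``workflow.list_all``
# (which accepts a set of statuses per its docstring); the result shown is hypothetical:
#
#     from ray import workflow
#
#     active = workflow.list_all(set(WorkflowStatus.non_terminating_status()))
#     print(active)  # e.g. [("wf_a", WorkflowStatus.RUNNING), ("wf_b", WorkflowStatus.PENDING)]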


@unique
@@ -3,11 +3,24 @@
import requests


def fibonacci(n):
    assert n > 0
    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b


@ray.remote
def compute_large_fib(M: int, n: int = 1, fib: int = 1):
    next_fib = requests.post(
        "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
    ).json()
    try:
        next_fib = requests.post(
            "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
        ).json()
        assert isinstance(next_fib, int)
    except AssertionError:
        # TODO(suquark): The web service would fail sometimes. This is a workaround.
        next_fib = fibonacci(n)
    if next_fib > M:
        return fib
    else:
