Skip to content

Commit

Permalink
[workflow] Update workflow doc and examples (ray-project#24804)
Browse files Browse the repository at this point in the history
* update doc of workflow options

* update examples and make sure they are working
  • Loading branch information
suquark authored May 16, 2022
1 parent 86422a5 commit 2766284
Showing 17 changed files with 110 additions and 54 deletions.
20 changes: 15 additions & 5 deletions doc/source/workflows/advanced.rst
Original file line number Diff line number Diff line change
@@ -11,6 +11,9 @@ For example, this recursive workflow calculates the exponent. We write it with w
.. code-block:: python
:caption: Workflow without inplace execution:
import ray
from ray import workflow
@ray.remote
def exp_remote(k, n):
if n == 0:
@@ -22,11 +25,18 @@ We could optimize it with inplace option:
.. code-block:: python
:caption: Workflow with inplace execution:
def exp_inplace(k, n, worker_id=None):
import ray
from ray import workflow
@ray.remote
def exp_inplace(k, n):
if n == 0:
return k
return workflow.continuation(exp_inplace.options(allow_inplace=True).bind(
2 * k, n - 1, worker_id))
return workflow.continuation(
exp_inplace.options(**workflow.options(allow_inplace=True)).bind(2 * k, n - 1))
assert workflow.create(exp_inplace.bind(3, 7)).run() == 3 * 2 ** 7
With ``allow_inplace=True``, the task that called ``.bind()`` executes in the function. Ray options are ignored because they are used for remote execution. Also, you cannot retrieve the output of an inplace task using ``workflow.get_output()`` before it finishes execution.

@@ -37,7 +47,7 @@ Inplace is also useful when you need to pass something that is only valid in the
@ray.remote
def foo():
x = "<something that is only valid in the current process>"
return workflow.continuation(bar.options(allow_inplace=True).bind(x))
return workflow.continuation(bar.options(**workflow.options(allow_inplace=True)).bind(x))
Wait for Partial Results
@@ -78,7 +88,7 @@ We control the checkpoints by specify the checkpoint options like this:

.. code-block:: python
data = read_data.options(checkpoint=False).bind(10)
data = read_data.options(**workflow.options(checkpoint=False)).bind(10)
This example skips checkpointing the output of ``read_data``. During recovery, ``read_data`` would be executed again if recovery requires its output.

64 changes: 57 additions & 7 deletions doc/source/workflows/basics.rst
Original file line number Diff line number Diff line change
@@ -68,6 +68,27 @@ Each node in the original DAG becomes a workflow task.
Workflow tasks behave similarly to Ray tasks. They are executed in a parallel and distributed way.


Setting workflow options
------------------------

You can directly set Ray options to a workflow task just like to a normal
Ray remote function. To set workflow-specific options, you can use ``workflow.options``
either as a decorator or as a option feeding dictionary:

.. code-block:: python
import ray
from ray import workflow
@workflow.options(checkpoint=False)
@ray.remote(num_cpus=2, num_gpus=3)
def read_data(num: int):
return [i for i in range(num)]
read_data_with_options = read_data.options(
num_cpus=1, num_gpus=1, **workflow.options(checkpoint=True))
Retrieving results
------------------

@@ -78,6 +99,12 @@ To retrieve a workflow result, you can assign ``workflow_id`` when running a wor
import ray
from ray import workflow
try:
# cleanup previous workflows
workflow.delete("add_example")
except Exception:
pass
@ray.remote
def add(left: int, right: int) -> int:
return left + right
@@ -98,19 +125,29 @@ Then workflow results can be retrieved with ``workflow.get_output(workflow_id) -
We can retrieve the results for individual workflow tasks too with *named tasks*. A task can be named in two ways:

1) via ``.options(name="task_name")``
1) via ``.options(**workflow.options(name="task_name"))``
2) via decorator ``@workflow.options(name="task_name"``)

Once a task is given a name, the result of the task will be retrievable via ``workflow.get_output(workflow_id, name="task_name")``. If the task with the given name hasn't been executed yet, an exception will be thrown. Here are some examples:

.. code-block:: python
# TODO(suquark): Fix this example
import ray
from ray import workflow
try:
# cleanup previous workflows
workflow.delete("double")
except Exception:
pass
@ray.remote
def double(v):
return 2 * v
inner_task = double.options(name="inner").bind(1)
outer_task = double.options(name="outer").bind(inner_task)
inner_task = double.options(**workflow.options(name="inner")).bind(1)
outer_task = double.options(**workflow.options(name="outer")).bind(inner_task)
result = workflow.create(outer_task).run_async("double")
inner = workflow.get_output("double", name="inner")
outer = workflow.get_output("double", name="outer")
@@ -123,6 +160,16 @@ If there are multiple tasks with the same name, a suffix with a counter ``_n`` w

.. code-block:: python
# TODO(suquark): Fix this example
import ray
from ray import workflow
try:
# cleanup previous workflows
workflow.delete("double")
except Exception:
pass
@workflow.options(name="double")
@ray.remote
def double(s):
@@ -137,7 +184,7 @@ If there are multiple tasks with the same name, a suffix with a counter ``_n`` w
assert ray.get(inner) == 2
assert ray.get(outer) == 4
assert ray.get(result) == 4
assert result == 4
By default, each task will be given a name generated by the library, ``<WORKFLOW_ID>.<MODULE_NAME>.<FUNC_NAME>``.

@@ -151,6 +198,9 @@ The following error handling flags can be either set in the task decorator or vi

.. code-block:: python
from typing import Tuple
import random
import ray
from ray import workflow
@@ -161,7 +211,7 @@ The following error handling flags can be either set in the task decorator or vi
return "OK"
# Tries up to three times before giving up.
r1 = faulty_function.options(max_retries=5).bind()
r1 = faulty_function.options(**workflow.options(max_retries=5)).bind()
workflow.create(r1).run()
@ray.remote
@@ -174,7 +224,7 @@ The following error handling flags can be either set in the task decorator or vi
return "OK"
# `handle_errors` receives a tuple of (result, exception).
r2 = faulty_function.options(catch_exceptions=True).bind()
r2 = faulty_function.options(**workflow.options(catch_exceptions=True)).bind()
workflow.create(handle_errors.bind(r2)).run()
- If ``max_retries`` is given, the task will be retried for the given number of times if an exception is raised. It will only retry for the application level error. For system errors, it's controlled by ray. By default, ``max_retries`` is set to be 3.
10 changes: 5 additions & 5 deletions doc/source/workflows/metadata.rst
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ providing the task name:

.. code-block:: python
workflow.create(add.options(name="add_task").bind(10, 20)).run("add_example_2")
workflow.create(add.options(**workflow.options(name="add_task")).bind(10, 20)).run("add_example_2")
task_metadata = workflow.get_metadata("add_example_2", name="add_task")
@@ -47,11 +47,11 @@ which is useful when you want to attach some extra information to the
workflow or workflow task.

- workflow-level metadata can be added via ``.run(metadata=metadata)``
- task-level metadata can be added via ``.options(metadata=metadata)`` or in the decorator ``@workflow.options(metadata=metadata)``
- task-level metadata can be added via ``.options(**workflow.options(metadata=metadata))`` or in the decorator ``@workflow.options(metadata=metadata)``

.. code-block:: python
workflow.create(add.options(name="add_task", metadata={"task_k": "task_v"}).bind(10, 20))\
workflow.create(add.options(**workflow.options(name="add_task", metadata={"task_k": "task_v"})).bind(10, 20))\
.run("add_example_3", metadata={"workflow_k": "workflow_v"})
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
@@ -128,9 +128,9 @@ Available Metrics
- user_metadata: a python dictionary of custom metadata by the user via ``workflow.run()``.
- stats: workflow running stats, including workflow start time and end time.

**Step level**
**Task level**

- name: name of the task, either provided by the user via ``task.options()`` or generated by the system.
- name: name of the task, either provided by the user via ``task.options(**workflow.options(name=xxx))`` or generated by the system.
- task_options: options of the task, either provided by the user via ``task.options()`` or default by system.
- user_metadata: a python dictionary of custom metadata by the user via ``task.options()``.
- stats: task running stats, including task start time and end time.
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@ def load(data_dict: dict) -> str:


if __name__ == "__main__":
workflow.init()
order_data = extract.bind()
order_summary = transform.bind(order_data)
etl = load.bind(order_summary)
14 changes: 7 additions & 7 deletions python/ray/workflow/examples/comparisons/argo/dag_workflow.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import ray
from ray import workflow


@workflow.step
@ray.remote
def echo(msg: str, *deps) -> None:
print(msg)


if __name__ == "__main__":
workflow.init()
A = echo.options(name="A").step("A")
B = echo.options(name="B").step("B", A)
C = echo.options(name="C").step("C", A)
D = echo.options(name="D").step("D", A, B)
D.run()
A = echo.options(**workflow.options(name="A")).bind("A")
B = echo.options(**workflow.options(name="B")).bind("B", A)
C = echo.options(**workflow.options(name="C")).bind("C", A)
D = echo.options(**workflow.options(name="D")).bind("D", A, B)
workflow.create(D).run()
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
from ray import workflow


@workflow.step
@ray.remote
def intentional_fail() -> str:
raise RuntimeError("oops")

@@ -24,7 +24,7 @@ def send_email(result: str) -> None:
print("Sending email", result)


@workflow.step
@ray.remote
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
result, error = res
email = send_email.bind(f"Raw result: {result}, {error}")
@@ -41,6 +41,5 @@ def wait_all(*deps):


if __name__ == "__main__":
workflow.init()
res = intentional_fail.options(catch_exceptions=True).step()
print(exit_handler.step(res).run())
res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()
print(workflow.create(exit_handler.bind(res)).run())
Original file line number Diff line number Diff line change
@@ -9,5 +9,4 @@ def hello(msg: str) -> None:


if __name__ == "__main__":
workflow.init()
workflow.create(hello.bind("hello world")).run()
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ def wait_all(*args) -> None:


if __name__ == "__main__":
workflow.init()
children = []
for msg in ["hello world", "goodbye world"]:
children.append(hello.bind(msg))
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import ray
from ray import workflow


@workflow.step
@ray.remote
def hello(msg: str, *deps) -> None:
print(msg)


@workflow.step
@ray.remote
def wait_all(*args) -> None:
pass


if __name__ == "__main__":
workflow.init()
h1 = hello.options(name="hello1").step("hello1")
h2a = hello.options(name="hello2a").step("hello2a")
h2b = hello.options(name="hello2b").step("hello2b", h2a)
wait_all.step(h1, h2b).run()
h1 = hello.options(**workflow.options(name="hello1")).bind("hello1")
h2a = hello.options(**workflow.options(name="hello2a")).bind("hello2a")
h2b = hello.options(**workflow.options(name="hello2b")).bind("hello2b", h2a)
workflow.create(wait_all.bind(h1, h2b)).run()
Original file line number Diff line number Diff line change
@@ -28,5 +28,4 @@ def decide(heads: bool) -> str:


if __name__ == "__main__":
workflow.init()
print(workflow.create(flip_coin.bind()).run())
23 changes: 15 additions & 8 deletions python/ray/workflow/examples/comparisons/argo/retry_workflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Any, Tuple, Optional

import ray
from ray import workflow


@workflow.step
@ray.remote
def flaky_step() -> str:
import random

@@ -13,11 +14,11 @@ def flaky_step() -> str:
return "ok"


@workflow.step
@ray.remote
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
import time

@workflow.step
@ray.remote
def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
result, error = res
if result:
@@ -27,15 +28,21 @@ def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
else:
print("Retrying exception after delay", error)
time.sleep(delay_s)
return custom_retry_strategy.step(func, num_retries - 1, delay_s)
return workflow.continuation(
custom_retry_strategy.bind(func, num_retries - 1, delay_s)
)

res = func.options(catch_exceptions=True).step()
return handle_result.step(res)
res = func.options(**workflow.options(catch_exceptions=True)).bind()
return workflow.continuation(handle_result.bind(res))


if __name__ == "__main__":
workflow.init()
# Default retry strategy.
print(flaky_step.options(max_retries=10).step().run())
print(
workflow.create(
flaky_step.options(**workflow.options(max_retries=10)).bind()
).run()
)
# Custom strategy.
print(custom_retry_strategy.step(flaky_step, 10, 1).run())
print(workflow.create(custom_retry_strategy.bind(flaky_step, 10, 1)).run())
Original file line number Diff line number Diff line change
@@ -13,6 +13,5 @@ def main_workflow(name: str) -> str:


if __name__ == "__main__":
workflow.init()
wf = workflow.create(main_workflow.bind("Alice"))
print(wf.run())
Original file line number Diff line number Diff line change
@@ -12,5 +12,4 @@ def iterate(array: List[str], result: str, i: int) -> str:


if __name__ == "__main__":
workflow.init()
print(workflow.create(iterate.bind(["foo", "ba", "r"], "", 0)).run())
Original file line number Diff line number Diff line change
@@ -38,5 +38,4 @@ def decide(result: int) -> str:


if __name__ == "__main__":
workflow.init()
print(workflow.create(decide.bind(get_size.bind())).run())
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ def report(msg: str) -> None:


if __name__ == "__main__":
workflow.init()
r1 = hello.bind("Kristof")
r2 = report.bind(r1)
workflow.create(r2).run()
Loading

0 comments on commit 2766284

Please sign in to comment.