diff --git a/docs/source/examples.rst b/docs/source/examples.rst index d451a368..7ffcb2e9 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -10,4 +10,5 @@ Examples examples/validating_agent_output examples/advanced_output_handling examples/streaming_agent_output + examples/event_driven autogen diff --git a/examples/Event-driven orchestration for AI systems.ipynb b/examples/Event-driven orchestration for AI systems.ipynb new file mode 100644 index 00000000..94d407b0 --- /dev/null +++ b/examples/Event-driven orchestration for AI systems.ipynb @@ -0,0 +1,462 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Event-driven orchestration for AI systems. Is LlamaIndex Workflows basically Kafka Streams?\n", + "\n", + "Data processing tools like Kafka, Spark, Flink etc. have long been used to build workflows. Today, as people create complex AI systems, they often find themselves (surprise!) needing orchestration frameworks. Is the field really so specific it needs new tools for solving the same problems? We argue that there is no difference: after all, LLMs are just APIs, and the fundamental patterns of workflow architecture remain the same.\n", + "\n", + "A widespread data processing paradigm is event-driven architecture. The purpose of this example is to demonstrate how you can use tried-and-tested tools like Kafka Streams (ported to Python via Faust) to build workflows, side-stepping the need to re-invent the wheel for something as fundamental as event processing. This also comes with reliability and scalability of Kafka, easing production deployment.\n", + "\n", + "We'll walk through the basic concepts of the newly released LlamaIndex Workflows library, and compare them to what you get with Faust.\n", + "\n", + "To get a basic understanding of event-driven workflows, we suggest you check out this guide to LlamaIndex Workflows: https://docs.llamaindex.ai/en/stable/module_guides/workflow/\n", + "\n", + "Here we are using the well-maintained fork of Faust: https://github.com/faust-streaming/faust" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install faust-streaming llama-index -U" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first example is a simple workflow with two steps:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Analysis:\n", + "This joke plays on the pun of \"fish and ships\" sounding like \"fish and chips,\" a popular dish at seafood restaurants. The joke also incorporates the pirate theme by referencing ships, which is a common element in pirate lore.\n", + "\n", + "Critique:\n", + "Overall, this joke is light-hearted and playful, making it suitable for a general audience. The use of wordplay adds a clever twist to the punchline, making it more engaging for the listener. However, the joke may be considered somewhat predictable as the setup leads directly to the pun, leaving little room for surprise or unexpected humor. Additionally, the joke relies heavily on the pun itself, which may not appeal to all audiences. Overall, while the joke is amusing and well-crafted, it may not be considered particularly original or innovative.\n" + ] + } + ], + "source": [ + "from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step\n", + "\n", + "# `pip install llama-index-llms-openai` if you don't already have it\n", + "from llama_index.llms.openai import OpenAI\n", + "\n", + "\n", + "class JokeEvent(Event):\n", + " joke: str\n", + "\n", + "\n", + "class JokeFlow(Workflow):\n", + " llm = OpenAI()\n", + "\n", + " @step\n", + " async def generate_joke(self, ev: StartEvent) -> JokeEvent:\n", + " topic = ev.topic\n", + "\n", + " prompt = f\"Write your best joke about {topic}.\"\n", + " response = await self.llm.acomplete(prompt)\n", + " return JokeEvent(joke=str(response))\n", + "\n", + " @step\n", + " async def critique_joke(self, ev: JokeEvent) -> StopEvent:\n", + " joke = ev.joke\n", + "\n", + " prompt = f\"Give a thorough analysis and critique of the following joke: {joke}\"\n", + " response = await self.llm.acomplete(prompt)\n", + " return StopEvent(result=str(response))\n", + "\n", + "\n", + "w = JokeFlow(timeout=60, verbose=False)\n", + "result = await w.run(topic=\"pirates\")\n", + "print(str(result))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's try do do the same thing with Faust.\n", + "\n", + "Instead of different event classes, we'll use [channels](https://faust-streaming.github.io/faust/userguide/channels.html) to pass messages between our workflow steps. In this example, we just need to pass single strings, so we'll use a single container type for that. If you want, you of course can use more complex types, including custom classes, dataclasses, etc.\n", + "\n", + "Faust is stream-based, which means it will indefinitely process messages that are sent to the streams in an async loop until the app shuts down. Of course, concurrency also comes out of the box.\n", + "\n", + "The catch with Faust is that it requires a Kafka broker (it's the only dependency). Still, it's quite easy to set up (see https://github.com/wurstmeister/kafka-docker), and your organization is probably already using Kafka for something else.\n", + "\n", + "If you're just playing around, you can execute the following cell to start a Kafka broker in a Docker container." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "55d2b52b9a9ccc72fdfec3122223789b598c10d6009f8660063914434915d338\n" + ] + } + ], + "source": [ + "!docker run -d --name kafka-zookeeper \\\n", + " -p 2181:2181 -p 9092:9092 \\\n", + " -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 \\\n", + " -e ZOOKEEPER_HOST=127.0.0.1 \\\n", + " johnnypark/kafka-zookeeper\n", + "\n", + "import time\n", + "time.sleep(30) # Wait for the broker to start" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "import faust\n", + "from llama_index.llms.openai import OpenAI\n", + "\n", + "\n", + "app = faust.App('joke-flow', broker='kafka://localhost:9092', web_enabled=False)\n", + "\n", + "class StringRecord(faust.Record):\n", + " content: str\n", + "\n", + "# We create channels for passing messages between our workflow steps, just like we did with the events before.\n", + "# Start and end channels will be used as entry and exit points, respectively.\n", + "start_channel = app.channel(value_type=StringRecord)\n", + "joke_channel = app.channel(value_type=StringRecord)\n", + "end_channel = app.channel(value_type=StringRecord)\n", + "\n", + "llm = OpenAI()\n", + "results = asyncio.Queue() # For collecting results (in practice, you'd use a database or something)\n", + "\n", + "@app.agent(start_channel, sink=[joke_channel]) # Specify the input and output channels\n", + "async def generate_joke(requests):\n", + " async for request in requests: # Use an async loop to process incoming messages\n", + " prompt = f\"Write your best joke about {request.content}.\"\n", + " response = await llm.acomplete(prompt)\n", + " yield StringRecord(content=str(response))\n", + "\n", + "@app.agent(joke_channel, sink=[end_channel]) # We can even set the sink directly to another agent, like [handle_end]\n", + "async def critique_joke(jokes):\n", + " async for joke in jokes:\n", + " prompt = f\"Give a thorough analysis and critique of the following joke: {joke.content}\"\n", + " response = await llm.acomplete(prompt)\n", + " yield StringRecord(content=str(response))\n", + "\n", + "@app.agent(end_channel)\n", + "async def handle_end(events):\n", + " async for event in events:\n", + " await results.put(event.content) # Put the result in the queue" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To have more control over what channels to use, you can send the messages explicitly without specifying the `sink` parameter:\n", + "\n", + "```python\n", + "@app.agent(start_channel)\n", + "async def generate_joke(requests):\n", + " async for request in requests:\n", + " ...\n", + " await joke_channel.send(value=StringRecord(content=str(response)))\n", + "```\n", + "\n", + "By the way, you can also use Kafka topics in place of channels:\n", + "\n", + "```python\n", + "topic = app.topic('my-topic', value_type=...)\n", + "@app.agent(topic)\n", + "async def process_stream(stream):\n", + " async for record in stream:\n", + " ...\n", + "```\n", + "\n", + "Let's run the example:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Analysis:\n", + "This joke plays on the pun of \"fish and ships\" sounding like \"fish and chips,\" a popular dish at seafood restaurants. The joke also incorporates the pirate theme by referencing ships, which is a common element in pirate lore. The humor comes from the unexpected twist on a familiar phrase and the clever wordplay.\n", + "\n", + "Critique:\n", + "Overall, this joke is light-hearted and fun, making it suitable for a wide audience. The use of wordplay is clever and adds an element of surprise to the punchline. However, the joke may be considered somewhat predictable as the setup hints at the punchline with the mention of ships. Additionally, the humor may not be particularly sophisticated or nuanced, which could limit its appeal to some audiences. Overall, while this joke is entertaining and likely to elicit a chuckle, it may not be memorable or particularly original in the long run.\n" + ] + } + ], + "source": [ + "await app.start()\n", + "await start_channel.send(value=StringRecord(content=\"pirates\"))\n", + "result = await results.get() # Get the first and only result from the queue\n", + "await app.stop()\n", + "\n", + "print(result)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using context\n", + "\n", + "A useful feature of LlamaIndex Workflows is the ability to pass context between steps. Let's see how you can achieve the same effect with Faust.\n", + "\n", + "The simplest way is to pass the context around in a record:\n", + "\n", + "```python\n", + "class Context(faust.Record):\n", + " user_id: str\n", + " query: str\n", + " ...\n", + "\n", + "class RecordWithContext(faust.Record):\n", + " ...\n", + " context: Context\n", + "\n", + "@app.agent\n", + "async def process_stream(stream):\n", + " async for record in stream:\n", + " ...\n", + " # Pass the context to the next agent\n", + " await next_stream.send(value=RecordWithContext(..., context=record.context))\n", + "```\n", + "\n", + "What about persisting state? We can use Faust's [tables](https://faust-streaming.github.io/faust/userguide/tables.html) for that. Tables are distributed key-value stores that can be used as regular Python dictionaries.\n", + "\n", + "```python\n", + "class RecordWithContext(faust.Record):\n", + " ...\n", + " context_id: str\n", + "\n", + "context_table = app.Table('context', default=dict)\n", + "\n", + "@app.agent\n", + "async def process_stream(stream):\n", + " async for record in stream:\n", + " context = context_table.get(record.context_id)\n", + " ...\n", + " context[\"new_key\"] = \"new_value\"\n", + " context_table[record.context_id] = context\n", + " await next_stream.send(value=record)\n", + "```\n", + "\n", + "Refer to Faust [user guide](https://faust-streaming.github.io/faust/userguide/index.html) to learn about these and other more advanced Faust capabilities, like timers, web views, statistics, etc.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "### Waiting for multiple events\n", + "\n", + "LlamaIndex Context provides a handy `collect_events` method to wait for multiple events. We can replicate the same behavior with Faust by using a table to store the incoming records and then checking if we have received all the required ones. This gives you even more flexibility on how you want to join the events and trigger the next step.\n", + "\n", + "Here's a simple example:\n", + "\n", + "```python\n", + "# The channels for the records we're waiting for\n", + "apples_channel = app.channel(value_type=AppleRecord)\n", + "oranges_channel = app.channel(value_type=OrangeRecord)\n", + "\n", + "# Let's create tables to buffer the records we've received\n", + "apples_buffer = app.Table('apples_buffer', default=lambda: None)\n", + "oranges_buffer = app.Table('oranges_buffer', default=lambda: None)\n", + "\n", + "\n", + "async def try_process_fruits(context_id):\n", + " apples = apples_buffer.get(context_id)\n", + " oranges = oranges_buffer.get(context_id)\n", + " if apples is not None and oranges is not None:\n", + " # Do something with apples and oranges\n", + " ...\n", + "\n", + " del apples_buffer[context_id]\n", + " del oranges_buffer[context_id]\n", + "\n", + "\n", + "@app.agent(apples_channel)\n", + "async def process_first_event(stream):\n", + " async for record in stream:\n", + " apples_buffer[record.id] = record\n", + " await try_process_fruits(record.id)\n", + "\n", + "@app.agent(oranges_channel)\n", + "async def process_second_event(stream):\n", + " async for record in stream:\n", + " oranges_buffer[record.id] = record\n", + " await try_process_fruits(record.id)\n", + "\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Constructing simple workflows in a concise manner using FaustWorkflow\n", + "\n", + "We've created a wrapper around Faust that makes it easy to construct simple workflows in a LlamaIndex-Workflows-like manner, powered by event classes and type annotations. It also allows you to visualize the workflow.\n", + "\n", + "This is our side project, and serves mainly as a proof of concept. If you like the idea and have suggestions for improvement, please let us know!\n", + "\n", + "Here's how you can implement the joke workflow from the previous section using FaustWorkflow:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/whimo/motleycrew/motleycrew/tools/__init__.py:3: LangChainDeprecationWarning: As of langchain-core 0.3.0, LangChain uses pydantic v2 internally. The langchain_core.pydantic_v1 module was a compatibility shim for pydantic v1, and should no longer be used. Please update the code to import from Pydantic directly.\n", + "\n", + "For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`\n", + "with: `from pydantic import BaseModel`\n", + "or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. \tfrom pydantic.v1 import BaseModel\n", + "\n", + " from motleycrew.tools.tool import MotleyTool\n" + ] + } + ], + "source": [ + "from motleycrew.applications.faust_workflow import FaustWorkflow, Event, step\n", + "\n", + "\n", + "class StartEvent(Event):\n", + " topic: str\n", + "\n", + "class JokeEvent(Event):\n", + " joke: str\n", + "\n", + "class StopEvent(Event):\n", + " result: str\n", + "\n", + "class JokeFlow(FaustWorkflow):\n", + " result_event_type = StopEvent\n", + " llm = OpenAI()\n", + "\n", + " @step\n", + " async def generate_joke(self, ev: StartEvent) -> JokeEvent:\n", + " prompt = f\"Write your best joke about {ev.topic}.\"\n", + " response = await self.llm.acomplete(prompt)\n", + " return JokeEvent(joke=str(response))\n", + "\n", + " @step\n", + " async def critique_joke(self, ev: JokeEvent) -> StopEvent:\n", + " prompt = f\"Give a thorough analysis and critique of the following joke: {ev.joke}\"\n", + " response = await self.llm.acomplete(prompt)\n", + " return StopEvent(result=str(response))" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Analysis:\n", + "This joke plays on the pun of \"fish and ships\" sounding like \"fish and chips,\" a popular dish at seafood restaurants. The joke also incorporates the pirate theme by mentioning a pirate going to a seafood restaurant, which adds an element of humor.\n", + "\n", + "Critique:\n", + "Overall, this joke is light-hearted and easy to understand, making it suitable for a wide audience. The use of wordplay is clever and adds a fun twist to the punchline. However, the joke may be considered somewhat predictable as the setup leads directly to the pun, leaving little room for surprise or unexpected humor. Additionally, the joke relies heavily on the pun itself, which may not appeal to everyone's sense of humor. Overall, while this joke is amusing and well-crafted, it may not be particularly memorable or original.\n" + ] + } + ], + "source": [ + "app = faust.App(\"faust-joke-workflow-demo\", broker=\"kafka://localhost:9092\", web_enabled=False)\n", + "joke_workflow = JokeFlow(app=app, timeout=120)\n", + "\n", + "result = await joke_workflow.run(StartEvent(topic=\"pirates\"))\n", + "print(result.result)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "faust_workflow.html\n" + ] + } + ], + "source": [ + "from motleycrew.applications.faust_workflow import draw_faust_workflow\n", + "draw_faust_workflow(joke_workflow)" + ] + }, + { + "attachments": { + "image-2.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![image-2.png](attachment:image-2.png)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/motleycrew/applications/faust_workflow/__init__.py b/motleycrew/applications/faust_workflow/__init__.py new file mode 100644 index 00000000..d9cd8cd9 --- /dev/null +++ b/motleycrew/applications/faust_workflow/__init__.py @@ -0,0 +1,4 @@ +from .faust_workflow import Event, FaustWorkflow, step +from .visualize import draw_faust_workflow + +__all__ = ["Event", "FaustWorkflow", "step", "draw_faust_workflow"] diff --git a/motleycrew/applications/faust_workflow/faust_workflow.py b/motleycrew/applications/faust_workflow/faust_workflow.py new file mode 100644 index 00000000..25167e77 --- /dev/null +++ b/motleycrew/applications/faust_workflow/faust_workflow.py @@ -0,0 +1,168 @@ +import asyncio +import contextvars +from typing import Callable, Dict, List, Tuple, Type, get_type_hints + +from faust import App, Channel, Record + + +class ExecutionContext(Record): + execution_id: int + step_depth: int + step_name: str + + +class Event(Record, abstract=True): + execution_context: ExecutionContext = None + + +def step(func): + func._is_step = True + type_hints = get_type_hints(func) + func._event_types = type_hints.get("ev", type_hints.get("event", None)) + func._output_types = type_hints.get("return", None) + return func + + +class FaustWorkflow: + """ + Event-driven workflow implementation using Faust. + """ + + result_event_type: Type[Event] = None + + def __init__(self, app: App, verbose: bool = False, timeout: int = 60): + self.app = app + self.verbose = verbose + self.timeout = timeout + self.steps: Dict[Type[Event], Callable] = {} + self.channels: Dict[Type[Event], Channel] = {} + self._initialize_steps() + + self.execution_context = contextvars.ContextVar( + "execution_context", + default=ExecutionContext(execution_id=0, step_name=None, step_depth=0), + ) + self.results_by_execution_id: Dict[int, asyncio.Future] = {} + + self.execution_history_lock = asyncio.Lock() + self.execution_history_by_execution_id: Dict[int, List[Tuple[str, str, str]]] = {} + + self._create_final_agent() + + def _initialize_steps(self): + for attr_name in dir(self): + attr = getattr(self, attr_name) + if callable(attr) and hasattr(attr, "_is_step"): + if attr._event_types and attr._output_types: + input_types = ( + attr._event_types.__args__ + if hasattr(attr._event_types, "__args__") + else [attr._event_types] + ) + output_types = ( + attr._output_types.__args__ + if hasattr(attr._output_types, "__args__") + else [attr._output_types] + ) + for input_type in input_types: + self.steps[input_type] = attr + self._create_agent(input_type, output_types, attr) + else: + print(f"Warning: Step {attr_name} has incomplete type hints") + + def _create_agent( + self, input_type: Type[Event], output_types: list[Type[Event]], step_func: Callable + ): + if input_type not in self.channels: + self.channels[input_type] = self.app.channel() + for output_type in output_types: + if output_type not in self.channels: + self.channels[output_type] = self.app.channel() + + @self.app.agent( + self.channels[input_type], name=f"{step_func.__name__}_{input_type.__name__}" + ) + async def agent(stream): + async for event in stream: + if self.verbose: + print( + f"Executing step: {step_func.__name__} with input type {input_type.__name__}" + ) + + self.execution_context.set( + ExecutionContext( + execution_id=event.execution_context.execution_id, + step_depth=event.execution_context.step_depth + 1, + step_name=step_func.__name__, + ) + ) + result = await step_func(event) + if result is None: + continue + + assert any( + isinstance(result, t) for t in output_types + ), f"Step {step_func.__name__} did not return an event of type {output_types}" + await self.send_event(result) + + def _create_final_agent(self): + if self.result_event_type is None: + return + + if self.result_event_type not in self.channels: + self.channels[self.result_event_type] = self.app.channel() + + @self.app.agent(self.channels[self.result_event_type]) + async def final_agent(stream): + async for event in stream: + if isinstance(event, self.result_event_type): + execution_id = event.execution_context.execution_id + if execution_id in self.results_by_execution_id: + self.results_by_execution_id[execution_id].set_result(event) + + async def send_event(self, ev: Event): + """ + Sends an event to the relevant channel. + This method can be used by agents to emit multiple events. + """ + if type(ev) not in self.channels: + raise ValueError(f"No channel found for event type: {type(ev).__name__}") + + channel = self.channels[type(ev)] + + current_context = ev.execution_context + if current_context is None: + current_context = self.execution_context.get() + + ev.execution_context = current_context + async with self.execution_history_lock: + self.execution_history_by_execution_id[current_context.execution_id].append( + (current_context.step_name, current_context.step_depth, type(ev).__name__) + ) + + await channel.send(value=ev) + + if self.verbose: + print(f"Sent event of type {type(ev).__name__} to channel") + + async def run(self, ev: Event) -> Event: + execution_id = self.execution_context.get().execution_id + 1 + async with self.execution_history_lock: + self.execution_history_by_execution_id[execution_id] = [] + + self.results_by_execution_id[execution_id] = asyncio.Future() + + # Start the Faust app if it's not already running + if not self.app.started: + await self.app.start() + + self.execution_context.set( + ExecutionContext(execution_id=execution_id, step_depth=0, step_name=None) + ) + await self.send_event(ev) + + # Wait for the result with a timeout + result = await asyncio.wait_for( + self.results_by_execution_id[execution_id], timeout=self.timeout + ) + return result diff --git a/motleycrew/applications/faust_workflow/visualize.py b/motleycrew/applications/faust_workflow/visualize.py new file mode 100644 index 00000000..7f1233a5 --- /dev/null +++ b/motleycrew/applications/faust_workflow/visualize.py @@ -0,0 +1,71 @@ +from typing import get_args, get_origin + +from pyvis.network import Network + +from .faust_workflow import Event, FaustWorkflow + + +def draw_faust_workflow( + workflow: FaustWorkflow, + filename: str = "faust_workflow.html", + notebook: bool = False, +) -> None: + """Draws the Faust workflow as a graph.""" + net = Network(directed=True, height="750px", width="100%") + + # Add the nodes + edge for stop events + if workflow.result_event_type is not None: + net.add_node( + workflow.result_event_type.__name__, + label=workflow.result_event_type.__name__, + color="#FFA07A", + shape="ellipse", + ) + net.add_node("_done", label="_done", color="#ADD8E6", shape="box") + net.add_edge(workflow.result_event_type.__name__, "_done") + + # Add nodes from all steps + steps = { + name: func + for name, func in workflow.__class__.__dict__.items() + if hasattr(func, "_is_step") + } + + for step_name, step_func in steps.items(): + net.add_node( + step_name, label=step_name, color="#ADD8E6", shape="box" + ) # Light blue for steps + + # Get input and output types + input_type = step_func.__annotations__.get("ev") + output_types = [ + t + for t in step_func.__annotations__.values() + if isinstance(t, type) and issubclass(t, Event) + ] + + if input_type: + input_types = [input_type] if not get_origin(input_type) else get_args(input_type) + for t in input_types: + net.add_node( + t.__name__, + label=t.__name__, + color="#90EE90", + shape="ellipse", + ) + net.add_edge(t.__name__, step_name) + + output_type = step_func.__annotations__.get("return") + if output_type: + output_types = [output_type] if not get_origin(output_type) else get_args(output_type) + for t in output_types: + if t != type(None): + net.add_node( + t.__name__, + label=t.__name__, + color="#90EE90" if t != workflow.result_event_type else "#FFA07A", + shape="ellipse", + ) + net.add_edge(step_name, t.__name__) + + net.show(filename, notebook=notebook)