Signal API (Experimental)
=========================

This experimental API allows tasks and actors to generate signals that can be received by other tasks and actors. In addition, failures of tasks and actor methods generate error signals, which let applications detect failures and potentially recover from them.
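
The delivery rules can be summarized with a small in-process model. This is a sketch, not Ray's implementation: ``SignalBus`` and its methods are hypothetical names, the ``receiver`` argument stands in for the calling process, and timeouts and failures are ignored. It assumes, as the ``receive`` example below describes, that each source keeps an ordered log of its signals and that a receiver only gets the signals it has not seen yet. The ``reset`` method assumes that ``signal.reset()`` makes the next ``receive()`` start again from each source's first signal.

```python
from collections import defaultdict


class SignalBus:
    """Toy, single-process model of signal delivery (hypothetical, not Ray's API)."""

    def __init__(self):
        # source id -> ordered list of signals sent by that source
        self._logs = defaultdict(list)
        # (receiver id, source id) -> number of signals already returned
        self._cursors = defaultdict(int)

    def send(self, source, sig):
        self._logs[source].append(sig)

    def receive(self, receiver, sources):
        """Return (source, signal) tuples not yet seen by this receiver."""
        results = []
        for source in sources:
            log = self._logs[source]
            start = self._cursors[(receiver, source)]
            results.extend((source, sig) for sig in log[start:])
            self._cursors[(receiver, source)] = len(log)
        return results

    def reset(self, receiver):
        """Forget this receiver's read positions (assumed reset() semantics)."""
        for key in [k for k in self._cursors if k[0] == receiver]:
            del self._cursors[key]


bus = SignalBus()
bus.send("task-1", "hello")
bus.send("task-1", "world")
print(bus.receive("driver", ["task-1"]))  # [('task-1', 'hello'), ('task-1', 'world')]
print(bus.receive("driver", ["task-1"]))  # []
bus.reset("driver")
print(len(bus.receive("driver", ["task-1"])))  # 2
```

Keeping one read position per (receiver, source) pair is what lets several processes consume the same signals independently.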

.. autofunction:: ray.experimental.signal.send

Here is a simple example of a remote function that sends a user-defined signal.

.. code-block:: python

    import ray
    import ray.experimental.signal as signal

    # Define an application-level signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    # Define a remote function that sends a user-defined signal.
    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))

.. autofunction:: ray.experimental.signal.receive

Here is a simple example of how to receive signals from an actor or task identified by ``a``. Note that an actor is identified by its handle, and a task by one of the object IDs it returns.

.. code-block:: python

    import ray.experimental.signal as signal

    # Return a possibly empty list of all signals that have been sent by 'a'
    # since the last invocation of signal.receive() from within this process.
    # If 'a' has not sent any signals, wait up to 10 seconds to receive a
    # signal from 'a'.
    signal_list = signal.receive([a], timeout=10)

.. autofunction:: ray.experimental.signal.reset
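
The timeout behavior of ``receive`` in the example above can be modeled with a condition variable. In this hypothetical sketch (``BlockingMailbox`` is not part of Ray), ``receive`` returns at once if unread signals exist and otherwise waits up to ``timeout`` seconds, returning an empty list if nothing arrives.

```python
import threading
import time


class BlockingMailbox:
    """Toy model of receive() with a timeout (hypothetical, not Ray's API)."""

    def __init__(self):
        self._signals = []
        self._read = 0  # index of the first signal not yet returned
        self._cond = threading.Condition()

    def send(self, sig):
        with self._cond:
            self._signals.append(sig)
            self._cond.notify_all()  # wake any receiver that is waiting

    def receive(self, timeout):
        with self._cond:
            # Returns as soon as an unread signal exists, or when the timeout expires.
            self._cond.wait_for(lambda: self._read < len(self._signals), timeout)
            new = self._signals[self._read:]
            self._read = len(self._signals)
            return new


box = BlockingMailbox()
start = time.monotonic()
print(box.receive(timeout=0.2))  # []
print(time.monotonic() - start >= 0.2)  # True: receive() waited out the timeout
threading.Timer(0.05, box.send, args=("late signal",)).start()
print(box.receive(timeout=5))  # ['late signal']
```

The second call returns as soon as the delayed ``send`` fires, long before the 5-second limit, because the timeout is only an upper bound on the wait.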


Example: Sending a user signal
------------------------------

The code below shows a simple example in which a task, ``send_signal()``, sends a user signal and the driver receives it by calling ``signal.receive()``.

.. code-block:: python

    import ray
    import ray.experimental.signal as signal

    ray.init()

    # Define a user signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))

    signal_value = 'simple signal'
    object_id = send_signal.remote(signal_value)
    # Wait up to 10 seconds to receive a signal from the task. Note that the
    # task is identified by the object ID it returns.
    result_list = signal.receive([object_id], timeout=10)
    # Print the signal value. This should print "simple signal".
    # result_list[0] is the signal we expect from the task. Each entry is a
    # tuple whose first element is the first object ID returned by the task
    # and whose second element is the signal object.
    print(result_list[0][1].get_value())
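
A single ``receive`` call can return a mix of user signals and error signals, as the next example shows. One way to handle this is to split the ``(source, signal)`` tuples by type. The sketch below uses local stand-in classes so that it runs without Ray; ``split_signals`` is a hypothetical helper, and in real code you would test against ``signal.ErrorSignal`` and your own ``UserSignal`` instead.

```python
class Signal:
    """Stand-in for ray.experimental.signal.Signal (hypothetical)."""


class ErrorSignal(Signal):
    """Stand-in for ray.experimental.signal.ErrorSignal (hypothetical)."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


class UserSignal(Signal):
    def __init__(self, value):
        self.value = value

    def get_value(self):
        return self.value


def split_signals(result_list):
    """Split (source, signal) tuples into user values and errors; skip other types."""
    values, errors = [], []
    for source, sig in result_list:
        if isinstance(sig, ErrorSignal):
            errors.append((source, sig.get_error()))
        elif isinstance(sig, UserSignal):
            values.append((source, sig.get_value()))
    return values, errors


results = [("obj-1", UserSignal("simple signal")),
           ("obj-2", ErrorSignal("exception message"))]
values, errors = split_signals(results)
print(values)  # [('obj-1', 'simple signal')]
print(errors)  # [('obj-2', 'exception message')]
```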

Example: Getting an error signal
--------------------------------

This is a simple example in which the driver receives an error signal caused by the failure of ``task()``.

.. code-block:: python

    import ray
    import ray.experimental.signal as signal

    @ray.remote
    def task():
        raise Exception('exception message')

    object_id = task.remote()
    try:
        # ray.get() raises an exception because the task failed.
        ray.get(object_id)
    except Exception:
        pass
    finally:
        result_list = signal.receive([object_id], timeout=10)
        # The expected signal is an ErrorSignal.
        assert isinstance(result_list[0][1], signal.ErrorSignal)
        # Print the error.
        print(result_list[0][1].get_error())
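
The behavior this example relies on can be modeled as follows: when a task raises, its failure is recorded as an ``ErrorSignal`` keyed by the task, while the caller still sees the exception. This is a toy sketch, not how Ray implements it; ``run_task``, the ``signals`` dictionary, and storing the formatted traceback as the error are all assumptions made for illustration.

```python
import traceback
from collections import defaultdict


class ErrorSignal:
    """Stand-in for ray.experimental.signal.ErrorSignal (hypothetical)."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


# task id -> list of signals generated on behalf of that task (hypothetical store)
signals = defaultdict(list)


def run_task(task_id, fn, *args):
    """Run fn; on failure, record an ErrorSignal for task_id and re-raise."""
    try:
        return fn(*args)
    except Exception:
        signals[task_id].append(ErrorSignal(traceback.format_exc()))
        raise


def task():
    raise Exception("exception message")


try:
    run_task("task-1", task)
except Exception:
    pass  # the caller still sees the exception, as with ray.get()
print(type(signals["task-1"][0]).__name__)  # ErrorSignal
print("exception message" in signals["task-1"][0].get_error())  # True
```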

Example: Sending signals between multiple actors
------------------------------------------------

This is a more involved example in which two actors, ``a1`` and ``a2``, each generate five signals, and a third actor, ``b``, waits to receive all ten signals. Note that ``b`` repeatedly invokes its own ``get_signals()`` method until it has received all the signals it expects.

.. code-block:: python

    import time

    import ray
    import ray.experimental.signal as signal

    # UserSignal is defined as in the previous examples.

    @ray.remote
    class ActorSendSignals(object):
        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    @ray.remote
    class ActorGetAllSignals(object):
        def __init__(self, num_expected_signals, *source_ids):
            self.received_signals = []
            self.num_expected_signals = num_expected_signals
            self.source_ids = source_ids

        def register_handle(self, handle):
            self.this_actor = handle

        def get_signals(self):
            new_signals = signal.receive(self.source_ids, timeout=10)
            self.received_signals.extend(new_signals)
            if len(self.received_signals) < self.num_expected_signals:
                # Schedule another call instead of looping, so that other
                # methods (such as get_count) can run in between.
                self.this_actor.get_signals.remote()

        def get_count(self):
            return len(self.received_signals)

    # Create two actors to send signals.
    a1 = ActorSendSignals.remote()
    a2 = ActorSendSignals.remote()
    signal_value = 'simple signal'
    count = 5
    # Each actor sends five signals.
    a1.send_signals.remote(signal_value, count)
    a2.send_signals.remote(signal_value, count)

    # Create an actor that waits for all five signals sent by each actor.
    b = ActorGetAllSignals.remote(2 * count, a1, a2)
    # Give the actor its own handle, so that it can call itself repeatedly
    # until it has received all signals from a1 and a2. Because each call
    # returns instead of blocking, the actor can execute other methods
    # in between.
    ray.get(b.register_handle.remote(b))
    b.get_signals.remote()
    # get_signals() reschedules itself asynchronously, so poll until all
    # signals have arrived. The total should be 2 * count = 10.
    while ray.get(b.get_count.remote()) < 2 * count:
        time.sleep(0.1)
    print(ray.get(b.get_count.remote()))
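
Why does ``b`` reschedule ``get_signals()`` instead of looping until all signals arrive? An actor processes one method call at a time, so a blocking loop would hold up every other call. The following toy model (``ToyActor`` is hypothetical and runs in a single thread) treats the actor as a FIFO mailbox: each ``get_signals`` call takes what has arrived and, if more signals are expected, enqueues another call. It also shows that a ``get_count`` call made while collection is still in progress can observe a partial count.

```python
from collections import deque


class ToyActor:
    """Single-threaded actor model: one FIFO mailbox, one method call at a time."""

    def __init__(self, incoming, num_expected):
        self.mailbox = deque()
        self.incoming = incoming  # signals that "arrive" over time
        self.received = []
        self.num_expected = num_expected

    def call(self, method, *args):
        # Like handle.method.remote(): enqueue the call and return immediately.
        self.mailbox.append((method, args))

    def get_signals(self):
        # Take whatever has arrived (at most one signal per call in this model).
        if self.incoming:
            self.received.append(self.incoming.popleft())
        # Assumes enough signals eventually arrive; otherwise run() never ends.
        if len(self.received) < self.num_expected:
            self.call("get_signals")  # reschedule instead of looping

    def get_count(self, log):
        log.append(len(self.received))

    def run(self):
        # Process queued calls in FIFO order until the mailbox is empty.
        while self.mailbox:
            method, args = self.mailbox.popleft()
            getattr(self, method)(*args)


log = []
actor = ToyActor(deque(range(10)), num_expected=10)
actor.call("get_signals")
actor.call("get_count", log)  # runs between two get_signals calls
actor.run()
actor.call("get_count", log)
actor.run()
print(log)  # [1, 10]
```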

.. note::

   A failed actor (e.g., an actor that crashed) generates an error message only when another actor or task invokes one of its methods.
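
The note above can be illustrated with a minimal model in which a crash is silent and only surfaces when a method is invoked. ``ToyActorHandle``, ``ActorDied``, and the local ``ErrorSignal`` stand-in are hypothetical names; this sketch does not reflect Ray's internals.

```python
class ActorDied(Exception):
    """Hypothetical exception raised when a dead actor is invoked."""


class ErrorSignal:
    """Stand-in for ray.experimental.signal.ErrorSignal (hypothetical)."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


class ToyActorHandle:
    """Hypothetical model: a crash is only noticed when a method is invoked."""

    def __init__(self):
        self.alive = True
        self.error_signals = []  # error signals generated so far

    def crash(self):
        # The actor dies silently: no signal is generated at this point.
        self.alive = False

    def invoke(self, method_name):
        if not self.alive:
            err = ActorDied(f"actor died before running {method_name}()")
            # Only now does the failure surface as an error signal.
            self.error_signals.append(ErrorSignal(str(err)))
            raise err
        return f"{method_name}() ok"


handle = ToyActorHandle()
handle.crash()
print(len(handle.error_signals))  # 0: the crash alone produces nothing
try:
    handle.invoke("get_count")
except ActorDied:
    pass
print(handle.error_signals[0].get_error())  # actor died before running get_count()
```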

Please let us know about any issues you encounter.