This experimental API allows tasks and actors to generate signals which can be received by other tasks and actors. In addition, task failures and actor method failures generate error signals. The error signals enable applications to detect failures and potentially recover from them.
.. autofunction:: ray.experimental.signal.send
Here is a simple example of a remote function that sends a user-defined signal.

.. code-block:: python

    import ray
    import ray.experimental.signal as signal

    # Define an application-level signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    # Define a remote function that sends a user-defined signal.
    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))

.. autofunction:: ray.experimental.signal.receive
Here is a simple example of how to receive signals from an actor or task identified
by ``a``. Note that an actor is identified by its handle, and a task by one of its
object ID return values.

.. code-block:: python

    import ray.experimental.signal as signal

    # This returns a possibly empty list of all signals that have been sent by 'a'
    # since the last invocation of signal.receive from within this process. If 'a'
    # did not send any signals, then this will wait for up to 10 seconds to receive
    # a signal from 'a'.
    signal_list = signal.receive([a], timeout=10)

.. autofunction:: ray.experimental.signal.reset
The code below shows a simple example in which a task, ``send_signal()``,
sends a user signal and the driver receives it by invoking ``signal.receive()``.

.. code-block:: python

    import ray
    import ray.experimental.signal as signal

    ray.init()

    # Define a user signal.
    class UserSignal(signal.Signal):
        def __init__(self, value):
            self.value = value

        def get_value(self):
            return self.value

    @ray.remote
    def send_signal(value):
        signal.send(UserSignal(value))

    signal_value = 'simple signal'
    object_id = send_signal.remote(signal_value)

    # Wait up to 10 seconds to receive a signal from the task. Note that the
    # task is identified by the object_id it returns.
    result_list = signal.receive([object_id], timeout=10)

    # Print the signal value. This should print "simple signal".
    # Note that result_list[0] is the signal we expect from the task.
    # The signal is a tuple where the first element is the first object ID
    # returned by the task and the second element is the signal object.
    print(result_list[0][1].get_value())

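
When several sources are passed to ``signal.receive``, it can help to group the returned ``(source, signal)`` tuples by source. The following is a minimal sketch of that idea, not part of the Ray API: ``group_by_source`` is a hypothetical helper, the sources are assumed to be hashable, and plain strings stand in for the object IDs, actor handles, and signal objects so that the snippet runs without a cluster.

```python
from collections import defaultdict


def group_by_source(result_list):
    """Group (source, signal) pairs by source, preserving arrival order.

    Hypothetical helper: `result_list` is assumed to have the same shape as
    the list used in the example above, i.e. a list of 2-tuples.
    """
    grouped = defaultdict(list)
    for source, sig in result_list:
        grouped[source].append(sig)
    return dict(grouped)


# Stand-in data: strings replace the object IDs / actor handles and the
# signal objects that a real signal.receive() call would return.
example_results = [
    ("task_1", "started"),
    ("task_2", "started"),
    ("task_1", "done"),
]

grouped = group_by_source(example_results)
print(grouped["task_1"])  # ['started', 'done']
```

With real results, the dictionary keys would be whatever identifiers were passed to ``signal.receive``.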
This is a simple example in which a driver gets an error signal caused
by the failure of ``task()``.

.. code-block:: python

    # Assumes ray and ray.experimental.signal (as signal) are already imported.
    @ray.remote
    def task():
        raise Exception('exception message')

    object_id = task.remote()
    try:
        ray.get(object_id)
    except Exception:
        # The failure is expected; it is inspected through the signal below.
        pass
    finally:
        result_list = signal.receive([object_id], timeout=10)
        # Expected signal is 'ErrorSignal'.
        assert isinstance(result_list[0][1], signal.ErrorSignal)
        # Print the error.
        print(result_list[0][1].get_error())

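
When a batch of received signals may mix user signals and error signals, one way to handle it is to split the batch by type before processing. The sketch below illustrates this under stated assumptions: the ``Signal``, ``ErrorSignal``, and ``UserSignal`` classes are local stand-ins that mimic only the methods used in the examples above (so the snippet runs without Ray), and ``split_errors`` is a hypothetical helper rather than a library function.

```python
class Signal:
    """Stand-in for signal.Signal (assumption: for illustration only)."""


class ErrorSignal(Signal):
    """Stand-in for signal.ErrorSignal; only get_error() is modeled."""

    def __init__(self, error):
        self.error = error

    def get_error(self):
        return self.error


class UserSignal(Signal):
    """Same shape as the UserSignal class defined in the examples."""

    def __init__(self, value):
        self.value = value

    def get_value(self):
        return self.value


def split_errors(result_list):
    """Split (source, signal) pairs into error pairs and all other pairs."""
    errors, others = [], []
    for source, sig in result_list:
        if isinstance(sig, ErrorSignal):
            errors.append((source, sig.get_error()))
        else:
            others.append((source, sig))
    return errors, others


# Stand-in results: strings replace real object IDs.
results = [
    ("task_a", UserSignal("ok")),
    ("task_b", ErrorSignal("exception message")),
]
errors, others = split_errors(results)
print(errors)  # [('task_b', 'exception message')]
```

In an application, the ``errors`` list is where recovery logic (for example, resubmitting a task) would hook in.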
This is a more involved example in which two actors, ``a1`` and ``a2``, each
generate five signals, and another actor, ``b``, waits to receive all signals
generated by ``a1`` and ``a2``. Note that ``b`` recursively calls
its own method ``get_signals()`` until it gets all the signals it expects.

.. code-block:: python

    @ray.remote
    class ActorSendSignals(object):
        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    @ray.remote
    class ActorGetAllSignals(object):
        def __init__(self, num_expected_signals, *source_ids):
            self.received_signals = []
            self.num_expected_signals = num_expected_signals
            self.source_ids = source_ids

        def register_handle(self, handle):
            self.this_actor = handle

        def get_signals(self):
            new_signals = signal.receive(self.source_ids, timeout=10)
            self.received_signals.extend(new_signals)
            if len(self.received_signals) < self.num_expected_signals:
                self.this_actor.get_signals.remote()

        def get_count(self):
            return len(self.received_signals)

    # Create two actors to send signals.
    a1 = ActorSendSignals.remote()
    a2 = ActorSendSignals.remote()

    signal_value = 'simple signal'
    count = 5
    # Each actor sends five signals.
    a1.send_signals.remote(signal_value, count)
    a2.send_signals.remote(signal_value, count)

    # Create an actor that waits for all five signals sent by each actor.
    b = ActorGetAllSignals.remote(2 * count, *[a1, a2])

    # Provide the actor with its own handle, so it can recursively call itself
    # to get all signals from a1 and a2. This enables the actor to execute
    # other methods between calls if needed.
    ray.get(b.register_handle.remote(b))
    b.get_signals.remote()

    # Print the number of signals received so far. Once b has collected all
    # signals, this is 2 * count = 10.
    print(ray.get(b.get_count.remote()))

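
For comparison, the same "collect until everything has arrived" logic can be written as a plain loop with an overall deadline. This is only a sketch under assumptions: ``collect_signals`` is a hypothetical helper, ``receive_fn`` is assumed to be any callable with the call shape ``receive_fn(sources, timeout=...)`` used by ``signal.receive`` in the examples, and ``fake_receive`` is a stand-in that lets the snippet run without Ray.

```python
import time


def collect_signals(receive_fn, sources, num_expected,
                    poll_timeout=1.0, deadline=10.0):
    """Poll receive_fn until num_expected signals arrive or deadline passes.

    Returns whatever was collected, which may be fewer than num_expected.
    """
    received = []
    end = time.monotonic() + deadline
    while len(received) < num_expected and time.monotonic() < end:
        received.extend(receive_fn(sources, timeout=poll_timeout))
    return received


# Stand-in for signal.receive: hands out pre-queued batches, then nothing.
batches = [[("a1", "s0")], [], [("a2", "s0"), ("a1", "s1")]]


def fake_receive(sources, timeout):
    return batches.pop(0) if batches else []


collected = collect_signals(fake_receive, ["a1", "a2"], num_expected=3)
print(len(collected))  # 3
```

Unlike the recursive approach above, a loop like this occupies its caller until it returns, so an actor running it could not serve other method calls in the meantime.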
A failed actor (e.g., an actor that crashed) generates an error message only when another actor or task invokes one of its methods.
Please let us know about any issues you encounter.