Rayvens augments Ray with events. With Rayvens, Ray applications can produce events, subscribe to event streams, and process events. Rayvens leverages Apache Camel to make it possible for data scientists to access hundreds of data services with little effort.
For example, one can periodically fetch data from a REST API with code:
source_config = dict(
    kind='http-source',
    url='http://financialmodelingprep.com/api/v3/quote-short/AAPL?apikey=demo',
    period=3000)
source = client.create_topic('http', source=source_config)
Publish messages to Slack with code:
sink_config = dict(kind='slack-sink',
                   channel='#rayvens',
                   webhookUrl=os.getenv('RAYVENS_WEBHOOK'))
sink = client.create_topic('slack', sink=sink_config)
Connect the two together with code:
source >> sink
Or do some event processing with code:
source >> operator >> sink
These instructions have been tested on macOS Big Sur and Ubuntu 18.04.4.
We recommend installing Python 3.8.7 using pyenv.
Install Ray and Ray Serve with Kubernetes support:
pip install --upgrade pip
# for osx
pip install https://s3-us-west-2.amazonaws.com/ray-wheels/master/6d5511cf8079f04d4f70ac724de8b62437adf0e7/ray-2.0.0.dev0-cp38-cp38-macosx_10_13_x86_64.whl
# for linux
pip install https://s3-us-west-2.amazonaws.com/ray-wheels/master/6d5511cf8079f04d4f70ac724de8b62437adf0e7/ray-2.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
# for both
pip install "ray[serve]"
pip install kubernetes
Clone this repository and install Rayvens:
git clone https://github.ibm.com/solsa/rayvens.git
pip install -e rayvens
Try Rayvens:
python rayvens/examples/hello.py
Rayvens configures and uses Ray Serve to accept incoming external events.
The hello.py file demonstrates an elementary Rayvens program.
import ray
import rayvens
ray.init()
client = rayvens.Client()
topic = client.create_topic('example')
topic >> print
topic << 'hello' << 'world'
This program initializes Ray and Rayvens and creates a Topic. Topics and events
are the core facilities offered by Rayvens. Topics bridge event publishers and
subscribers. Topics are currently implemented as Ray actors.
In this example, a subscriber is added to topic with the statement
topic >> print. This subscriber simply invokes the Python print function on
every event it receives. In general, subscribers can be Python callables, Ray
tasks, or Ray actors.
A couple of events are then published to topic using the syntax
topic << value. As illustrated here, events are just arbitrary values in
general, but of course publishers and subscribers can agree on specific event
schemas. The << operator has left-to-right associativity, making it possible to
send multiple events with one statement.
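To make the operator syntax concrete, here is a minimal sketch of how >> and << could be implemented on a plain Python class. ToyTopic is a hypothetical stand-in written for illustration only. It is not the Rayvens Topic, which is backed by a Ray actor, and the return values chosen here are assumptions.

```python
# Toy stand-in for a topic; illustrative only, not the Rayvens implementation.
class ToyTopic:
    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def __rshift__(self, subscriber):
        # topic >> subscriber: register the subscriber.
        # Returning the subscriber is an assumption of this sketch.
        self.subscribers.append(subscriber)
        return subscriber

    def __lshift__(self, event):
        # topic << event: deliver the event to every subscriber, then return
        # the topic so that another << can follow in the same statement.
        for subscriber in self.subscribers:
            subscriber(event)
        return self


received = []
topic = ToyTopic('example')
topic >> received.append          # subscribe a plain Python callable
topic << 'hello' << 'world'       # evaluated as (topic << 'hello') << 'world'
print(received)                   # ['hello', 'world']
```

Because Python evaluates << from left to right, the first publish completes before the second one starts, which is why the toy subscriber sees the events in order.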
Run this program with:
python rayvens/examples/hello.py
Observe the two events are delivered in order.
Other examples are provided in the examples folder. See in particular the pubsub.py, task.py, and actor.py examples for further discussions of in-order and out-of-order event delivery.
To run Rayvens programs including Camel components, there are three choices:
- local mode: run Ray on the host with a local installation of the Camel-K client, Java, and Maven,
- operator mode: run Ray and Camel-K inside a Kubernetes cluster relying on the Camel-K operator to run Camel components in dedicated pods,
- anywhere mode: run Ray and Camel-K inside a Kubernetes cluster without relying on the Camel-K operator by running the Camel components inside the Ray pods jointly with the Ray processes.
Modes 2 and 3 rely on a custom Ray container image that adds to the base Ray image the Rayvens package and the kamel CLI and its dependencies. This image is built automatically as part of the setup described below. This setup also includes the deployment of the Camel-K operator used in mode 2 and the necessary RBAC rules for the operator.
In principle, mode 3 permits running Rayvens anywhere Ray can by simply replacing the base Ray image with the Rayvens image. At this time however, we only include deployment instructions for Kubernetes.
To run Camel event sources and sinks locally, a Camel-K client installation is required. Download the Camel-K client from the release page and put it in your path. Install a Java 11 JDK. Install Apache Maven 3.6.3.
Test your installation with:
kamel local run rayvens/scripts/camel-test-source.yaml
To test Rayvens on a development Kubernetes cluster we recommend using Kind.
We assume Docker Desktop is installed. We assume Kubernetes support in Docker
Desktop is turned off. We assume kubectl is installed.
Follow instructions to install the Kind client.
Set up Ray on Kind:
./rayvens/scripts/start-kind.sh
This script launches a persistent docker registry on the host at port 5000, builds the custom Rayvens image, creates a Kind cluster, and installs Ray and the Camel-K operator on this cluster.
Try your Ray cluster on Kind with:
ray submit rayvens/scripts/cluster.yaml rayvens/examples/pubsub.py
Our example cluster.yaml configuration file is derived from Ray's example-full.yaml configuration file with some notable enhancements:
- additional configuration parameters for the head and worker pods (RBAC rules to manage Camel integrations, downward api, custom resource tags),
- an additional service port (8000) to receive external events from Camel,
- file mounts and the installation of Rayvens on every node,
- a 2-cpu-per-pod resource requirement to make room for Ray Serve.
We plan to support the Ray operator in the near future.
To take down the Kind cluster (including Ray and Camel-K) run:
kind delete cluster
To take down the docker registry run:
docker stop registry
docker rm registry
The source.py example demonstrates how to process external events with Rayvens.
First, we create a topic connected to an external event source:
source_config = dict(
    kind='http-source',
    url='http://financialmodelingprep.com/api/v3/quote-short/AAPL?apikey=demo',
    period=3000)
source = client.create_topic('http', source=source_config)
An event source configuration is a dictionary. The kind key specifies the
source type. Other keys vary. An http-source periodically makes a REST call to
the specified url. The period is expressed in milliseconds. The events
generated by this source are the bodies of the responses encoded as strings.
In this example, we use the http-source to fetch the current price of the AAPL
stock.
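As a rough illustration of what a subscriber receives, the sketch below parses one event from such a source. The sample response body is made up for this example: only the list shape and the price field are implied by the code in this document, and the symbol field is an assumption.

```python
import json

# Same configuration shape as in the text; 'period' is in milliseconds.
source_config = dict(
    kind='http-source',
    url='http://financialmodelingprep.com/api/v3/quote-short/AAPL?apikey=demo',
    period=3000)

period_seconds = source_config['period'] / 1000  # 3000 ms is 3.0 s

# Hypothetical response body, delivered to subscribers as a string.
sample_event = '[{"symbol": "AAPL", "price": 123.45}]'

payload = json.loads(sample_event)  # decode the string into Python objects
price = payload[0]['price']         # first (and only) quote in the list
print(period_seconds, price)
```

Since events arrive as strings, every subscriber is responsible for decoding them before use.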
We then implement a Ray actor to process these events:
@ray.remote
class Comparator:
    def __init__(self):
        self.last_quote = None

    def ingest(self, event):
        payload = json.loads(event)  # parse event string to json
        quote = payload[0]['price']  # payload[0] is AAPL
        try:
            if self.last_quote:
                if quote > self.last_quote:
                    print('AAPL is up')
                elif quote < self.last_quote:
                    print('AAPL is down')
                else:
                    print('AAPL is unchanged')
        finally:
            self.last_quote = quote


comparator = Comparator.remote()
This actor instance compares the current price with the last price and prints a message accordingly.
We then simply subscribe the comparator actor instance to the source topic:
source >> comparator
By using a Ray actor to process events, we can implement stateful processing and guarantee that events will be processed in order.
The Comparator class follows the convention that it accepts events by means of
a method named ingest. If for instance this method were to be named accept
instead, then we would have to subscribe the actor to the source using the
syntax source >> comparator.accept.
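The naming convention can be pictured with a small dispatch helper. This is only a sketch on plain Python objects: resolve_handler, Collector, and Acceptor are hypothetical names, and the way Rayvens actually resolves Ray actor methods is not shown in this document.

```python
def resolve_handler(subscriber, method_name='ingest'):
    """Return the callable that should receive events for a subscriber."""
    if callable(subscriber):
        # Functions and bound methods are used as they are.
        return subscriber
    handler = getattr(subscriber, method_name, None)
    if callable(handler):
        # Objects are expected to expose a method with the conventional name.
        return handler
    raise TypeError(
        f'{subscriber!r} is not callable and has no {method_name} method')


class Collector:
    def __init__(self):
        self.events = []

    def ingest(self, event):  # conventional method name
        self.events.append(event)


class Acceptor:
    def __init__(self):
        self.events = []

    def accept(self, event):  # non-conventional method name
        self.events.append(event)


collector = Collector()
acceptor = Acceptor()
resolve_handler(collector)('a')        # found through the ingest convention
resolve_handler(acceptor.accept)('b')  # method has to be named explicitly
```

Passing acceptor itself would raise a TypeError in this sketch, which mirrors why the non-conventional method has to be spelled out when subscribing.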
Run the example locally with:
python rayvens/examples/source.py
Run the example on Kind with:
ray submit rayvens/scripts/cluster.yaml rayvens/examples/source.py
When running locally, the Camel-K client may need to download and cache dependencies on first run (using Maven). When running on Kubernetes, the Camel-K operator is used to build and cache a container image for the source. In both cases, the source may take a minute or more to start the first time. The source should start in a matter of seconds on subsequent runs.
Rayvens manages the Camel processes automatically and in most cases should be
able to terminate these processes when the main program exits. In rare cases,
there may be leftover java and kamel processes when running locally, or
leftover Kubernetes integration objects when running on Kubernetes. Please
clean these up manually.
The slack.py example builds upon the previous example by pushing the output messages to Slack.
In addition to the same source as before, it instantiates a sink:
sink_config = dict(kind='slack-sink',
                   channel=slack_channel,
                   webhookUrl=slack_webhook)
sink = client.create_topic('slack', sink=sink_config)
This sink sends messages to Slack. It requires two configuration parameters that must be provided as command-line parameters to the program:
- the Slack channel to publish to, e.g., #test, and
- a webhook URL for this channel.
Please refer to the Slack webhooks documentation for details on how to obtain these.
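One possible way to read these two command-line parameters is sketched below with argparse. The example program may well parse its arguments differently; parse_slack_args and the placeholder webhook URL are hypothetical and used only for illustration.

```python
import argparse


def parse_slack_args(argv):
    """Parse the channel and webhook URL from a list of arguments."""
    parser = argparse.ArgumentParser(description='Slack sink parameters')
    parser.add_argument('slack_channel',
                        help='channel to publish to, e.g. #test')
    parser.add_argument('slack_webhook',
                        help='webhook URL for this channel')
    return parser.parse_args(argv)


# In a real program argv would be sys.argv[1:]; these values are placeholders.
args = parse_slack_args(['#test', 'https://hooks.slack.com/services/PLACEHOLDER'])

sink_config = dict(kind='slack-sink',
                   channel=args.slack_channel,
                   webhookUrl=args.slack_webhook)
print(sink_config['channel'])
```

Keeping the webhook URL out of the source code and passing it at launch time avoids committing a secret to the repository.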
This example program includes a Comparator actor similar to the previous
example:
# Actor to compare AAPL quote with last quote
@ray.remote
class Comparator:
    def __init__(self):
        self.last_quote = None

    def ingest(self, event):
        payload = json.loads(event)  # parse event string to json
        quote = payload[0]['price']  # payload[0] is AAPL
        try:
            if self.last_quote:
                if quote > self.last_quote:
                    return 'AAPL is up'
                elif quote < self.last_quote:
                    return 'AAPL is down'
                else:
                    return 'AAPL is unchanged'
        finally:
            self.last_quote = quote


# comparator instance
comparator = Comparator.remote()
But in this case, the ingest method returns the status message instead of
printing it.
To make it possible to publish these messages to Slack, we first need to build a topic around this actor using code:
operator = client.create_topic('comparator', operator=comparator)
This basically makes it possible for the comparator to act as an event source,
where the events produced are simply the stream of values returned from the
ingest method. Observe that the ingest method does not have to produce an event
for every event it ingests.
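The last point can be checked without Ray or Slack by running the same comparison logic on a plain class. In this sketch, PlainComparator and run_operator are hypothetical helpers, the event strings are synthetic, and treating a None return value as "no event produced" is an assumption about how the operator topic behaves.

```python
import json


class PlainComparator:
    """Same logic as the Comparator actor, without the @ray.remote decorator."""

    def __init__(self):
        self.last_quote = None

    def ingest(self, event):
        quote = json.loads(event)[0]['price']
        try:
            if self.last_quote:
                if quote > self.last_quote:
                    return 'AAPL is up'
                elif quote < self.last_quote:
                    return 'AAPL is down'
                else:
                    return 'AAPL is unchanged'
        finally:
            # Runs even when a return statement fires above.
            self.last_quote = quote


def run_operator(operator, events):
    """Feed events to operator.ingest, keeping only non-None results."""
    outputs = []
    for event in events:
        result = operator.ingest(event)
        if result is not None:  # assumed: None means no event is emitted
            outputs.append(result)
    return outputs


events = ['[{"price": 100.0}]', '[{"price": 101.5}]',
          '[{"price": 101.5}]', '[{"price": 99.0}]']
messages = run_operator(PlainComparator(), events)
print(messages)  # ['AAPL is up', 'AAPL is unchanged', 'AAPL is down']
```

Four events go in and three messages come out: the first quote has nothing to be compared with, so ingest falls through and returns None.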
We can then link the three topics using code:
source >> operator >> sink
We assume the SLACK_CHANNEL and SLACK_WEBHOOK environment variables contain
the necessary configuration parameters.
Run the example locally with:
python rayvens/examples/sink.py "$SLACK_CHANNEL" "$SLACK_WEBHOOK"
Run the example on Kind with:
ray submit rayvens/scripts/cluster.yaml rayvens/examples/sink.py "$SLACK_CHANNEL" "$SLACK_WEBHOOK"
Rayvens is an open-source project with an Apache 2.0 license.