Skip to content

Commit

Permalink
Simplify api (project-codeflare#18)
Browse files Browse the repository at this point in the history
* Simplify api

* Eliminate topic._post

* Move atexit from api to impl

* Tweak start method
  • Loading branch information
tardieu committed Mar 10, 2021
1 parent fc5268a commit 0d21684
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 66 deletions.
4 changes: 0 additions & 4 deletions examples/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,3 @@ def ingest(self, event):

# run for a while
time.sleep(300)

# optionally disconnect source and sink
client.disconnect(source)
client.disconnect(sink)
3 changes: 0 additions & 3 deletions examples/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,3 @@ def ingest(self, event):

# run for a while
time.sleep(300)

# optionally disconnect source
client.disconnect(source)
67 changes: 24 additions & 43 deletions rayvens/api.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,43 @@
import atexit
import ray
import requests

from .impl import Camel
from .impl import start as start_mode_1


@ray.remote(num_cpus=0)
class Topic:
def __init__(self, name):
self.name = name
self._subscribers = []
self._integrations = []
self._callable = None
self._operator = None

def send_to(self, callable, name=None):
self._subscribers.append({'callable': callable, 'name': name})
def send_to(self, subscriber, name=None):
self._subscribers.append({'subscriber': subscriber, 'name': name})

def ingest(self, data):
if data is None:
return
if self._callable is not None:
data = self._callable(data)
if self._operator is not None:
data = _eval(self._operator, data)
for s in self._subscribers:
s['callable'](data)
_eval(s['subscriber'], data)

def add_operator(self, callable):
self._callable = callable
def add_operator(self, operator):
self._operator = operator

def _register(self, name, integration):
self._integrations.append({'name': name, 'integration': integration})

def _disconnect(self, camel):
self._subscribers = []
camel.cancel.remote(self._integrations)
self._integrations = []

def _post(self, url, data):
if data is not None:
requests.post(url, data)


def _remote(x):
if isinstance(x, ray.actor.ActorHandle):
return x.ingest.remote
elif isinstance(x, ray.actor.ActorMethod) or isinstance(
x, ray.remote_function.RemoteFunction):
return x.remote
def _eval(f, data):
if isinstance(f, ray.actor.ActorHandle):
return f.ingest.remote(data)
elif isinstance(f, ray.actor.ActorMethod) or isinstance(
f, ray.remote_function.RemoteFunction):
return f.remote(data)
else:
return x
return f(data)


def _rshift(source, sink):
source.send_to.remote(_remote(sink))
return sink
def _rshift(topic, subscriber):
topic.send_to.remote(subscriber)
return subscriber


def _lshift(topic, data):
Expand All @@ -64,20 +49,19 @@ def _lshift(topic, data):
setattr(ray.actor.ActorHandle, '__lshift__', _lshift)


def _select(camel_mode):
def _start(camel_mode):
if camel_mode in ['local', 'operator1']:
return Camel
return start_mode_1
elif camel_mode == 'auto':
return Camel # TODO
return start_mode_1 # TODO
else:
raise TypeError(
'Unsupported camel_mode. Must be one of auto, local, operator1.')


class Client:
def __init__(self, prefix='/rayvens', camel_mode='auto'):
self._camel = _select(camel_mode).start(prefix, camel_mode)
atexit.register(self._camel.exit.remote)
self._camel = _start(camel_mode)(prefix, camel_mode)

def create_topic(self, name, source=None, sink=None, operator=None):
topic = Topic.remote(name)
Expand All @@ -86,14 +70,11 @@ def create_topic(self, name, source=None, sink=None, operator=None):
if sink is not None:
self.add_sink(name, topic, sink)
if operator is not None:
topic.add_operator.remote(_remote(operator))
topic.add_operator.remote(operator)
return topic

def add_source(self, name, topic, source):
self._camel.add_source.remote(name, topic, source)

def add_sink(self, name, topic, sink):
self._camel.add_sink.remote(name, topic, sink)

def disconnect(self, topic):
topic._disconnect.remote(self._camel)
39 changes: 23 additions & 16 deletions rayvens/impl.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import atexit
import os
import ray
from ray import serve
import requests
import signal
import subprocess
import yaml


def start(prefix, mode):
if os.getenv('KUBE_POD_NAMESPACE') is not None and mode != 'local':
camel = Camel.options(resources={'head': 1}).remote(prefix, 'operator')
else:
camel = Camel.remote(prefix, 'local')
atexit.register(camel.exit.remote)
return camel


@ray.remote(num_cpus=0)
class Camel:
@staticmethod
def start(prefix, mode):
if os.getenv('KUBE_POD_NAMESPACE') is not None and mode != 'local':
return Camel.options(resources={
'head': 1
}).remote(prefix, 'operator')
else:
return Camel.remote(prefix, 'local')

def __init__(self, prefix, mode):
self.client = serve.start(http_options={
'host': '0.0.0.0',
Expand Down Expand Up @@ -65,7 +67,6 @@ async def f(data):
}]
}
}])
topic._register.remote(name, integration)
if self.integrations is None:
integration.cancel()
else:
Expand All @@ -88,24 +89,30 @@ def add_sink(self, name, topic, sink):
}])

url = f'{integration.url}/{name}'
topic.send_to.remote(lambda data: topic._post.remote(url, data), name)
topic._register.remote(name, integration)
helper = Helper.remote(url)
topic.send_to.remote(helper, name)
if self.integrations is None:
integration.cancel()
else:
self.integrations.append(integration)

def cancel(self, integrations):
for i in integrations:
i['integration'].cancel()

def exit(self):
integrations = self.integrations
self.integrations = None
for i in integrations:
i.cancel()


@ray.remote(num_cpus=0)
class Helper:
def __init__(self, url):
self.url = url

def ingest(self, data):
if data is not None:
requests.post(self.url, data)


class Integration:
def __init__(self, name, mode, integration):
self.name = name
Expand Down

0 comments on commit 0d21684

Please sign in to comment.