Skip to content

Commit

Permalink
Add multi-source (project-codeflare#20)
Browse files Browse the repository at this point in the history
* Add multi-source.

* Add example.

* Fix comment.

* Fix comment.
  • Loading branch information
doru1004 authored and tardieu committed May 17, 2021
1 parent be70025 commit ca1a528
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 10 deletions.
76 changes: 76 additions & 0 deletions examples/binance_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ray
import rayvens
import time
import sys
import json

# Source for coin prices.

# Initialize ray.
if len(sys.argv) < 2:
print(f'usage: {sys.argv[0]} <run_mode>')
sys.exit(1)
run_mode = sys.argv[1]

if run_mode not in ['local', 'operator']:
raise RuntimeError(f'Invalid run mode provided: {run_mode}')

# Initialize ray either on the cluster or locally otherwise.
if run_mode == 'operator':
ray.init(address='auto')
else:
ray.init()

# Start rayvens in the desired mode.
rayvens.init(mode=run_mode)

# Establish an existing portfolio.
coins = ["BTC", "ETH"]

# Create stream.
stream = rayvens.Stream('crypto')

# Event source config.
source_config = dict(kind='binance-source', coin=coins, period='3000')

# Attach source to stream.
source = stream.add_source(source_config)


def process_message(event):
# Parse event:
parsed_event = json.loads(event)

# Extract currency name:
currency = parsed_event['currencyPair'].split("/")[0]

# Extract price:
price = parsed_event['last']

# Output latest currency price:
print(f"{currency} : {price}")


# Send message to processor.
stream >> process_message

# Wait before ending program.
time.sleep(20)

stream.disconnect_all()
79 changes: 69 additions & 10 deletions rayvens/core/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,41 @@ def telegram_source(config):
}


def _binance_route(period, coin):
return {
'uri':
f"timer:update?period={period}",
'steps': [{
'to':
"xchange:binance?service=marketdata&method=ticker&"
f"currencyPair={coin}/USDT"
}, {
'marshal': {
'json': {}
}
}]
}


def binance_source(config):
if 'coin' not in config:
raise TypeError(
'Crypto source requires the official cryptocurrency symbol'
'(example, for Bitcoin the symbol is BTC).')
coin = config['coin']
period = 3000
if 'period' in config:
period = config['period']

if isinstance(coin, list):
routes = []
for coin_id in coin:
routes.append(_binance_route(period, coin_id))
return routes

return _binance_route(period, coin)


def generic_source(config):
if 'spec' not in config:
raise TypeError('Kind generic-source requires a spec.')
Expand All @@ -65,10 +100,20 @@ def generic_source(config):
'http-source': http_source,
'kafka-source': kafka_source,
'telegram-source': telegram_source,
'binance-source': binance_source,
'generic-source': generic_source
}


def _finalize_route(spec, endpoint, inverted):
if inverted:
spec['steps'].append({'bean': 'addToQueue'})
else:
spec['steps'].append({'to': endpoint})
spec = [{'from': spec}]
return spec


# construct a camel source specification from a rayvens source config
def construct_source(config, endpoint, inverted=False):
if 'kind' not in config:
Expand All @@ -77,22 +122,36 @@ def construct_source(config, endpoint, inverted=False):
source_handler = sources.get(kind)
if source_handler is None:
raise TypeError(f'Unsupported Camel source: {kind}.')

spec = source_handler(config)

# Multi-source integration with several routes:
if isinstance(spec, list):
spec_list = []
for spec_entry in spec:
spec_list.extend(_finalize_route(spec_entry, endpoint, inverted))
if inverted:
spec_list.append({
'from': {
'uri': endpoint,
'steps': [{
'bean': 'takeFromQueue'
}]
}
})
print(yaml.dump(spec_list))
return spec_list

# Regular integration with only one route:
spec = _finalize_route(spec, endpoint, inverted)
if inverted:
spec['steps'].append({'bean': 'addToQueue'})
spec = [{
'from': spec
}, {
'from': {
spec.append(
{'from': {
'uri': endpoint,
'steps': [{
'bean': 'takeFromQueue'
}]
}
}]
else:
spec['steps'].append({'to': endpoint})
spec = [{'from': spec}]
}})
return spec


Expand Down

0 comments on commit ca1a528

Please sign in to comment.