Skip to content

Commit

Permalink
update example "change subscription" to data v2
Browse files Browse the repository at this point in the history
  • Loading branch information
camelpac committed Mar 16, 2021
1 parent a77bc6b commit 7e75fef
Showing 1 changed file with 34 additions and 35 deletions.
69 changes: 34 additions & 35 deletions examples/websockets/dynamic_subscription_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,29 @@
import threading
import asyncio
import time
from alpaca_trade_api import StreamConn
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.common import URL

ALPACA_API_KEY = "<YOUR-API-KEY>"
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"
USE_POLYGON = False

conn: StreamConn = None

def consumer_thread():
async def print_trade(t):
print('trade', t)


async def print_quote(q):
print('quote', q)


async def print_bar(bar):
print('bar', bar)

PREVIOUS = None


def consumer_thread():
try:
# make sure we have an event loop, if not create a new one
loop = asyncio.get_event_loop()
Expand All @@ -25,45 +37,32 @@ def consumer_thread():
asyncio.set_event_loop(asyncio.new_event_loop())

global conn
conn = StreamConn(
ALPACA_API_KEY,
ALPACA_SECRET_KEY,
base_url=URL('https://paper-api.alpaca.markets'),
data_url=URL('https://data.alpaca.markets'),
# data_url=URL('http://127.0.0.1:8765'),
data_stream='polygon' if USE_POLYGON else 'alpacadatav1'
)

@conn.on(r'^AM\..+$')
async def on_minute_bars(conn, channel, bar):
print('bars', bar)


@conn.on(r'Q\..+')
async def on_quotes(conn, channel, quote):
print('quote', quote)


@conn.on(r'T\..+')
async def on_trades(conn, channel, trade):
print('trade', trade)
conn = Stream(ALPACA_API_KEY,
ALPACA_SECRET_KEY,
base_url=URL('https://paper-api.alpaca.markets'),
data_feed='iex')

conn.run(['alpacadatav1/Q.GOOG'])
conn.subscribe_quotes(print_quote, 'AAPL')
global PREVIOUS
PREVIOUS = "AAPL"
conn.run()

if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.INFO)
threading.Thread(target=consumer_thread).start()

loop = asyncio.get_event_loop()

time.sleep(5) # give the initial connection time to be established
subscriptions = [['alpacadatav1/AM.TSLA'], ['alpacadatav1/Q.GOOG'],
['alpacadatav1/T.AAPL']]
subscriptions = {"BABA": print_quote,
"AAPL": print_quote,
"TSLA": print_quote,
}

while 1:
for channels in subscriptions:
loop.run_until_complete(conn.subscribe(channels))
if "AM." in channels[0]:
time.sleep(60) # aggs are once every minute. give it time
else:
time.sleep(20)
for ticker, handler in subscriptions.items():
conn.unsubscribe_quotes(PREVIOUS)
conn.subscribe_quotes(handler, ticker)
PREVIOUS = ticker
time.sleep(20)

0 comments on commit 7e75fef

Please sign in to comment.