Skip to content

Commit

Permalink
cr fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
camelpac committed Sep 3, 2021
1 parent d7bff6a commit da0ca61
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
1 change: 1 addition & 0 deletions alpaca_trade_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .rest import REST # noqa
from .rest_async import AsyncRest # noqa
from .stream import Stream # noqa
from .stream2 import StreamConn # noqa

Expand Down
33 changes: 17 additions & 16 deletions alpaca_trade_api/rest_async.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import os
import aiohttp
import asyncio
import time
import yaml
from alpaca_trade_api.entity_v2 import BarsV2, QuotesV2, TradesV2, \
EntityList, TradeV2, QuoteV2
import pandas as pd
from alpaca_trade_api.common import URL, get_api_version, get_data_url

NY = 'America/New_York'


class AsyncRest:
def __init__(self,
key_id: str = None,
secret_key: str = None,
data_url: URL = None,
api_version: str = None,
oauth=None,
raw_data: bool = False
):
"""
Expand All @@ -24,10 +23,6 @@ def __init__(self,
"""
self._key_id, self._secret_key = key_id, secret_key
self._data_url: URL = URL(data_url or get_data_url())
self._api_version = get_api_version(api_version)
self._use_raw_data = raw_data
self._retry = int(os.environ.get('APCA_RETRY_MAX', 3))
self._retry_wait = int(os.environ.get('APCA_RETRY_WAIT', 3))

def _get_historic_url(self, _type, symbol):
return f"{self._data_url}/v2/stocks/{symbol}/{_type}"
Expand All @@ -51,7 +46,7 @@ async def _iterate_requests(self,
"""
df = pd.DataFrame({})
url = self._get_historic_url(entity_type, symbol)
async for packet in self._request(symbol, url, payload):
async for packet in self._request(url, payload):
if packet.get(entity_type):
response = entity_list_type(packet[entity_type]).df
df = pd.concat([df, response], axis=0)
Expand All @@ -60,11 +55,17 @@ async def _iterate_requests(self,

return df

async def get_bars_async(self, symbol, start, end, timeframe, limit=1000):
async def get_bars_async(self,
symbol,
start,
end,
timeframe,
limit=1000,
adjustment='raw'):
_type = "bars"

payload = {
"adjustment": 'raw',
"adjustment": adjustment,
"start": start,
"end": end,
"timeframe": timeframe.value,
Expand All @@ -73,7 +74,7 @@ async def get_bars_async(self, symbol, start, end, timeframe, limit=1000):
df = await self._iterate_requests(symbol, payload, limit, _type,
BarsV2)

return df
return symbol, df

async def get_trades_async(self, symbol, start, end, timeframe,
limit=1000):
Expand All @@ -87,7 +88,7 @@ async def get_trades_async(self, symbol, start, end, timeframe,
df = await self._iterate_requests(symbol, payload, limit, _type,
TradesV2)

return df
return symbol, df

async def get_quotes_async(self, symbol, start, end, timeframe,
limit=1000):
Expand All @@ -101,7 +102,7 @@ async def get_quotes_async(self, symbol, start, end, timeframe,
df = await self._iterate_requests(symbol, payload, limit, _type,
QuotesV2)

return df
return symbol, df

async def get_latest_trade_async(self, symbol: str) -> TradeV2:
"""
Expand All @@ -116,7 +117,7 @@ async def get_latest_trade_async(self, symbol: str) -> TradeV2:
response = await response.json()
if response.get("quote"):
result = TradeV2(response["trade"])
return result
return symbol, result

async def get_latest_quote_async(self, symbol: str) -> QuoteV2:
"""
Expand All @@ -131,7 +132,7 @@ async def get_latest_quote_async(self, symbol: str) -> QuoteV2:
response = await response.json()
if response.get("quote"):
result = QuoteV2(response["quote"])
return result
return symbol, result

def _get_opts(self, payload=None):
headers = {}
Expand All @@ -149,7 +150,7 @@ def _get_opts(self, payload=None):

return opts

async def _request(self, symbol, url, payload):
async def _request(self, url, payload):
opts = self._get_opts(payload)
async with aiohttp.ClientSession() as session:
try:
Expand Down
21 changes: 14 additions & 7 deletions examples/historic_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import asyncio
import alpaca_trade_api as tradeapi
from alpaca_trade_api.rest import TimeFrame, URL
from alpaca_trade_api import rest_async
from alpaca_trade_api.rest_async import gather_with_concurrency
from alpaca_trade_api import AsyncRest
from alpaca_trade_api.rest_async import gather_with_concurrency, AsyncRest
import pandas as pd

NY = 'America/New_York'
Expand All @@ -30,7 +28,7 @@ async def get_historic_bars(symbols, start, end, timeframe: TimeFrame):

bad_requests = 0
for response in results:
if not len(response):
if not len(response[1]):
bad_requests += 1

print(f"Total of {len(results)} Bars, and {bad_requests} empty responses.")
Expand All @@ -56,7 +54,7 @@ async def get_historic_trades(symbols, start, end, timeframe: TimeFrame):

bad_requests = 0
for response in results:
if not len(response):
if not len(response[1]):
bad_requests += 1

print(
Expand All @@ -83,7 +81,7 @@ async def get_historic_quotes(symbols, start, end, timeframe: TimeFrame):

bad_requests = 0
for response in results:
if not len(response):
if not len(response[1]):
bad_requests += 1

print(
Expand All @@ -100,6 +98,16 @@ async def main(symbols):


if __name__ == '__main__':
"""
Credentials for this example is kept in a yaml config file.
an example to such a file:
key_id: "<YOUR-API-KEY>"
secret: "<YOUR-API-SECRET>"
feed: iex
base_url: https://paper-api.alpaca.markets
"""
import time
import yaml

Expand All @@ -112,7 +120,6 @@ async def main(symbols):

rest = AsyncRest(key_id=api_key_id,
secret_key=api_secret)
NY = 'America/New_York'

api = tradeapi.REST(key_id=api_key_id,
secret_key=api_secret,
Expand Down
2 changes: 2 additions & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ urllib3>1.24,<2
websocket-client>=0.56.0,<2
websockets>=8.0,<10
msgpack==1.0.2
aiohttp==3.7.4
PyYAML==5.4.1

0 comments on commit da0ca61

Please sign in to comment.