Commit 59b7b1d
Expose poloniex data curation methods to load benchmark dynamically
cfromknecht committed Jul 2, 2017
1 parent fa25e01
Showing 3 changed files with 52 additions and 73 deletions.
catalyst/curate/__init__.py (empty file added)
curate/crypto_price_generator.py → catalyst/curate/poloniex.py (74 changes: 16 additions & 58 deletions)
@@ -6,17 +6,14 @@
 import requests
 import logbook
 
-import catalyst.data.bundles.core as bundles
-
 DT_START = time.mktime(datetime(2010, 01, 01, 0, 0).timetuple())
-# DT_START = time.mktime(datetime(2017, 06, 13, 0, 0).timetuple()) # TODO: remove temp
 CSV_OUT_FOLDER = '/var/tmp/catalyst/data/poloniex/'
 CONN_RETRIES = 2
 
 logbook.StderrHandler().push_application()
 log = logbook.Logger(__name__)
 
-class PoloniexDataGenerator(object):
+class PoloniexCurator(object):
     """
     OHLCV data feed generator for crypto data. Based on Poloniex market data
     """
@@ -89,7 +86,8 @@ def append_data_single_pair(self, currencyPair, repeat=0):
         log.debug('Getting data for %s' % currencyPair)
         csv_fn = CSV_OUT_FOLDER + 'crypto_prices-' + currencyPair + '.csv'
         start = self._get_start_date(csv_fn)
-        if (time.time() > start): # Only fetch data if more than 5min have passed since last fetch
+        # Only fetch data if more than 5min have passed since last fetch
+        if (time.time() > start):
             data = self.get_data(currencyPair, start)
             if data is not None:
                 try:
@@ -98,7 +96,14 @@ def append_data_single_pair(self, currencyPair, repeat=0):
                     for item in data:
                         if item['date'] == 0:
                             continue
-                        csvwriter.writerow([item['date'], item['open'], item['high'], item['low'], item['close'], item['volume']])
+                        csvwriter.writerow([
+                            item['date'],
+                            item['open'],
+                            item['high'],
+                            item['low'],
+                            item['close'],
+                            item['volume'],
+                        ])
                 except Exception as e:
                     log.error('Error opening %s' % csv_fn)
                     log.exception(e)
@@ -112,7 +117,8 @@ def append_data_single_pair(self, currencyPair, repeat=0):
     def append_data(self):
         for currencyPair in self.currency_pairs:
             self.append_data_single_pair(currencyPair)
-            time.sleep(0.17) # Rate limit is 6 calls per second, sleep 1sec/6 to be safe
+            # Rate limit is 6 calls per second, sleep 1sec/6 to be safe
+            time.sleep(0.17)
 
     '''
     Returns a data frame for all pairs, or for the requested currency pair.
@@ -130,57 +136,9 @@ def to_dataframe(self, start, end, currencyPair=None):
         df['date'] = pd.to_datetime(df['date'], unit='s')
         df.set_index('date', inplace=True)
 
-        #return df.loc[(df.index > start) & (df.index <= end)]
         return df[datetime.fromtimestamp(start):datetime.fromtimestamp(end-1)]
 
 if __name__ == '__main__':
-    pdg = PoloniexDataGenerator()
-    pdg.get_currency_pairs()
-    pdg.append_data()
-
-
-
-# from zipline.utils.calendars import get_calendar
-# from zipline.data.us_equity_pricing import (
-#     BcolzDailyBarWriter,
-#     BcolzDailyBarReader,
-# )
-
-# open_calendar = get_calendar('OPEN')
-
-# start_session = pd.Timestamp('2012-12-31', tz='UTC')
-# end_session = pd.Timestamp('2015-01-01', tz='UTC')
-
-# file_path = 'test.bcolz'
-
-# writer = BcolzDailyBarWriter(
-#     file_path,
-#     open_calendar,
-#     start_session,
-#     end_session
-# )
-
-# index = open_calendar.schedule.index
-# index = index[
-#     (index.date >= start_session.date()) &
-#     (index.date <= end_session.date())
-# ]
-
-# data = pd.DataFrame(
-#     0,
-#     index=index,
-#     columns=['open', 'high', 'low', 'close', 'volume'],
-# )
-
-# writer.write(
-#     [(0, data)],
-#     assets=[0],
-#     show_progress=True
-# )
-
-# print 'len(index):', len(index)
-
-# reader = BcolzDailyBarReader(file_path)
-
-# print 'first_rows:', reader._first_rows
-# print 'last_rows:', reader._last_rows
+    pc = PoloniexCurator()
+    pc.get_currency_pairs()
+    pc.append_data()
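
Usage note (not part of the diff): the rename leaves the class's methods intact, so PoloniexCurator can be driven outside of __main__ in the same way, and to_dataframe takes Unix timestamps. A minimal sketch; the USDT_BTC pair and the 30-day window are illustrative assumptions, not part of this commit:

import time

from catalyst.curate.poloniex import PoloniexCurator

pc = PoloniexCurator()
pc.get_currency_pairs()                 # discover available pairs from Poloniex
pc.append_data_single_pair('USDT_BTC')  # refresh the local CSV cache for one pair

# Pull roughly the last 30 days of five-minute OHLCV bars as a DataFrame.
end = time.time()
start = end - 30 * 24 * 60 * 60
df = pc.to_dataframe(start, end, currencyPair='USDT_BTC')
print(df[['close', 'volume']].tail())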
catalyst/data/loader.py (51 changes: 36 additions & 15 deletions)
@@ -31,6 +31,8 @@
     data_root,
 )
 from ..utils.deprecate import deprecated
 
+from catalyst.curate.poloniex import PoloniexCurator
+from catalyst.utils.calendars import get_calendar
 
 
@@ -310,6 +312,8 @@ def compute_daily_bars(five_min_bars, schedule):
 
         return daily_bars
 
+
+    five_min_bars = None
     try:
         # load five minute bars from csv cache
         five_min_bars = pd.read_csv(
@@ -320,31 +324,48 @@ def compute_daily_bars(five_min_bars, schedule):
             date_parser=dateparse,
         )
         five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s')
+    except (OSError, IOError):
+        # Otherwise load from Poloniex API
+        try:
+            pc = PoloniexCurator()
+            pc.append_data_single_pair(symbol)
+
+            five_min_bars = pc.to_dataframe(
+                first_date,
+                last_date,
+                currencyPair=symbol,
+            )
+        except (OSError, IOError, HTTPError):
+            logger.exception('Failed to fetch new crypto benchmark returns')
+            raise
 
-        # compute daily bars for open calendar
-        open_calendar = get_calendar('OPEN')
-        daily_bars = compute_daily_bars(
-            five_min_bars,
-            open_calendar.all_sessions,
-        )
+    # compute daily bars for open calendar
+    open_calendar = get_calendar('OPEN')
+    daily_bars = compute_daily_bars(
+        five_min_bars,
+        open_calendar.all_sessions,
+    )
 
-        # filter daily bars to include first_date and last_date
-        daily_bars = daily_bars[
-            (daily_bars.index >= (first_date - trading_day)) &
-            (daily_bars.index <= last_date)
-        ]
+    # filter daily bars to include first_date and last_date
+    daily_bars = daily_bars[
+        (daily_bars.index >= (first_date - trading_day)) &
+        (daily_bars.index <= last_date)
+    ]
 
-        # select close column and compute percent change between days
-        daily_close = daily_bars[['close']]
-        daily_close = daily_close.pct_change(1).iloc[1:]
+    # select close column and compute percent change between days
+    daily_close = daily_bars[['close']]
+    daily_close = daily_close.pct_change(1).iloc[1:]
 
+    try:
         # write to benchmark csv cache
         daily_close.to_csv(get_data_filepath(filename, environ))
     except (OSError, IOError, HTTPError):
         logger.exception('Failed to cache the new benchmark returns')
         raise
 
     if not has_data_for_dates(daily_close, first_date, last_date):
         logger.warn("Still don't have expected data after redownload!")
 
     return daily_close


@@ -535,7 +556,7 @@ def _load_cached_data(filename, first_date, last_date, now, resource_name,
     if os.path.exists(path):
         try:
             data = from_csv(path)
-            data.index = data.index.to_datetime().tz_localize('UTC')
+            data.index = pd.to_datetime(data.index).tz_localize('UTC')
             if has_data_for_dates(data, first_date, last_date):
                 return data

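Taken together, the loader change is a cache-then-fetch fallback: read the five-minute bars from the CSV cache, call out to the Poloniex API through PoloniexCurator only on a cache miss, then resample to daily bars and cache the daily percent-change series. A standalone sketch of the same pattern, assuming the cached CSV uses the curator's date/open/high/low/close/volume layout; load_five_min_bars and fetch_remote are hypothetical names, not catalyst APIs:

import pandas as pd

def load_five_min_bars(csv_path, fetch_remote):
    # Try the local CSV cache first; fall back to the remote source on a miss.
    try:
        bars = pd.read_csv(
            csv_path,
            names=['date', 'open', 'high', 'low', 'close', 'volume'],
            index_col='date',
        )
        # The cached 'date' column holds Unix timestamps in seconds.
        bars.index = pd.to_datetime(bars.index, utc=True, unit='s')
    except (OSError, IOError):
        # Cache miss: fetch fresh bars from the API, e.g. via PoloniexCurator.
        bars = fetch_remote()
    return bars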