Ingest poloniex data from remote tar
cfromknecht committed Jul 2, 2017
1 parent bc9d60a commit 4b15df7
Showing 2 changed files with 32 additions and 192 deletions.
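This commit replaces the per-symbol CSV ingest for Poloniex with a bundle that downloads a prebuilt tarball from enigma.co and unpacks it into the ingest output directory. A minimal sketch of triggering the new bundle from Python, assuming catalyst keeps zipline's bundles.ingest entry point (the call below is an assumption, not something this commit adds):

    import os
    from catalyst.data import bundles

    # Importing catalyst.data.bundles also imports the poloniex module,
    # whose @bundles.register('poloniex', ...) decorator runs at import time.
    bundles.ingest('poloniex', os.environ, show_progress=True)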
2 changes: 1 addition & 1 deletion catalyst/data/bundles/__init__.py
@@ -1,5 +1,6 @@
# These imports are necessary to force module-scope register calls to happen.
from . import quandl # noqa
from . import poloniex
from .core import (
UnknownBundle,
bundles,
@@ -13,7 +14,6 @@
unregister,
)
from .yahoo import yahoo_equities
from .poloniex import poloniex_cryptoassets

__all__ = [
'UnknownBundle',
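With this change the bundle registers itself as a side effect of "from . import poloniex": the module-level @bundles.register(...) decorator replaces the poloniex_cryptoassets factory that users previously wired up in extension.py. A sketch of that registration pattern for a user-defined bundle (hypothetical name and no-op body; only the decorator shape comes from this commit):

    from catalyst.data.bundles import core as bundles

    @bundles.register('my_bundle', calendar_name='OPEN', minutes_per_day=1440)
    def my_bundle(environ, asset_db_writer, minute_bar_writer,
                  daily_bar_writer, adjustment_writer, calendar,
                  start_session, end_session, cache, show_progress,
                  output_dir):
        # Write pricing data with the writers here; importing this module
        # is enough for the bundle to appear in the registry.
        pass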
222 changes: 31 additions & 191 deletions catalyst/data/bundles/poloniex.py
@@ -9,199 +9,39 @@

from catalyst.utils.calendars import register_calendar_alias
from catalyst.utils.cli import maybe_show_progress
from .core import register

from . import core as bundles

def _cachpath(symbol, type_):
return '-'.join((symbol.replace(os.path.sep, '_'), type_))


def poloniex_cryptoassets(symbols, start=None, end=None):
"""Create a data bundle ingest function from a set of symbols loaded from
poloniex
Parameters
----------
symbols : iterable[str]
The ticker symbols to load data for.
start : datetime, optional
The start date to query for. By default this pulls the full history
for the calendar.
end : datetime, optional
The end date to query for. By default this pulls the full history
for the calendar.
Returns
-------
ingest : callable
The bundle ingest function for the given set of symbols.
Examples
--------
This code should be added to ~/.catalyst/extension.py
.. code-block:: python
from catalyst.data.bundles import poloniex_cryptoassets, register
symbols = (
'USDT_BTC',
'USDT_ETH',
'USDT_LTC',
)
register('my_bundle', poloniex_cryptoassets(symbols))
Notes
-----
The sids for each symbol will be the index into the symbols sequence.
"""
# store this in memory so that we can reiterate over it
symbols = tuple(symbols)

def ingest(environ,
asset_db_writer,
minute_bar_writer, # unused
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir,
# pass these as defaults to make them 'nonlocal' in py2
start=start,
end=end):
if start is None:
start = start_session
if end is None:
end = None

metadata = pd.DataFrame(np.empty(len(symbols), dtype=[
('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('auto_close_date', 'datetime64[ns]'),
('symbol', 'object'),
]))

day_offset = pd.Timedelta(days=1)

def compute_daily_bars(five_min_bars):
# filter and copy the entry at the beginning of each session
daily_bars = five_min_bars[
five_min_bars.index.isin(calendar.all_sessions)
].copy()

# iterate through session starts doing:
# 1. filter five_min_bars to get all entries in one day
# 2. compute daily bar entry
# 3. record in rid-th row of daily_bars
for rid, start_date in enumerate(daily_bars.index):
# compute beginning of next session
end_date = start_date + day_offset

# filter for this session's entries
day_data = five_min_bars[
(five_min_bars.index >= start_date) &
(five_min_bars.index < end_date)
]

# compute and record daily bar
daily_bars.iloc[rid] = (
day_data.open.iloc[0], # first open price
day_data.high.max(), # max of high prices
day_data.low.min(), # min of low prices
day_data.close.iloc[-1], # last close price
day_data.volume.sum(), # sum of all volumes
)

# scale to allow trading tenths of a coin
scale = 10.0
daily_bars.loc[:, 'open'] /= scale
daily_bars.loc[:, 'high'] /= scale
daily_bars.loc[:, 'low'] /= scale
daily_bars.loc[:, 'close'] /= scale
daily_bars.loc[:, 'volume'] *= scale
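# e.g. a daily bar with close 2500.0 and volume 3.2 coins becomes
# close 250.0 and volume 32.0, so one 'share' represents a tenth of a
# coin and the notional value (price * volume) of the bar is unchanged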

return daily_bars
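# For reference, the loop above is roughly equivalent to a pandas
# resample over UTC days (sketch only, not how this bundle computes it):
#   daily = five_min_bars.resample('1D').agg(
#       {'open': 'first', 'high': 'max', 'low': 'min',
#        'close': 'last', 'volume': 'sum'})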

def _pricing_iter():
sid = 0
print('Ingesting symbols: {0}'.format(symbols))
with maybe_show_progress(
symbols,
show_progress,
show_percent=True,
item_show_func=lambda s: 'building {0}'.format(s)
if s is not None
else 'DONE',
info_sep=' | ',
label='Compiling daily bar pricing datasets:',
) as it:

for symbol in it:
#def to_dataframe(self, start, end, currencyPair=None):
csv_fn = '/var/tmp/catalyst/data/poloniex/crypto_prices-' +\
symbol + '.csv'

#last_date = self._get_start_date(csv_fn)
#if last_date + 300 < end or not os.path.exists(csv_fn):
# get latest data
#self.append_data_single_pair(currencyPair)

# CSV holds the latest snapshot
columns = ['date', 'open', 'high', 'low', 'close', 'volume']
five_min_bars = pd.read_csv(csv_fn, names=columns)
five_min_bars.set_index('date', inplace=True)
five_min_bars.index = pd.to_datetime(
five_min_bars.index,
utc=True,
unit='s',
)

daily_bars = compute_daily_bars(five_min_bars)

# the start date is the date of the first trade and
# the end date is the date of the last trade
start_date = daily_bars.index[0].tz_localize(None)
end_date = daily_bars.index[-1].tz_localize(None)
# The auto_close date is the day after the last trade.
ac_date = end_date + day_offset
metadata.iloc[sid] = start_date, end_date, ac_date, symbol

yield sid, daily_bars
sid += 1
POLONIEX_BUNDLE_URL = (
'https://enigma.co/api/poloniex_bundle.tar'
)

daily_bar_writer.write(
_pricing_iter(),
assets=metadata.symbol.index,
@bundles.register(
'poloniex',
create_writers=False,
calendar_name='OPEN',
minutes_per_day=1440)
def poloniex_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
if show_progress:
data = bundles.download_with_progress(
POLONIEX_BUNDLE_URL,
chunk_size=bundles.ONE_MEGABYTE,
label="Downloading Bundle: poloniex",
)
else:
data = bundles.download_without_progress(POLONIEX_BUNDLE_URL)

symbol_map = pd.Series(metadata.symbol.index, metadata.symbol)

# Hardcode the exchange to "POLO" for all assets and (elsewhere)
# register "POLO" to resolve to the OPEN calendar, because these are
# all cryptoassets and thus use the OPEN calendar.
metadata['exchange'] = 'POLO'
asset_db_writer.write(equities=metadata)
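# The "(elsewhere)" registration mentioned above presumably uses the
# register_calendar_alias imported at the top of this file, e.g.
# register_calendar_alias('POLO', 'OPEN'); the exact call site is an
# assumption and is not shown in this diff.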

adjustment_writer.write()

return ingest


# bundle used when creating test data
register(
'.test-poloniex',
poloniex_cryptoassets(
(
'USDT_BTC',
'USDT_ETH',
'USDT_LTC',
),
pd.Timestamp('2010-01-01', tz='utc'),
pd.Timestamp('2015-01-01', tz='utc'),
),
calendar_name='OPEN',
minutes_per_day=1440,
)
with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print("Writing data to %s." % output_dir)
tar.extractall(output_dir)
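Because the bundle is registered with create_writers=False, the ingest function ignores the writer arguments entirely: the tarball is expected to already contain the asset database, daily bars, and adjustments in their written layout, and extracting it into output_dir is the whole ingest. A hedged sketch of reading the extracted bundle back, assuming catalyst mirrors zipline's bundles.load API (not part of this commit):

    import os
    import pandas as pd
    from catalyst.data import bundles

    # Returns the asset finder plus daily/minute bar readers and the
    # adjustment reader for the most recent 'poloniex' ingestion.
    bundle_data = bundles.load('poloniex', os.environ, pd.Timestamp.utcnow())
    print(bundle_data.asset_finder.sids)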
