
Commit 38d6107

Merge pull request quantopian#1243 from quantopian/source-daily-from-minute

TST: Enable sourcing daily data from minute data.
ehebert committed Jun 2, 2016
2 parents 8f0b3f4 + 2f80e94 commit 38d6107
Showing 2 changed files with 130 additions and 27 deletions.
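
The core of this change is downsampling minute OHLCV bars into daily bars so that both stores describe the same prices. A minimal standalone sketch of that aggregation in plain pandas (illustrative, not zipline's API; the ascending-counter bars below are invented for the example, though the open/high/low/volume offsets mirror the expected values in the test diff, where open/high/low sit at +1/+2/-1 around close and volume is close * 100):

import numpy as np
import pandas as pd

# One synthetic session of 390 minute bars built around an ascending
# close counter, similar in shape to the test fixtures' minute data.
minutes = pd.date_range(
    '2015-01-05 14:31', periods=390, freq='T', tz='UTC'
)
close = np.arange(1.0, 391.0)
minute_bars = pd.DataFrame(
    {
        'open': close + 1,
        'high': close + 2,
        'low': close - 1,
        'close': close,
        'volume': close * 100,
    },
    index=minutes,
)

# Collapse the session into a single daily bar. This mirrors the
# how-dict in _make_daily_bar_from_minute below, except that the
# fixture uses 'last' for volume to avoid overflowing uint storage
# with summed test volumes.
daily_bars = minute_bars.resample('1D').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum',
}).dropna()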
91 changes: 69 additions & 22 deletions tests/test_history.py
@@ -39,7 +39,7 @@
 
 class WithHistory(WithDataPortal):
     TRADING_START_DT = TRADING_ENV_MIN_DATE = START_DATE = pd.Timestamp(
-        '2014-02-03',
+        '2014-01-03',
         tz='UTC',
     )
     TRADING_END_DT = END_DATE = pd.Timestamp('2016-01-29', tz='UTC')
@@ -445,10 +445,13 @@ def check_internal_consistency(bar_data, assets, fields, bar_count, freq):
 
 
 class MinuteEquityHistoryTestCase(WithHistory, ZiplineTestCase):
+
+    BCOLZ_DAILY_BAR_SOURCE_FROM_MINUTE = True
+
     @classmethod
     def make_minute_bar_data(cls):
         data = {}
-        sids = {2, 4, 5, 6, cls.SHORT_ASSET_SID, cls.HALF_DAY_TEST_ASSET_SID}
+        sids = {2, 5, cls.SHORT_ASSET_SID, cls.HALF_DAY_TEST_ASSET_SID}
         for sid in sids:
             asset = cls.asset_finder.retrieve_asset(sid)
             data[sid] = create_minute_df_for_asset(
@@ -465,6 +468,27 @@ def make_minute_bar_data(cls):
                 start_val=2,
             )
 
+        # Start values are crafted so that the thousands place is equal when
+        # adjustments are applied correctly.
+        # The splits and mergers are defined as 2:1 splits, so the prices
+        # approximate that adjustment by halving the thousands place each day.
+        data[cls.MERGER_ASSET_SID] = data[cls.SPLIT_ASSET_SID] = pd.concat((
+            create_minute_df_for_asset(
+                cls.env,
+                pd.Timestamp('2015-01-05', tz='UTC'),
+                pd.Timestamp('2015-01-05', tz='UTC'),
+                start_val=4000),
+            create_minute_df_for_asset(
+                cls.env,
+                pd.Timestamp('2015-01-06', tz='UTC'),
+                pd.Timestamp('2015-01-06', tz='UTC'),
+                start_val=2000),
+            create_minute_df_for_asset(
+                cls.env,
+                pd.Timestamp('2015-01-07', tz='UTC'),
+                pd.Timestamp('2015-01-07', tz='UTC'),
+                start_val=1000)
+        ))
         asset3 = cls.asset_finder.retrieve_asset(3)
         data[3] = create_minute_df_for_asset(
             cls.env,
@@ -691,7 +715,8 @@ def test_minute_splits_and_mergers(self):
             'close'
         )[asset]
 
-        np.testing.assert_array_equal(np.array(range(382, 392)), window1)
+        np.testing.assert_array_equal(
+            np.array(range(4380, 4390)), window1)
 
         # straddling the first event
         window2 = self.data_portal.get_history_window(
@@ -704,7 +729,18 @@ def test_minute_splits_and_mergers(self):
 
         # five minutes from 1/5 should be halved
         np.testing.assert_array_equal(
-            [193.5, 194, 194.5, 195, 195.5, 392, 393, 394, 395, 396],
+            [2192.5,
+             2193,
+             2193.5,
+             2194,
+             2194.5,
+             # Split occurs. The value of the thousands place should
+             # match.
+             2000,
+             2001,
+             2002,
+             2003,
+             2004],
             window2
         )
@@ -717,20 +753,20 @@ def test_minute_splits_and_mergers(self):
             'close'
         )[asset]
 
-        # first five minutes should be 387-391, but quartered
+        # first five minutes should be 4385-4389, but quartered
         np.testing.assert_array_equal(
-            [96.75, 97, 97.25, 97.5, 97.75],
+            [1096.25, 1096.5, 1096.75, 1097, 1097.25],
             window3[0:5]
         )
 
-        # next 390 minutes should be 392-781, but halved
+        # next 390 minutes should be 2000-2389, but halved
         np.testing.assert_array_equal(
-            np.array(range(392, 782), dtype='float64') / 2,
+            np.array(range(2000, 2390), dtype='float64') / 2,
             window3[5:395]
         )
 
-        # final 5 minutes should be 782-786
-        np.testing.assert_array_equal(range(782, 787), window3[395:])
+        # final 5 minutes should be 1000-1004
+        np.testing.assert_array_equal(range(1000, 1005), window3[395:])
 
         # after last event
         window4 = self.data_portal.get_history_window(
@@ -741,8 +777,8 @@ def test_minute_splits_and_mergers(self):
             'close'
         )[asset]
 
-        # should not be adjusted, should be 787 to 791
-        np.testing.assert_array_equal(range(787, 792), window4)
+        # should not be adjusted, should be 1005 to 1009
+        np.testing.assert_array_equal(range(1005, 1010), window4)
 
     def test_minute_dividends(self):
         # self.DIVIDEND_ASSET had dividends on 1/6 and 1/7
@@ -823,6 +859,15 @@ def test_overnight_adjustments(self):
         current_dt = pd.Timestamp('2015-01-06 8:45', tz='US/Eastern')
         bar_data = BarData(self.data_portal, lambda: current_dt, 'minute')
 
+        adj_expected = {
+            'open': np.arange(4381, 4391) / 2.0,
+            'high': np.arange(4382, 4392) / 2.0,
+            'low': np.arange(4379, 4389) / 2.0,
+            'close': np.arange(4380, 4390) / 2.0,
+            'volume': np.arange(4380, 4390) * 100 * 2.0,
+            'price': np.arange(4380, 4390) / 2.0,
+        }
+
         expected = {
             'open': np.arange(383, 393) / 2.0,
             'high': np.arange(384, 394) / 2.0,
@@ -836,23 +881,25 @@ def test_overnight_adjustments(self):
         # Single field, single asset
         for field in ALL_FIELDS:
             values = bar_data.history(self.SPLIT_ASSET, field, 10, '1m')
-            np.testing.assert_array_equal(values.values, expected[field])
+            np.testing.assert_array_equal(values.values,
+                                          adj_expected[field],
+                                          err_msg=field)
 
         # Multi field, single asset
         values = bar_data.history(
            self.SPLIT_ASSET, ['open', 'volume'], 10, '1m'
         )
         np.testing.assert_array_equal(values.open.values,
-                                      expected['open'])
+                                      adj_expected['open'])
         np.testing.assert_array_equal(values.volume.values,
-                                      expected['volume'])
+                                      adj_expected['volume'])
 
         # Single field, multi asset
         values = bar_data.history(
             [self.SPLIT_ASSET, self.ASSET2], 'open', 10, '1m'
         )
         np.testing.assert_array_equal(values[self.SPLIT_ASSET].values,
-                                      expected['open'])
+                                      adj_expected['open'])
         np.testing.assert_array_equal(values[self.ASSET2].values,
                                       expected['open'] * 2)
 
@@ -862,11 +909,11 @@ def test_overnight_adjustments(self):
         )
         np.testing.assert_array_equal(
             values.open[self.SPLIT_ASSET].values,
-            expected['open']
+            adj_expected['open']
         )
         np.testing.assert_array_equal(
             values.volume[self.SPLIT_ASSET].values,
-            expected['volume']
+            adj_expected['volume']
         )
         np.testing.assert_array_equal(
             values.open[self.ASSET2].values,
@@ -946,8 +993,8 @@ def test_history_window_before_first_trading_day(self):
             self.TRADING_START_DT
         )
         exp_msg = (
-            'History window extends before 2014-02-03. To use this history '
-            'window, start the backtest on or after 2014-02-04.'
+            'History window extends before 2014-01-03. To use this history '
+            'window, start the backtest on or after 2014-01-06.'
         )
         for field in OHLCP:
             with self.assertRaisesRegexp(
@@ -1454,8 +1501,8 @@ def test_history_window_before_first_trading_day(self):
         second_day = self.env.next_trading_day(self.TRADING_START_DT)
 
         exp_msg = (
-            'History window extends before 2014-02-03. To use this history '
-            'window, start the backtest on or after 2014-02-07.'
+            'History window extends before 2014-01-03. To use this history '
+            'window, start the backtest on or after 2014-01-09.'
         )
 
         with self.assertRaisesRegexp(HistoryWindowStartsBeforeData, exp_msg):
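The rewritten expectations above encode the session date in the thousands place: the split/merger asset trades near 4000 on 1/5, 2000 on 1/6, and 1000 on 1/7, so each 2:1 event roughly halves the level. The arithmetic behind window3's three segments can be checked directly (illustrative numpy, not part of the commit):

import numpy as np

# Bars are divided once per 2:1 event that occurs after them:
# 1/5 bars are quartered, 1/6 bars halved, 1/7 bars left unadjusted.
day1_last_five = np.arange(4385, 4390) / 4.0   # [1096.25 ... 1097.25]
day2_full_day = np.arange(2000, 2390) / 2.0    # [1000.0 ... 1194.5]
day3_first_five = np.arange(1000, 1005) / 1.0  # [1000.0 ... 1004.0]

assert day1_last_five[0] == 1096.25
assert day2_full_day[0] == 1000.0
assert day3_first_five[-1] == 1004.0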
66 changes: 61 additions & 5 deletions zipline/testing/fixtures.py
@@ -585,6 +585,9 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir):
     BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD : int
         If this flag is set, use the value as the `read_all_threshold`
         parameter to BcolzDailyBarReader, otherwise use the default value.
+    BCOLZ_DAILY_BAR_SOURCE_FROM_MINUTE : bool
+        If this flag is set, `make_daily_bar_data` will read data from the
+        minute bar reader defined by a `WithBcolzMinuteBarReader`.
 
     Methods
     -------
@@ -607,16 +610,67 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir):
     BCOLZ_DAILY_BAR_START_DATE = alias('START_DATE')
     BCOLZ_DAILY_BAR_END_DATE = alias('END_DATE')
     BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = None
+    BCOLZ_DAILY_BAR_SOURCE_FROM_MINUTE = False
 
     # allows WithBcolzDailyBarReaderFromCSVs to call the `write_csvs` method
     # without needing to reimplement `init_class_fixtures`
     _write_method_name = 'write'
 
+    @classmethod
+    def _make_daily_bar_from_minute(cls):
+        assets = cls.asset_finder.retrieve_all(cls.asset_finder.sids)
+        ohlcv_how = {
+            'open': 'first',
+            'high': 'max',
+            'low': 'min',
+            'close': 'last',
+            # TODO: Change test data so that large minute volumes are not
+            # used, so that 'sum' can be used without going over the uint
+            # limit. When that data is changed, this function can and
+            # should be moved to the `data` module so that loaders and
+            # tests can use the same source-from-minute logic.
+            'volume': 'last'
+        }
+        mm = cls.env.market_minutes
+        m_opens = cls.env.open_and_closes.market_open
+        m_closes = cls.env.open_and_closes.market_close
+
+        for asset in assets:
+            first_minute = m_opens.loc[asset.start_date]
+            last_minute = m_closes.loc[asset.end_date]
+            window = cls.bcolz_minute_bar_reader.load_raw_arrays(
+                fields=['open', 'high', 'low', 'close', 'volume'],
+                start_dt=first_minute,
+                end_dt=last_minute,
+                sids=[asset.sid],
+            )
+            opens, highs, lows, closes, volumes = [c.reshape(-1)
+                                                   for c in window]
+            minutes = mm[mm.slice_indexer(start=first_minute,
+                                          end=last_minute)]
+            df = pd.DataFrame(
+                {
+                    'open': opens,
+                    'high': highs,
+                    'low': lows,
+                    'close': closes,
+                    'volume': volumes,
+                },
+                index=minutes
+            )
+
+            yield asset.sid, df.resample('1d', how=ohlcv_how).dropna()
+
     @classmethod
     def make_daily_bar_data(cls):
-        return create_daily_bar_data(
-            cls.bcolz_daily_bar_days,
-            cls.asset_finder.sids,
-        )
+        # Requires a minute bar reader to come earlier in the MRO.
+        # Resample that data so that daily and minute bar data are aligned.
+        if cls.BCOLZ_DAILY_BAR_SOURCE_FROM_MINUTE:
+            return cls._make_daily_bar_from_minute()
+        else:
+            return create_daily_bar_data(
+                cls.bcolz_daily_bar_days,
+                cls.asset_finder.sids,
+            )
 
     @classmethod
     def init_class_fixtures(cls):
@@ -1069,7 +1123,9 @@ def run_pipeline(self, pipeline, start_date, end_date):
         )
 
 
-class WithDataPortal(WithBcolzMinuteBarReader, WithAdjustmentReader):
+class WithDataPortal(WithAdjustmentReader,
+                     # Ordered so that bcolz minute reader is used first.
+                     WithBcolzMinuteBarReader):
     """
     ZiplineTestCase mixin providing self.data_portal as an instance level
     fixture.
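Because the new flag defaults to False, existing fixtures are unaffected; a test case opts in with a single class attribute, as the updated MinuteEquityHistoryTestCase does above. A minimal sketch of the pattern (the class name here is invented for illustration):

class MinuteSourcedDailyTestCase(WithHistory, ZiplineTestCase):
    # Daily bars are resampled from the minute writer's output, so
    # daily and minute history windows agree on OHLCV values.
    BCOLZ_DAILY_BAR_SOURCE_FROM_MINUTE = True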
