Merge pull request #1301 from quantopian/blaze-loader-single-columns
Return column vector for datasets with no sids
dmichalowicz authored Jul 1, 2016
2 parents d6c1c5f + d8e9fa9 commit 459366c
Showing 4 changed files with 115 additions and 141 deletions.
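In short: a pipeline column drawn from a dataset with no sid field (a "macro" dataset, i.e. one market-wide value per date) now reaches a CustomFactor as a column vector of shape (window_length, 1), where it was previously pre-broadcast to (window_length, nassets) with repeat_last_axis. A minimal sketch of the new behavior, modeled on the tests in this diff — macro_df, LatestMacroValue, and the column names are illustrative, not part of the change:

import blaze as bz
import pandas as pd

from zipline.pipeline import CustomFactor
from zipline.pipeline.loaders.blaze import BlazeLoader, from_blaze

# Hypothetical macro data: one row per date and no sid column at all.
macro_df = pd.DataFrame({
    'asof_date': pd.to_datetime(['2014-01-01', '2014-01-02', '2014-01-03']),
    'timestamp': pd.to_datetime(['2014-01-01', '2014-01-02', '2014-01-03']),
    'value': [0.0, 1.0, 2.0],
})

loader = BlazeLoader()
ds = from_blaze(
    bz.data(macro_df, name='expr'),
    loader=loader,
    no_deltas_rule='ignore',
)

class LatestMacroValue(CustomFactor):
    inputs = [ds.value]
    window_length = 1

    def compute(self, today, assets, out, value):
        # `value` now has shape (window_length, 1) rather than
        # (window_length, len(assets)); broadcasting the scalar
        # across `out` is left to the user.
        out[:] = value[-1, 0]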
216 changes: 97 additions & 119 deletions tests/pipeline/test_blaze.py
@@ -20,6 +20,7 @@
 from toolz.curried import operator as op

 from zipline.assets.synthetic import make_simple_equity_info
+from zipline.errors import UnsupportedPipelineOutput
 from zipline.pipeline import Pipeline, CustomFactor
 from zipline.pipeline.data import DataSet, BoundColumn, Column
 from zipline.pipeline.engine import SimplePipelineEngine
@@ -38,12 +39,9 @@
     tmp_asset_finder,
 )
 from zipline.testing.fixtures import WithAssetFinder
-from zipline.utils.numpy_utils import (
-    float64_dtype,
-    int64_dtype,
-    repeat_last_axis,
-)
+from zipline.testing.predicates import assert_equal, assert_isidentical
+from zipline.utils.numpy_utils import float64_dtype, int64_dtype


 nameof = op.attrgetter('name')
 dtypeof = op.attrgetter('dtype')
@@ -778,6 +776,44 @@ def _test_id(self, df, dshape, expected, finder, add):
             check_dtype=False,
         )

+    def _test_id_macro(self, df, dshape, expected, finder, add):
+        dates = self.dates
+        expr = bz.data(df, name='expr', dshape=dshape)
+        loader = BlazeLoader()
+        ds = from_blaze(
+            expr,
+            loader=loader,
+            no_deltas_rule='ignore',
+            missing_values=self.missing_values,
+        )
+
+        p = Pipeline()
+        macro_inputs = []
+        for column_name in add:
+            column = getattr(ds, column_name)
+            macro_inputs.append(column)
+            with self.assertRaises(UnsupportedPipelineOutput):
+                # Single column output terms cannot be added to a pipeline.
+                p.add(column.latest, column_name)
+
+        class UsesMacroInputs(CustomFactor):
+            inputs = macro_inputs
+            window_length = 1
+
+            def compute(self, today, assets, out, *inputs):
+                e = expected.loc[today]
+                for i, input_ in enumerate(inputs):
+                    # Each macro input should only have one column.
+                    assert input_.shape == (self.window_length, 1)
+                    assert_equal(input_[0, 0], e[i])
+
+        # Run the pipeline with our custom factor. Assertions about the
+        # expected macro data are made in the `compute` function of our custom
+        # factor above.
+        p.add(UsesMacroInputs(), 'uses_macro_inputs')
+        engine = SimplePipelineEngine(loader, dates, finder)
+        engine.run_pipeline(p, dates[0], dates[-1])
+
     def test_custom_query_time_tz(self):
         df = self.df.copy()
         df['timestamp'] = (
@@ -972,27 +1008,19 @@ def test_id_macro_dataset(self):
            6 2014-01-03 2014-01-03      2
         output (expected):
-                                   value
-        2014-01-01 Equity(65 [A])      0
-                   Equity(66 [B])      0
-                   Equity(67 [C])      0
-        2014-01-02 Equity(65 [A])      1
-                   Equity(66 [B])      1
-                   Equity(67 [C])      1
-        2014-01-03 Equity(65 [A])      2
-                   Equity(66 [B])      2
-                   Equity(67 [C])      2
+                    value
+        2014-01-01      0
+        2014-01-02      1
+        2014-01-03      2
         """
-        nassets = len(simple_asset_info)
         expected = pd.DataFrame(
-            list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
-            index=pd.MultiIndex.from_product((
-                self.macro_df.timestamp,
-                self.asset_finder.retrieve_all(simple_asset_info.index),
-            )),
-            columns=('value',),
+            data=[[0],
+                  [1],
+                  [2]],
+            columns=['value'],
+            index=self.dates,
         )
-        self._test_id(
+        self._test_id_macro(
             self.macro_df,
             self.macro_dshape,
             expected,
@@ -1009,16 +1037,10 @@ def test_id_ffill_out_of_window_macro_dataset(self):
            2 2013-12-24 2013-12-24    NaN   NaN
         output (expected):
-                                   other  value
-        2014-01-01 Equity(65 [A])      1      0
-                   Equity(66 [B])      1      0
-                   Equity(67 [C])      1      0
-        2014-01-02 Equity(65 [A])      1      0
-                   Equity(66 [B])      1      0
-                   Equity(67 [C])      1      0
-        2014-01-03 Equity(65 [A])      1      0
-                   Equity(66 [B])      1      0
-                   Equity(67 [C])      1      0
+                    other  value
+        2014-01-01      1      0
+        2014-01-02      1      0
+        2014-01-03      1      0
         """
         dates = self.dates - timedelta(days=10)
         df = pd.DataFrame({
@@ -1031,23 +1053,13 @@
         fields['other'] = fields['value']

         expected = pd.DataFrame(
-            np.array([[0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1],
-                      [0, 1]]),
-            columns=['value', 'other'],
-            index=pd.MultiIndex.from_product(
-                (self.dates, self.asset_finder.retrieve_all(
-                    self.ASSET_FINDER_EQUITY_SIDS
-                )),
-            ),
-        ).sort_index(axis=1)
-        self._test_id(
+            data=[[0, 1],
+                  [0, 1],
+                  [0, 1]],
+            columns=['other', 'value'],
+            index=self.dates,
+        )
+        self._test_id_macro(
             df,
             var * Record(fields),
             expected,
@@ -1064,16 +1076,10 @@ def test_id_macro_dataset_multiple_columns(self):
            6 2014-01-03 2014-01-03      3     2
         output (expected):
-                                   other  value
-        2014-01-01 Equity(65 [A])      1      0
-                   Equity(66 [B])      1      0
-                   Equity(67 [C])      1      0
-        2014-01-02 Equity(65 [A])      2      1
-                   Equity(66 [B])      2      1
-                   Equity(67 [C])      2      1
-        2014-01-03 Equity(65 [A])      3      2
-                   Equity(66 [B])      3      2
-                   Equity(67 [C])      3      2
+                    other  value
+        2014-01-01      1      0
+        2014-01-02      2      1
+        2014-01-03      3      2
         """
         df = self.macro_df.copy()
         df['other'] = df.value + 1
@@ -1082,16 +1088,14 @@

         with tmp_asset_finder(equities=simple_asset_info) as finder:
             expected = pd.DataFrame(
-                np.array([[0, 1],
-                          [1, 2],
-                          [2, 3]]).repeat(3, axis=0),
-                index=pd.MultiIndex.from_product((
-                    df.timestamp,
-                    finder.retrieve_all(simple_asset_info.index),
-                )),
-                columns=('value', 'other'),
-            ).sort_index(axis=1)
-            self._test_id(
+                data=[[0, 1],
+                      [1, 2],
+                      [2, 3]],
+                columns=['value', 'other'],
+                index=self.dates,
+                dtype=np.float64,
+            )
+            self._test_id_macro(
                 df,
                 var * Record(fields),
                 expected,
@@ -1158,16 +1162,10 @@ def test_id_take_last_in_group_macro(self):
         """
         output (expected):
-                                   other  value
-        2014-01-01 Equity(65 [A])    NaN      1
-                   Equity(66 [B])    NaN      1
-                   Equity(67 [C])    NaN      1
-        2014-01-02 Equity(65 [A])      1      2
-                   Equity(66 [B])      1      2
-                   Equity(67 [C])      1      2
-        2014-01-03 Equity(65 [A])      2      2
-                   Equity(66 [B])      2      2
-                   Equity(67 [C])      2      2
+                    other  value
+        2014-01-01    NaN      1
+        2014-01-02      1      2
+        2014-01-03      2      2
         """
         T = pd.Timestamp
         df = pd.DataFrame(
@@ -1185,32 +1183,18 @@
         fields['other'] = fields['value']

         expected = pd.DataFrame(
-            columns=[
-                'other', 'value',
-            ],
-            data=[
-                [np.nan, 1],  # 2014-01-01 Equity(65 [A])
-                [np.nan, 1],  #            Equity(66 [B])
-                [np.nan, 1],  #            Equity(67 [C])
-                [1, 2],       # 2014-01-02 Equity(65 [A])
-                [1, 2],       #            Equity(66 [B])
-                [1, 2],       #            Equity(67 [C])
-                [2, 2],       # 2014-01-03 Equity(65 [A])
-                [2, 2],       #            Equity(66 [B])
-                [2, 2],       #            Equity(67 [C])
-            ],
-            index=pd.MultiIndex.from_product(
-                (self.dates, self.asset_finder.retrieve_all(
-                    self.ASSET_FINDER_EQUITY_SIDS
-                )),
-            ),
+            data=[[np.nan, 1],  # 2014-01-01
+                  [1, 2],       # 2014-01-02
+                  [2, 2]],      # 2014-01-03
+            columns=['other', 'value'],
+            index=self.dates,
         )
-        self._test_id(
+        self._test_id_macro(
             df,
             var * Record(fields),
             expected,
             self.asset_finder,
-            ('value', 'other'),
+            ('other', 'value'),
         )

     def _run_pipeline(self,
@@ -1400,8 +1384,10 @@ def test_deltas_macro(self):

         nassets = len(simple_asset_info)
         expected_views = keymap(pd.Timestamp, {
-            '2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets),
-            '2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets),
+            '2014-01-02': np.array([[10.0],
+                                    [1.0]]),
+            '2014-01-03': np.array([[11.0],
+                                    [2.0]]),
         })

         with tmp_asset_finder(equities=simple_asset_info) as finder:
@@ -1523,14 +1509,12 @@ def test_novel_deltas_macro(self):

         nassets = len(simple_asset_info)
         expected_views = keymap(pd.Timestamp, {
-            '2014-01-03': repeat_last_axis(
-                np.array([10.0, 10.0, 10.0]),
-                nassets,
-            ),
-            '2014-01-06': repeat_last_axis(
-                np.array([10.0, 10.0, 11.0]),
-                nassets,
-            ),
+            '2014-01-03': np.array([[10.0],
+                                    [10.0],
+                                    [10.0]]),
+            '2014-01-06': np.array([[10.0],
+                                    [10.0],
+                                    [11.0]]),
         })

         cal = pd.DatetimeIndex([
@@ -1586,14 +1570,8 @@ def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0):

         nassets = len(simple_asset_info)
         expected_views = keymap(pd.Timestamp, {
-            '2014-01-03': repeat_last_axis(
-                np.array([ffilled_value]),
-                nassets,
-            ),
-            '2014-01-04': repeat_last_axis(
-                np.array([1.0]),
-                nassets,
-            ),
+            '2014-01-03': np.array([[ffilled_value]]),
+            '2014-01-04': np.array([[1.0]]),
         })

         with tmp_asset_finder(equities=simple_asset_info) as finder:
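The assertRaises loop in the new _test_id_macro helper above also pins down the error contract: a single-column term may feed a CustomFactor, but it cannot itself be a pipeline output. A sketch of that behavior, reusing the hypothetical ds from the earlier sketch:

from zipline.errors import UnsupportedPipelineOutput
from zipline.pipeline import Pipeline

p = Pipeline()
try:
    # Single column output terms cannot be added to a pipeline;
    # outputs must still be wide (dates x assets).
    p.add(ds.value.latest, 'value')
except UnsupportedPipelineOutput:
    pass  # expected for any no-sid (macro) column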
3 changes: 3 additions & 0 deletions zipline/pipeline/data/dataset.py
@@ -123,6 +123,7 @@ def __new__(cls, dtype, missing_value, dataset, name):
             missing_value=missing_value,
             dataset=dataset,
             name=name,
+            ndim=dataset.ndim,
         )

     def _init(self, dataset, name, *args, **kwargs):
@@ -176,6 +177,7 @@ def latest(self):
             inputs=(self,),
             dtype=dtype,
             missing_value=self.missing_value,
+            ndim=self.ndim,
         )

     def __repr__(self):
@@ -227,3 +229,4 @@ def __repr__(self):

 class DataSet(with_metaclass(DataSetMeta, object)):
     domain = None
+    ndim = 2
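These three lines carry the dataset-side mechanics: DataSet now declares an ndim class attribute (2 for ordinary dates-by-assets data), BoundColumn.__new__ inherits it from its dataset, and .latest forwards it to the term it builds. The diff does not show how an ndim of 1 is ever set — the blaze loader presumably does that for sid-less expressions — so the following is only a sketch of the attribute contract, with a hypothetical dataset overriding the default:

from zipline.pipeline.data import Column, DataSet
from zipline.utils.numpy_utils import float64_dtype

class MacroData(DataSet):
    # Hypothetical no-sid dataset: overriding the new class-level
    # default (ndim = 2) marks it, and every column bound to it, as
    # one-dimensional.
    ndim = 1

    value = Column(float64_dtype)

# BoundColumn.__new__ now receives ndim=dataset.ndim, and `.latest`
# forwards ndim=self.ndim, so the flag reaches the engine's terms
# (assuming the Term base class records the ndim it is given).
assert MacroData.value.ndim == 1
assert MacroData.value.latest.ndim == 1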