ENH: condense the blaze query
Joe Jevnik committed Feb 18, 2016
1 parent 62020f6 commit 2036f3d
Showing 3 changed files with 64 additions and 48 deletions.
5 changes: 3 additions & 2 deletions etc/requirements_blaze.txt
@@ -1,3 +1,4 @@
-e git://github.com/quantopian/blaze.git@831116adba808b89f42cef48c7d96cc44603d05a#egg=blaze-dev
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
-e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
-e git://github.com/quantopian/blaze.git@0b6e76122a57c7115f18c6fdbd5fbab5501fd486#egg=blaze-dev

37 changes: 26 additions & 11 deletions tests/pipeline/test_blaze.py
@@ -13,6 +13,7 @@
from nose_parameterized import parameterized
import numpy as np
from numpy.testing.utils import assert_array_almost_equal
from odo import odo
import pandas as pd
from pandas.util.testing import assert_frame_equal
from toolz import keymap, valmap, concatv
@@ -845,11 +846,18 @@ def compute(self, today, assets, out, data):
@with_extra_sid
def test_deltas(self, asset_info):
expr = bz.Data(self.df, name='expr', dshape=self.dshape)
deltas = bz.Data(self.df, name='deltas', dshape=self.dshape)
deltas = bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
deltas = bz.Data(self.df, dshape=self.dshape)
deltas = bz.Data(
odo(
bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
),
pd.DataFrame,
),
name='delta',
dshape=self.dshape,
)

expected_views = keymap(pd.Timestamp, {
@@ -996,16 +1004,23 @@ def test_novel_deltas(self, asset_info):
repeated_dates = base_dates.repeat(3)
baseline = pd.DataFrame({
'sid': self.sids * 2,
'value': (0, 1, 2, 1, 2, 3),
'value': (0., 1., 2., 1., 2., 3.),
'int_value': (0, 1, 2, 1, 2, 3),
'asof_date': repeated_dates,
'timestamp': repeated_dates,
})
expr = bz.Data(baseline, name='expr', dshape=self.dshape)
deltas = bz.Data(baseline, name='deltas', dshape=self.dshape)
deltas = bz.transform(
deltas,
value=deltas.value + 10,
timestamp=deltas.timestamp + timedelta(days=1),
deltas = bz.Data(
odo(
bz.transform(
expr,
value=expr.value + 10,
timestamp=expr.timestamp + timedelta(days=1),
),
pd.DataFrame,
),
name='delta',
dshape=self.dshape,
)
expected_views = keymap(pd.Timestamp, {
'2014-01-03': np.array([[10.0, 11.0, 12.0],
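The test changes above share one pattern: the deltas source is no longer a lazy blaze transform over the baseline expression, but a concrete DataFrame produced by odo and re-wrapped with bz.Data. A minimal sketch of that pattern outside the test suite (illustrative data only; the bz.Data, bz.transform, and odo calls are the same ones used in the diff):

    from datetime import timedelta

    import blaze as bz
    import pandas as pd
    from odo import odo

    df = pd.DataFrame({
        'sid': [65, 66],
        'value': [0., 1.],
        'asof_date': pd.to_datetime(['2014-01-01', '2014-01-01']),
        'timestamp': pd.to_datetime(['2014-01-01', '2014-01-01']),
    })
    baseline = bz.Data(df, name='baseline')
    shifted = bz.transform(
        baseline,
        value=baseline.value + 10,
        timestamp=baseline.timestamp + timedelta(days=1),
    )
    # odo evaluates the lazy expression into an in-memory DataFrame; wrapping
    # that result with bz.Data gives an independent deltas source rather than
    # a view over the baseline expression.
    deltas = bz.Data(odo(shifted, pd.DataFrame), name='deltas')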
70 changes: 35 additions & 35 deletions zipline/pipeline/loaders/blaze/core.py
@@ -872,50 +872,54 @@ def _load_dataset(self, dates, assets, mask, columns):
data_query_tz,
)

def where(e, column):
def where(e):
"""Create the query to run against the resources.
Parameters
----------
e : Expr
The baseline or deltas expression.
column : BoundColumn
The column to query for.
Returns
-------
q : Expr
The query to run for the given column.
The query to run.
"""
colname = column.name
pred = e[TS_FIELD_NAME] <= lower_dt
schema = e[colname].schema.measure
if isinstance(schema, Option):
pred &= e[colname].notnull()
schema = schema.ty
if schema in floating:
pred &= ~e[colname].isnan()
filtered = e[pred]
lower = filtered.timestamp.max()

if have_sids:
# If we have sids, then we need to take the earliest of the
# greatest date that has a non-null value by sid.
lower = bz.by(
filtered[SID_FIELD_NAME],
timestamp=lower,
).timestamp.min()

lower = odo(lower, pd.Timestamp)
def lower_for_col(column):
pred = e[TS_FIELD_NAME] <= lower_dt
colname = column.name
schema = e[colname].schema.measure
if isinstance(schema, Option):
pred &= e[colname].notnull()
schema = schema.ty
if schema in floating:
pred &= ~e[colname].isnan()

filtered = e[pred]
lower = filtered[TS_FIELD_NAME].max()
if have_sids:
# If we have sids, then we need to take the earliest of the
# greatest date that has a non-null value by sid.
lower = bz.by(
filtered[SID_FIELD_NAME],
timestamp=lower,
).timestamp.min()
return lower

lower = odo(
reduce(
bz.least,
map(lower_for_col, columns),
),
pd.Timestamp,
**odo_kwargs
)
if lower is pd.NaT:
# If there is no lower date, just query for data in the date
# range. It must all be null anyways.
lower = lower_dt

return e[
(e[TS_FIELD_NAME] >= lower) &
(e[TS_FIELD_NAME] <= upper_dt)
][added_query_fields + [colname]]
][added_query_fields + list(map(getname, columns))]

def collect_expr(e):
"""Execute and merge all of the per-column subqueries.
@@ -935,13 +939,9 @@ def collect_expr(e):
This can return more data than needed. The in memory reindex will
handle this.
"""
return sort_values(reduce(
partial(pd.merge, on=added_query_fields, how='outer'),
(
odo(where(e, column), pd.DataFrame, **odo_kwargs)
for column in columns
),
), TS_FIELD_NAME) # sort for the groupby later
df = odo(where(e), pd.DataFrame, **odo_kwargs)
df.sort(TS_FIELD_NAME, inplace=True) # sort for the groupby later
return df

materialized_expr = collect_expr(expr)
materialized_deltas = (
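The core.py change condenses one query per column into a single query: each column contributes a lower timestamp bound, reduce(bz.least, ...) folds those into the earliest bound, and the date-range filter plus column selection then runs once. As a rough pandas analogue of that logic (not the loader's actual code; the field names 'sid', 'timestamp', and 'asof_date' are assumed here for illustration):

    import pandas as pd


    def lower_for_col(df, colname, lower_dt, have_sids=True):
        # Latest timestamp at or before lower_dt with usable data for this
        # column; notnull() also drops NaN floats, matching the isnan check
        # in the blaze version. With sids, take that max per sid and then
        # the earliest of those, so every sid has a baseline value at the
        # start of the window.
        usable = df[(df['timestamp'] <= lower_dt) & df[colname].notnull()]
        if usable.empty:
            return pd.NaT
        if have_sids:
            return usable.groupby('sid')['timestamp'].max().min()
        return usable['timestamp'].max()


    def condensed_query(df, colnames, lower_dt, upper_dt):
        # Earliest per-column lower bound; Series.min skips NaT, standing in
        # for the reduce(bz.least, ...) over the column expressions.
        lower = pd.Series(
            [lower_for_col(df, c, lower_dt) for c in colnames]
        ).min()
        if pd.isnull(lower):
            # No usable data before the window: fall back to the window start.
            lower = lower_dt
        in_range = (df['timestamp'] >= lower) & (df['timestamp'] <= upper_dt)
        cols = ['sid', 'asof_date', 'timestamp'] + list(colnames)
        return df.loc[in_range, cols]

This is only a sketch of the intent; the committed code keeps everything as blaze expressions and materializes a single DataFrame with odo before sorting on the timestamp field for the later groupby.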
