Skip to content

Commit

Permalink
Merge pull request #1227 from quantopian/blaze-loader-perf
Browse files Browse the repository at this point in the history
ENH: improve performance of blaze core loader
  • Loading branch information
llllllllll committed Jun 3, 2016
2 parents b4892d3 + 9448117 commit cf1687e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 41 deletions.
6 changes: 3 additions & 3 deletions etc/requirements_blaze.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
-e git://github.com/quantopian/blaze.git@9c3fa1327236f777ca112a5bd8c3bb7e442d1052#egg=blaze-dev
-e git://github.com/quantopian/datashape.git@bf06a41dc0908baf7c324aeacadba8820468ee78#egg=datashape-dev
-e git://github.com/quantopian/odo.git@9e16310b5f2c3f05162145200db7e7908f0a866e#egg=odo-dev
-e git://github.com/quantopian/blaze.git@8921fdd00bb040c61457937902036de5c404b6f3#egg=blaze-dev
37 changes: 2 additions & 35 deletions zipline/pipeline/loaders/blaze/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
from functools import partial, reduce
from functools import partial
from itertools import count
import warnings
from weakref import WeakKeyDictionary
Expand All @@ -137,7 +137,6 @@
Date,
DateTime,
Option,
floating,
isrecord,
isscalar,
)
Expand Down Expand Up @@ -904,44 +903,12 @@ def where(e):
q : Expr
The query to run.
"""
def lower_for_col(column):
pred = e[TS_FIELD_NAME] <= lower_dt
colname = column.name
schema = e[colname].schema.measure
if isinstance(schema, Option):
pred &= e[colname].notnull()
schema = schema.ty
if schema in floating:
pred &= ~e[colname].isnan()

filtered = e[pred]
lower = filtered[TS_FIELD_NAME].max()
if have_sids:
# If we have sids, then we need to take the earliest of the
# greatest date that has a non-null value by sid.
lower = bz.by(
filtered[SID_FIELD_NAME],
timestamp=lower,
).timestamp.min()
return lower

lower = odo(
reduce(
bz.least,
map(lower_for_col, columns),
),
pd.Timestamp,
**odo_kwargs
)
if lower is pd.NaT:
lower = lower_dt
return e[
(e[TS_FIELD_NAME] >= lower) &
(e[TS_FIELD_NAME] <= upper_dt)
][added_query_fields + list(map(getname, columns))]

def collect_expr(e):
"""Execute and merge all of the per-column subqueries.
"""Materialize the expression as a dataframe.
Parameters
----------
Expand Down
11 changes: 10 additions & 1 deletion zipline/pipeline/loaders/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from six.moves import zip

from zipline.utils.numpy_utils import categorical_dtype, NaTns
from zipline.utils.pandas_utils import mask_between_time


def next_event_frame(events_by_sid,
Expand Down Expand Up @@ -209,6 +210,9 @@ def normalize_data_query_bounds(lower, upper, time, tz):
return lower, upper


# Midnight, passed as the (exclusive) end of the window given to
# ``mask_between_time`` below when deciding which timestamps to roll
# forward to the next day.
_midnight = datetime.time(0, 0)


def normalize_timestamp_to_query_time(df,
time,
tz,
Expand Down Expand Up @@ -246,7 +250,12 @@ def normalize_timestamp_to_query_time(df,

dtidx = pd.DatetimeIndex(df.loc[:, ts_field], tz='utc')
dtidx_local_time = dtidx.tz_convert(tz)
to_roll_forward = dtidx_local_time.time >= time
to_roll_forward = mask_between_time(
dtidx_local_time,
time,
_midnight,
include_end=False,
)
# for all of the times that are greater than our query time add 1
# day and truncate to the date
df.loc[to_roll_forward, ts_field] = (
Expand Down
93 changes: 91 additions & 2 deletions zipline/utils/pandas_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""
Utilities for working with pandas objects.
"""
from itertools import product
import operator as op

import pandas as pd


Expand All @@ -15,6 +18,92 @@ def explode(df):

try:
# pandas 0.16 compat
sort_values = pd.DataFrame.sort_values
_df_sort_values = pd.DataFrame.sort_values
_series_sort_values = pd.Series.sort_values
except AttributeError:
sort_values = pd.DataFrame.sort
_df_sort_values = pd.DataFrame.sort
_series_sort_values = pd.Series.sort


def sort_values(ob, *args, **kwargs):
if isinstance(ob, pd.DataFrame):
return _df_sort_values(ob, *args, **kwargs)
elif isinstance(ob, pd.Series):
return _series_sort_values(ob, *args, **kwargs)
raise ValueError(
'sort_values expected a dataframe or series, not %s: %r' % (
type(ob).__name__, ob,
),
)


def _time_to_micros(time):
    """Convert a wall-clock time into microseconds since midnight.

    Parameters
    ----------
    time : datetime.time
        The time of day to convert.

    Returns
    -------
    us : int
        The number of microseconds since midnight.

    Notes
    -----
    This does not account for leap seconds or daylight savings.
    """
    total_seconds = (time.hour * 60 + time.minute) * 60 + time.second
    return total_seconds * 1000000 + time.microsecond


# Maps ``(include_start, include_end, start <= end)`` to the comparison
# used against the start time, the comparison used against the end time,
# and the operator joining the two endpoint tests.  Inclusive endpoints
# use ``<=``; exclusive endpoints use ``<``.  When the window wraps past
# midnight (``start > end``), a time is inside the window if it is after
# the start OR before the end, hence ``or_`` instead of ``and_``.
_opmap = {
    (True, True, True): (op.le, op.le, op.and_),
    (True, True, False): (op.le, op.le, op.or_),
    (True, False, True): (op.le, op.lt, op.and_),
    (True, False, False): (op.le, op.lt, op.or_),
    (False, True, True): (op.lt, op.le, op.and_),
    (False, True, False): (op.lt, op.le, op.or_),
    (False, False, True): (op.lt, op.lt, op.and_),
    (False, False, False): (op.lt, op.lt, op.or_),
}


def mask_between_time(dts, start, end, include_start=True, include_end=True):
    """Return a mask of all of the datetimes in ``dts`` that are between
    ``start`` and ``end``.

    Parameters
    ----------
    dts : pd.DatetimeIndex
        The index to mask.
    start : time
        Mask away times less than the start.
    end : time
        Mask away times greater than the end.
    include_start : bool, optional
        Inclusive on ``start``.
    include_end : bool, optional
        Inclusive on ``end``.

    Returns
    -------
    mask : np.ndarray[bool]
        A bool array masking ``dts``.

    See Also
    --------
    :meth:`pandas.DatetimeIndex.indexer_between_time`
    """
    # This function is adapted from
    # ``pandas.DatetimeIndex.indexer_between_time``, which was originally
    # written by Wes McKinney, Chang She, and Grant Roch.
    micros_of_day = dts._get_time_micros()
    lower_micros = _time_to_micros(start)
    upper_micros = _time_to_micros(end)

    # Select the endpoint comparisons and the joining operator; when the
    # window wraps past midnight (lower > upper) the two endpoint tests
    # are joined with ``or`` rather than ``and``.
    lower_cmp, upper_cmp, join = _opmap[
        bool(include_start),
        bool(include_end),
        lower_micros <= upper_micros,
    ]

    after_start = lower_cmp(lower_micros, micros_of_day)
    before_end = upper_cmp(micros_of_day, upper_micros)
    return join(after_start, before_end)

0 comments on commit cf1687e

Please sign in to comment.