Merge pull request #1323 from quantopian/pmap-blaze-query
ENH: Adds the ability to run blaze queries concurrently
llllllllll authored Jul 14, 2016
2 parents 90035e7 + 5473ec2 commit 835fab8
Showing 3 changed files with 195 additions and 19 deletions.
62 changes: 43 additions & 19 deletions zipline/pipeline/loaders/blaze/core.py
@@ -159,11 +159,11 @@
from toolz import (
complement,
compose,
- concat,
flip,
groupby,
identity,
memoize,
+ merge,
)
import toolz.curried.operator as op

@@ -188,6 +188,7 @@
)
from zipline.utils.numpy_utils import bool_dtype, categorical_dtype
from zipline.utils.pandas_utils import sort_values
from zipline.utils.pool import SequentialPool
from zipline.utils.preprocess import preprocess


@@ -915,19 +916,40 @@ class BlazeLoader(dict):
object.
data_query_time : time, optional
The time to use for the data query cutoff.
- data_query_tz : tzinfo or str
+ data_query_tz : tzinfo or str, optional
The timezone to use for the data query cutoff.
pool : Pool, optional
The pool to use to run blaze queries concurrently. This object must
support ``imap_unordered``, ``apply`` and ``apply_async`` methods.
Attributes
----------
pool : Pool
The pool to use to run blaze queries concurrently. This object must
support ``imap_unordered``, ``apply`` and ``apply_async`` methods.
It is possible to change the pool after the loader has been
constructed. This allows us to set a new pool for the ``global_loader``
like: ``global_loader.pool = multiprocessing.Pool(4)``.
See Also
--------
:class:`zipline.utils.pool.SequentialPool`
:class:`multiprocessing.Pool`
"""
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
dsmap=None,
data_query_time=None,
- data_query_tz=None):
+ data_query_tz=None,
+ pool=SequentialPool()):
self.update(dsmap or {})
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz

# explicitly public
self.pool = pool

@classmethod
@memoize(cache=WeakKeyDictionary())
def global_instance(cls):
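As a minimal sketch of how the new ``pool`` argument and the explicitly public ``pool`` attribute documented above might be used (assuming a blaze-enabled zipline installation; ``ThreadPool`` stands in for any object providing ``imap_unordered``, ``apply`` and ``apply_async``):

    # Sketch: opting in to concurrent blaze queries via the ``pool`` argument.
    from multiprocessing.pool import ThreadPool

    from zipline.pipeline.loaders.blaze.core import BlazeLoader, global_loader
    from zipline.utils.pool import SequentialPool

    # Default behaviour: queries run one at a time in the calling thread.
    loader = BlazeLoader(pool=SequentialPool())

    # Any object exposing ``imap_unordered``, ``apply`` and ``apply_async``
    # works, e.g. a thread pool with four workers.
    loader = BlazeLoader(pool=ThreadPool(4))

    # The attribute is explicitly public, so the shared loader can be
    # upgraded after construction, as the docstring suggests.
    global_loader.pool = ThreadPool(4)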
@@ -948,11 +970,11 @@ def __repr__(self):
)

def load_adjusted_array(self, columns, dates, assets, mask):
- return dict(
- concat(map(
+ return merge(
+ self.pool.imap_unordered(
partial(self._load_dataset, dates, assets, mask),
- itervalues(groupby(getdataset, columns))
- ))
+ itervalues(groupby(getdataset, columns)),
+ ),
)

def _load_dataset(self, dates, assets, mask, columns):
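The rewritten ``load_adjusted_array`` above fans per-dataset work out through ``pool.imap_unordered`` and combines the resulting dicts with ``toolz.merge``; because each dataset yields a dict with disjoint keys, completion order does not matter. A small sketch of that pattern, with hypothetical stand-in data:

    # Sketch of the merge-over-imap_unordered pattern; ``load_group`` and the
    # integer data are hypothetical stand-ins for datasets and columns.
    from functools import partial

    from toolz import groupby, merge

    from zipline.utils.pool import SequentialPool

    def load_group(scale, items):
        # Each group yields a dict keyed by item; keys are disjoint across
        # groups, so merging is order-insensitive and imap_unordered is safe.
        return {item: item * scale for item in items}

    pool = SequentialPool()
    groups = groupby(lambda n: n % 2, range(6))   # {0: [0, 2, 4], 1: [1, 3, 5]}
    result = merge(pool.imap_unordered(partial(load_group, 10), groups.values()))
    print(result)   # {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50} (key order may vary)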
@@ -1022,21 +1044,22 @@ def collect_expr(e, lower):
materialized_checkpoints = pd.DataFrame(columns=colnames)
lower = None

- materialized_expr = collect_expr(expr, lower)
+ materialized_expr = self.pool.apply_async(collect_expr, (expr, lower))
+ materialized_deltas = (
+ self.pool.apply(collect_expr, (deltas, lower))
+ if deltas is not None else
+ pd.DataFrame(columns=colnames)
+ )
+
if materialized_checkpoints is not None:
materialized_expr = pd.concat(
(
materialized_checkpoints,
- materialized_expr,
+ materialized_expr.get(),
),
ignore_index=True,
copy=False,
)
- materialized_deltas = (
- collect_expr(deltas, lower)
- if deltas is not None else
- pd.DataFrame(columns=colnames)
- )

# It's not guaranteed that assets returned by the engine will contain
# all sids from the deltas table; filter out such mismatches here.
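In the hunk above, the base expression is submitted with ``apply_async``, the deltas expression is computed with ``apply`` while the base query is (conceptually) in flight, and ``.get()`` is only called once the base result is needed for the ``pd.concat``. A toy sketch of that ordering, with placeholder query functions:

    # Sketch of the apply_async / apply / get() ordering; the query
    # functions are hypothetical placeholders.
    from zipline.utils.pool import SequentialPool

    def run_base_query(lower):
        return ['base rows since %s' % lower]

    def run_deltas_query(lower):
        return ['delta rows since %s' % lower]

    pool = SequentialPool()

    # Submit the base query (eager for SequentialPool, asynchronous for a
    # real multiprocessing or gevent pool).
    base_future = pool.apply_async(run_base_query, ('2014-01-01',))

    # Do other work while it is "in flight": here, the deltas query.
    deltas = pool.apply(run_deltas_query, ('2014-01-01',))

    # Block (a no-op for SequentialPool) and combine the results.
    print(base_future.get() + deltas)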
@@ -1147,23 +1170,24 @@ def last_in_date_group(df, reindex, have_sids=have_sids):
shape=(len(mask), 1), fill_value=True, dtype=bool_dtype,
)

- for column_idx, column in enumerate(columns):
- column_name = column.name
- yield column, AdjustedArray(
+ return {
+ column: AdjustedArray(
column_view(
- dense_output[column_name].values.astype(column.dtype),
+ dense_output[column.name].values.astype(column.dtype),
),
mask,
adjustments_from_deltas(
dates,
sparse_output[TS_FIELD_NAME].values,
column_idx,
- column_name,
+ column.name,
asset_idx,
sparse_deltas,
),
column.missing_value,
)
+ for column_idx, column in enumerate(columns)
+ }

global_loader = BlazeLoader.global_instance()

16 changes: 16 additions & 0 deletions zipline/utils/functional.py
@@ -330,3 +330,19 @@ def decorator(f):
# Example:
with_name = set_attribute('__name__')
with_doc = set_attribute('__doc__')


def let(a):
"""Box a value to be bound in a for binding.
Examples
--------
.. code-block:: python
[f(y, y) for x in xs for y in let(g(x)) if p(y)]
Here, ``y`` is available in both the predicate and the expression
of the comprehension. We can see that this allows us to cache the work
of computing ``g(x)`` even within the expression.
"""
return a,
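For illustration, a concrete use of ``let`` (the names below are made up): the expensive value ``g(x)`` is computed once per iteration and is visible to both the ``if`` filter and the output expression:

    # Sketch: using ``let`` to compute g(x) once per comprehension step.
    from zipline.utils.functional import let

    def g(x):
        return x * x   # stand-in for an expensive computation

    xs = range(5)
    result = [y + 1 for x in xs for y in let(g(x)) if y > 4]
    print(result)   # [10, 17], from x = 3 and x = 4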
136 changes: 136 additions & 0 deletions zipline/utils/pool.py
@@ -0,0 +1,136 @@
from six.moves import map as imap
from toolz import compose, identity


class ApplyAsyncResult(object):
"""An object that boxes results for calls to
:meth:`~zipline.utils.pool.SequentialPool.apply_async`.
Parameters
----------
value : any
The result of calling the function, or any exception that was raised.
successful : bool
If ``True``, ``value`` is the return value of the function.
If ``False``, ``value`` is the exception that was raised when calling
the function.
"""
def __init__(self, value, successful):
self._value = value
self._successful = successful

def successful(self):
"""Did the function execute without raising an exception?
"""
return self._successful

def get(self):
"""Return the result of calling the function or reraise any exceptions
that were raised.
"""
if not self._successful:
raise self._value
return self._value

def ready(self):
Has the function finished executing?
Notes
-----
In the :class:`~zipline.utils.pool.SequentialPool` case, this is always
``True``.
"""
return True

def wait(self):
"""Wait until the function is finished executing.
Notes
-----
In the :class:`~zipline.utils.pool.SequentialPool` case, this is a nop
because the function is computed eagerly in the same thread as the
call to :meth:`~zipline.utils.pool.SequentialPool.apply_async`.
"""
pass


class SequentialPool(object):
"""A dummy pool object that iterates sequentially in a single thread.
Methods
-------
map(f: callable[A, B], iterable: iterable[A]) -> list[B]
Apply a function to each of the elements of ``iterable``.
imap(f: callable[A, B], iterable: iterable[A]) -> iterable[B]
Lazily apply a function to each of the elements of ``iterable``.
imap_unordered(f: callable[A, B], iterable: iterable[A]) -> iterable[B]
Lazily apply a function to each of the elements of ``iterable`` but
yield values as they become available. The resulting iterable is
unordered.
Notes
-----
This object is useful for testing to mock out the ``Pool`` interface
provided by gevent or multiprocessing.
See Also
--------
:class:`multiprocessing.Pool`
"""
map = staticmethod(compose(list, imap))
imap = imap_unordered = staticmethod(imap)

@staticmethod
def apply_async(f, args=(), kwargs=None, callback=None):
"""Apply a function but emulate the API of an asynchronous call.
Parameters
----------
f : callable
The function to call.
args : tuple, optional
The positional arguments.
kwargs : dict, optional
The keyword arguments.
Returns
-------
future : ApplyAsyncResult
The result of calling the function, boxed in a future-like API.
Notes
-----
This calls the function eagerly but wraps it so that ``SequentialPool``
can be used where a :class:`multiprocessing.Pool` or
:class:`gevent.pool.Pool` would be used.
"""
try:
value = (identity if callback is None else callback)(
f(*args, **kwargs or {}),
)
successful = True
except Exception as e:
value = e
successful = False

return ApplyAsyncResult(value, successful)

@staticmethod
def apply(f, args=(), kwargs=None):
"""Apply a function.
Parameters
----------
f : callable
The function to call.
args : tuple, optional
The positional arguments.
kwargs : dict, optional
The keyword arguments.
Returns
-------
result : any
f(*args, **kwargs)
"""
return f(*args, **kwargs or {})
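Taken together, a short sketch of the surface this module exposes, mirroring the ``multiprocessing.Pool`` API the loader relies on (the inputs are arbitrary toy values):

    # Sketch: exercising SequentialPool and ApplyAsyncResult with toy inputs.
    from zipline.utils.pool import SequentialPool

    pool = SequentialPool()

    # map / imap / imap_unordered all evaluate in the calling thread.
    print(pool.map(str, [1, 2, 3]))                  # ['1', '2', '3']
    print(list(pool.imap_unordered(abs, [-1, -2])))  # [1, 2]

    # apply_async runs eagerly but returns a future-like ApplyAsyncResult.
    ok = pool.apply_async(divmod, (7, 2))
    print(ok.ready(), ok.successful(), ok.get())     # True True (3, 1)

    # Exceptions are captured and re-raised from .get().
    bad = pool.apply_async(divmod, (7, 0))
    print(bad.successful())                          # False
    try:
        bad.get()
    except ZeroDivisionError as exc:
        print('re-raised:', exc)

    # apply is plain synchronous application.
    print(pool.apply(divmod, (9, 4)))                # (2, 1)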
