
Commit cb984d4

Merge remote-tracking branch 'sjperkins/tree-reduction'
o-smirnov committed Apr 22, 2020
2 parents dba9080 + 7d381ef commit cb984d4
Showing 1 changed file with 53 additions and 9 deletions.
62 changes: 53 additions & 9 deletions datashader/data_libraries/dask.py
@@ -1,9 +1,11 @@
 from __future__ import absolute_import, division

 import dask
+import dask.array as da
 import dask.dataframe as dd
 from collections import OrderedDict
 from dask.base import tokenize, compute
+import numpy as np

 from datashader.core import bypixel
 from datashader.compatibility import apply
@@ -21,12 +23,15 @@ def dask_pipeline(df, schema, canvas, glyph, summary, cuda=False):
     # Get user configured scheduler (if any), or fall back to default
     # scheduler for dask DataFrame
     scheduler = dask.base.get_scheduler() or df.__dask_scheduler__

+    if isinstance(dsk, da.Array):
+        return da.compute(dsk, scheduler=scheduler)[0]
+
     keys = df.__dask_keys__()
     optimize = df.__dask_optimize__
     graph = df.__dask_graph__()

     dsk.update(optimize(graph, keys))

     return scheduler(dsk, name)
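With this change, `glyph_dispatch` can hand back a lazy dask array (the tree-reduction path) instead of a raw task-graph dict, and `dask_pipeline` materialises it under the user-configured scheduler. A minimal sketch of that early-return branch, using a made-up toy array in place of the real aggregate:

    import dask
    import dask.array as da
    import numpy as np

    # Toy stand-in for what glyph_dispatch may now return: a lazy
    # dask array rather than a raw graph dict.
    arr = da.from_array(np.arange(6), chunks=2).sum()

    # Mirror of the new early-return branch: honour the user's
    # configured scheduler (if any) when materialising the array.
    scheduler = dask.base.get_scheduler() or arr.__dask_scheduler__
    print(da.compute(arr, scheduler=scheduler)[0])  # 15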


@@ -70,18 +75,57 @@ def default(glyph, df, schema, canvas, summary, cuda=False):
     y_mapper = canvas.y_axis.mapper
     extend = glyph._build_extend(x_mapper, y_mapper, info, append)

-    def chunk(df):
+    def chunk(df, axis, keepdims):
         aggs = create(shape)
         extend(aggs, df, st, bounds)
         return aggs

-    name = tokenize(df.__dask_tokenize__(), canvas, glyph, summary)
-    keys = df.__dask_keys__()
-    keys2 = [(name, i) for i in range(len(keys))]
-    dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys))
-    dsk[name] = (apply, finalize, [(combine, keys2)],
-                 dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))
-    return dsk, name
+    # Get the dataframe graph
+    graph = df.__dask_graph__()
+    # Get the topmost layer representing the creation of the dataframe
+    df_layer = graph.layers[df._name]
+    # Create a chunks tuple, a singleton for each dataframe chunk
+    chunks = (tuple(1 for _ in range(len(df_layer))),)
+    # Guess output dtype from combination of dataframe dtypes
+    dtype = np.result_type(*df.dtypes)
+    # Create a meta object
+    meta = np.empty((0,), dtype=dtype)
+
+    # Now create a dask array from the dataframe graph layer
+    df_array = da.Array(graph, df._name, chunks, meta=meta)
+
+    def wrapped_combine(x, axis, keepdims):
+        """ wrap datashader combine in dask.array.reduction combine """
+        if isinstance(x, list):
+            # list of tuples of ndarrays
+            # assert all(isinstance(item, tuple) and
+            #            len(item) == 1 and
+            #            isinstance(item[0], np.ndarray)
+            #            for item in x)
+            return combine(x)
+        elif isinstance(x, tuple):
+            # tuple with single ndarray
+            # assert len(x) == 1 and isinstance(x[0], np.ndarray)
+            return x
+        else:
+            raise TypeError("Unknown type %s in wrapped_combine" % type(x))
+
+    local_ax = axis
+
+    def wrapped_finalize(x, axis, keepdims):
+        return finalize(wrapped_combine(x, axis, keepdims),
+                        cuda=cuda, coords=local_ax,
+                        dims=[glyph.y_label, glyph.x_label])
+
+    R = da.reduction(df_array,
+                     aggregate=wrapped_finalize,
+                     chunk=chunk,
+                     combine=wrapped_combine,
+                     concatenate=False,
+                     meta=meta,
+                     dtype=meta.dtype)
+
+    return R, R.name


 @glyph_dispatch.register(LineAxis0)
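The core trick of this merge is to view each dataframe partition as one singleton chunk of a 1-D dask array, so that `dask.array.reduction` can tree-reduce the per-partition aggregates. A rough sketch of the wrapping step, assuming dask's circa-2020 graph API (`__dask_graph__`, `_name`) and a made-up column `x`; `df.npartitions` stands in for the diff's `len(df_layer)`:

    import dask.array as da
    import dask.dataframe as dd
    import numpy as np
    import pandas as pd

    # A small dataframe split into four partitions.
    df = dd.from_pandas(pd.DataFrame({"x": np.arange(8, dtype="f8")}),
                        npartitions=4)

    graph = df.__dask_graph__()
    # One singleton chunk per partition; meta advertises an ndarray
    # even though each block really holds a pandas DataFrame.
    chunks = (tuple(1 for _ in range(df.npartitions)),)
    meta = np.empty((0,), dtype=np.result_type(*df.dtypes))
    df_array = da.Array(graph, df._name, chunks, meta=meta)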

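Continuing that sketch, `da.reduction` then drives the tree: `chunk` builds a per-partition aggregate, `combine` merges lists of intermediates (`concatenate=False` keeps them as plain lists rather than concatenated ndarrays), and `aggregate` finalises, mirroring `wrapped_combine`/`wrapped_finalize` in the diff. A toy column sum stands in for datashader's create/extend/combine/finalize machinery:

    def chunk(part, axis, keepdims):
        # Per-partition aggregate; stands in for create(...) + extend(...).
        return np.array([part["x"].sum()])

    def combine(x, axis, keepdims):
        # With concatenate=False the tree hands combine a list of
        # per-chunk results; merge them into one partial result.
        if isinstance(x, list):
            return np.array([np.concatenate(x).sum()])
        return x

    def aggregate(x, axis, keepdims):
        # Final step of the tree reduction; stands in for finalize(...).
        return combine(x, axis, keepdims).sum()

    result = da.reduction(df_array, chunk=chunk, combine=combine,
                          aggregate=aggregate, concatenate=False,
                          dtype=meta.dtype, meta=meta)
    print(result.compute())  # 28.0 == 0 + 1 + ... + 7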