Dask trimesh support (holoviz#696)
* When calling datashader.utils.mesh with Dask dataframes, return a Dask dataframe. Computing the mesh still requires bringing the entire vertices/simplices dataframes into memory, but the resulting mesh is now a Dask dataframe whose partitions are chosen so that no triangle straddles a partition boundary (see the sketch after this list).

* Add documentation note about parallelizing trimesh aggregation with Dask
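
A minimal sketch of the new behavior (illustrative only, not part of the commit message; the data mirrors the test added in this commit):

```python
import dask.dataframe as dd
import pandas as pd
import datashader.utils as du

# Two triangles that share an edge, as vertex and simplex tables
verts = pd.DataFrame({'x': [4, 1, 5, 5, 5, 4],
                      'y': [4, 5, 5, 5, 4, 4]})
tris = pd.DataFrame({'v0': [0, 3], 'v1': [1, 4], 'v2': [2, 5],
                     'val': [1, 2]})

# Passing Dask dataframes now yields a Dask dataframe mesh; each partition
# holds a multiple of 3 rows, so no triangle is split across partitions
mesh = du.mesh(dd.from_pandas(verts, npartitions=2),
               dd.from_pandas(tris, npartitions=2))
assert isinstance(mesh, dd.DataFrame)
assert all(n % 3 == 0 for n in mesh.map_partitions(len).compute())
```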
jonmmease authored Feb 7, 2019
1 parent fb01a23 commit 030fd10
Showing 5 changed files with 94 additions and 12 deletions.
1 change: 0 additions & 1 deletion .appveyor.yml
@@ -10,7 +10,6 @@ environment:
 install:
 - "SET PATH=%CONDA%;%CONDA%\\Scripts;%PATH%"
 - "conda install -y -c pyviz pyctdev && doit ecosystem_setup"
-- conda install -y "conda<4.6"
 - "doit env_create %CHANNELS% --name=test --python=%PY%"
 - "activate test"
 - "doit develop_install %CHANNELS%"
3 changes: 2 additions & 1 deletion datashader/glyphs.py
@@ -123,7 +123,8 @@ def __init__(self, x, y, z=None, weight_type=True, interp=True):
 
     @property
     def inputs(self):
-        return tuple([self.x, self.y] + list(self.z))
+        return (tuple([self.x, self.y] + list(self.z)) +
+                (self.weight_type, self.interpolate))
 
     def validate(self, in_dshape):
         for col in [self.x, self.y] + list(self.z):
52 changes: 52 additions & 0 deletions datashader/tests/test_dask.py
@@ -1,10 +1,12 @@
from __future__ import division
from dask.context import config
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr

import datashader as ds
import datashader.utils as du

import pytest

@@ -476,3 +478,53 @@ def test_trimesh_no_double_edge():
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
], dtype='i4')
np.testing.assert_array_equal(np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)


@pytest.mark.parametrize('npartitions', list(range(1, 6)))
def test_trimesh_dask_partitions(npartitions):
    """Assert that when the vertices and simplices are supplied as Dask
    dataframes, ``du.mesh`` returns a Dask dataframe whose partitions each
    contain a whole number of triangles, and that aggregation produces the
    expected raster.
    """
    # Two triangles sharing the left/right edge
    verts = dd.from_pandas(pd.DataFrame({'x': [4, 1, 5, 5, 5, 4],
                                         'y': [4, 5, 5, 5, 4, 4]}),
                           npartitions=npartitions)
    tris = dd.from_pandas(
        pd.DataFrame(
            {'v0': [0, 3], 'v1': [1, 4], 'v2': [2, 5], 'val': [1, 2]}),
        npartitions=npartitions)

    cvs = ds.Canvas(plot_width=20, plot_height=20,
                    x_range=(0, 5), y_range=(0, 5))

    # Precompute mesh with dask dataframes
    mesh = du.mesh(verts, tris)

    # Make sure mesh is a dask DataFrame
    assert isinstance(mesh, dd.DataFrame)

    # Check mesh length
    n = len(mesh)
    assert n == 6

    # Make sure we have expected number of partitions
    expected_chunksize = int(np.ceil(len(mesh) / (3*npartitions)) * 3)
    expected_npartitions = int(np.ceil(n / expected_chunksize))
    assert expected_npartitions == mesh.npartitions

    # Make sure triangles don't straddle partitions
    partitions_lens = mesh.map_partitions(len).compute()
    for partitions_len in partitions_lens:
        assert partitions_len % 3 == 0

    agg = cvs.trimesh(verts, tris, mesh)
    sol = np.array([
        [0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    ], dtype='i4')
    np.testing.assert_array_equal(
        np.flipud(agg.fillna(0).astype('i4').values)[:5], sol)
18 changes: 8 additions & 10 deletions datashader/utils.py
@@ -450,18 +450,16 @@ def _dd_mesh(vertices, simplices):
     Dask DataFrame objects.
     """
     # Construct mesh by indexing into vertices with simplex indices
-    # TODO: For dask: avoid .compute() calls, and add winding auto-detection
-    vertex_idxs = simplices.values[:, :3].astype(np.int64)
-    vals = vertices.values.compute()[vertex_idxs]
-    vals = vals.reshape(np.prod(vals.shape[:2]), vals.shape[2])
-    res = pd.DataFrame(vals, columns=vertices.columns)
+    # TODO: For dask: avoid .compute() calls
+    res = _pd_mesh(vertices.compute(), simplices.compute())
 
-    # If vertices don't have weights, use simplex weights
-    verts_have_weights = len(vertices.columns) > 2
-    if not verts_have_weights:
-        weight_col = simplices.columns[3]
-        res[weight_col] = simplices.values[:, 3].compute().repeat(3)
+    # Compute a chunksize that will not split the vertices of a single
+    # triangle across partitions
+    approx_npartitions = max(vertices.npartitions, simplices.npartitions)
+    chunksize = int(np.ceil(len(res) / (3*approx_npartitions)) * 3)
+
+    # Create dask dataframe
+    res = dd.from_pandas(res, chunksize=chunksize)
     return res
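
As a quick sanity check of the chunksize arithmetic above, a worked example with made-up sizes (100 triangles, 4 input partitions; not part of the commit):

```python
import numpy as np

# Hypothetical sizes: 100 triangles -> 300 mesh rows, 4 input partitions
len_res, approx_npartitions = 300, 4
chunksize = int(np.ceil(len_res / (3 * approx_npartitions)) * 3)
assert chunksize == 75      # ceil(300 / 12) * 3
assert chunksize % 3 == 0   # every partition holds whole triangles
```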


32 changes: 32 additions & 0 deletions examples/user_guide/6_Trimesh.ipynb
@@ -47,6 +47,7 @@
 "import numpy as np, datashader as ds, pandas as pd\n",
 "import datashader.utils as du, datashader.transfer_functions as tf\n",
 "from scipy.spatial import Delaunay\n",
+"import dask.dataframe as dd\n",
 "\n",
 "n = 10\n",
 "np.random.seed(2)\n",
@@ -345,6 +346,37 @@
" tf.shade(cvs.trimesh(verts, tris, mesh=mesh, agg=ds.std('z')), name='std')).cols(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parallelizing trimesh aggregation with Dask\n",
"The trimesh aggregation process can be parallelized by providing `du.mesh` and `Canvas.trimesh` with partitioned Dask dataframes.\n",
"\n",
"**Note:** While the calls to `Canvas.trimesh` will be parallelized across the partitions of the Dask dataframe, the construction of the partitioned mesh using `du.mesh` is not currently parallelized. Furthermore, it currently requires loading the entire `verts` and `tris` dataframes into memory in order to construct the partitioned mesh. Because of these constraints, this approach is most useful for the repeated aggregation of large meshes that fit in memory on a single multicore machine."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"verts_ddf = dd.from_pandas(verts, npartitions=4)\n",
"tris_ddf = dd.from_pandas(tris, npartitions=4)\n",
"mesh_ddf = du.mesh(verts_ddf, tris_ddf)\n",
"mesh_ddf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tf.shade(cvs.trimesh(verts_ddf, tris_ddf, mesh=mesh_ddf))"
]
},
{
"cell_type": "markdown",
"metadata": {},
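
To illustrate the "repeated aggregation" case the notebook note describes, a hypothetical follow-on cell (not in the commit) could reuse the precomputed `mesh_ddf`; it assumes the notebook's `ds`, `tf`, `verts_ddf`, `tris_ddf`, and `mesh_ddf` are in scope, and the canvas sizes and zoom ranges are invented:

```python
# Reuse the same Dask-partitioned mesh for several aggregations; only the
# canvas changes, so the one-time mesh construction cost is amortized
for x_range, y_range in [((0, 1), (0, 1)), ((0.25, 0.75), (0.25, 0.75))]:
    cvs_zoom = ds.Canvas(plot_width=400, plot_height=400,
                         x_range=x_range, y_range=y_range)
    img = tf.shade(cvs_zoom.trimesh(verts_ddf, tris_ddf, mesh=mesh_ddf))
```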
