By swapping out in-memory NumPy arrays for in-memory sparse arrays, we can reuse the blocked algorithms of Dask.array to achieve parallel and distributed sparse arrays.
The blocked algorithms in Dask.array normally parallelize around in-memory NumPy arrays. However, if another in-memory array library supports the NumPy interface, then it too can take advantage of Dask.array's parallel algorithms. In particular, the sparse array library satisfies a subset of the NumPy API and works well with, and is tested against, Dask.array.
Say we have a dask.array with mostly zeros:

import dask.array as da

x = da.random.random((100000, 100000), chunks=(1000, 1000))
x[x < 0.95] = 0
We can convert each of these NumPy chunks into a sparse.COO array:
import sparse
s = x.map_blocks(sparse.COO)
Now our array is composed not of many NumPy arrays, but rather of many sparse arrays. Semantically this does not change anything; operations that work will work identically (assuming that the behavior of numpy and sparse is identical), but performance characteristics and storage costs may change significantly:
>>> s.sum(axis=0)[:100].compute()
<COO: shape=(100,), dtype=float64, nnz=100>
>>> _.todense()
array([ 4803.06859272,  4913.94964525,  4877.13266438,  4860.7470773 ,
        4938.94446802,  4849.51326473,  4858.83977856,  4847.81468485,
        ... ])
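If a fully dense result is eventually needed, one option (a sketch, not part of the original example) is to densify each block with map_blocks before computing; this is only sensible when the materialized piece fits in memory:

# Densify a small slice back into NumPy blocks before computing
# (only sensible when the materialized result fits in memory).
dense = s[:1000, :1000].map_blocks(lambda block: block.todense(), dtype=s.dtype)
dense.compute()  # an ordinary numpy.ndarray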
Any in-memory library that copies the NumPy ndarray interface should work here; the sparse library is a minimal example. In particular, an in-memory library should implement at least the following operations:
- Simple slicing with slices, lists, and elements (for slicing, rechunking, reshaping, etc.).
- A concatenate function matching the interface of np.concatenate. This must be registered in dask.array.core.concatenate_lookup.
- All ufuncs must support the full ufunc interface, including dtype= and out= parameters (even if they don't function properly).
- All reductions must support the full axis= and keepdims= keywords and behave like NumPy in this respect.
- The array class should follow the __array_priority__ protocol and be prepared to respond to other arrays of lower priority.
- If dot support is desired, a tensordot function matching the interface of np.tensordot should be registered in dask.array.core.tensordot_lookup (a registration sketch for both lookups follows this list).
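As a rough sketch of the two registration points above, using a hypothetical MyArray class and hypothetical my_concatenate and my_tensordot helpers (and assuming the lookups accept registrations via their register method, as Dask's dispatch objects do), the wiring might look like this:

import numpy as np
from dask.array.core import concatenate_lookup, tensordot_lookup

class MyArray:
    """Hypothetical minimal NumPy-like in-memory array."""
    __array_priority__ = 10  # higher than numpy.ndarray's default of 0; see the mixing note below

    def __init__(self, data):
        self.data = np.asarray(data)
        self.shape = self.data.shape
        self.dtype = self.data.dtype

def my_concatenate(arrays, axis=0):
    # Must match the interface of np.concatenate; accept plain NumPy chunks too,
    # since Dask may mix chunk types.
    return MyArray(np.concatenate(
        [a.data if isinstance(a, MyArray) else np.asarray(a) for a in arrays],
        axis=axis))

def my_tensordot(a, b, axes=2):
    # Must match the interface of np.tensordot (only needed if dot support is desired).
    return MyArray(np.tensordot(a.data, b.data, axes=axes))

# Tell Dask.array which functions to use for chunks of type MyArray.
concatenate_lookup.register(MyArray, my_concatenate)
tensordot_lookup.register(MyArray, my_tensordot)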
The implementation of other operations like reshape, transpose, etc. should follow standard NumPy conventions regarding shape and dtype. Not implementing these is fine; the parallel dask.array will raise an error at runtime if these operations are attempted.
Dask.array supports mixing different kinds of in-memory arrays. This relies on the in-memory arrays knowing how to interact with each other when necessary. When two arrays interact, the functions from the array with the highest __array_priority__ will take precedence (for example, for concatenate, tensordot, etc.).
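As a hedged illustration of that priority rule, continuing the hypothetical MyArray sketch above: the max-over-__array_priority__ selection below mirrors the kind of choice Dask makes when concatenating mixed chunk types, though the exact internals may vary between versions.

import numpy as np
from dask.array.core import concatenate_lookup

pieces = [MyArray(np.eye(2)), np.eye(2)]  # mixed chunk types

# MyArray declares __array_priority__ = 10 while numpy.ndarray defaults to 0,
# so the MyArray registration wins and my_concatenate is the function used.
winner = max(pieces, key=lambda a: getattr(a, "__array_priority__", 0))
func = concatenate_lookup.dispatch(type(winner))
assert func is my_concatenate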