Skip to content

Commit

Permalink
ENH: Add threaded Van Der Corput (scipy#14564)
Browse files Browse the repository at this point in the history
* ENH: Add threaded Van Der Corput
* DEV: Add `workers` to Halton.random
* Create `_validate_workers` function to avoid code repetition
* BENCH: add workers to Halton benchmarks

Co-authored-by: Pamphile ROY <[email protected]>
  • Loading branch information
V0lantis and tupui authored Aug 23, 2021
1 parent a0986b2 commit 2f60c33
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 46 deletions.
11 changes: 6 additions & 5 deletions benchmarks/benchmarks/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,19 +478,20 @@ def time_discrepancy(self, method):


class BenchQMCHalton(Benchmark):
param_names = ['d', 'scramble', 'n']
param_names = ['d', 'scramble', 'n', 'workers']
params = [
[1, 10],
[True, False],
[10, 1_000, 100_000]
[10, 1_000, 100_000],
[1, 4]
]

def setup(self, d, scramble, n):
def setup(self, d, scramble, n, workers):
self.rng = np.random.default_rng(1234)

def time_halton(self, d, scramble, n):
def time_halton(self, d, scramble, n, workers):
seq = stats.qmc.Halton(d, scramble=scramble, seed=self.rng)
seq.random(n)
seq.random(n, workers=workers)


class NumericalInverseHermite(Benchmark):
Expand Down
71 changes: 52 additions & 19 deletions scipy/stats/_qmc.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Quasi-Monte Carlo engines and helpers."""
from __future__ import annotations

import os
import copy
import math
import numbers
import os
import warnings
from abc import ABC, abstractmethod
import math
from typing import (
Callable,
ClassVar,
Expand All @@ -15,7 +16,6 @@
overload,
TYPE_CHECKING,
)
import warnings

import numpy as np

Expand Down Expand Up @@ -305,17 +305,7 @@ def discrepancy(
if not (np.all(sample >= 0) and np.all(sample <= 1)):
raise ValueError("Sample is not in unit hypercube")

workers = int(workers)
if workers == -1:
workers = os.cpu_count() # type: ignore[assignment]
if workers is None:
raise NotImplementedError(
"Cannot determine the number of cpus using os.cpu_count(), "
"cannot use -1 for the number of workers"
)
elif workers <= 0:
raise ValueError(f"Invalid number of workers: {workers}, must be -1 "
"or > 0")
workers = _validate_workers(workers)

methods = {
"CD": _cy_wrapper_centered_discrepancy,
Expand Down Expand Up @@ -558,7 +548,8 @@ def van_der_corput(
*,
start_index: IntNumber = 0,
scramble: bool = False,
seed: SeedType = None) -> np.ndarray:
seed: SeedType = None,
workers: IntNumber = 1) -> np.ndarray:
"""Van der Corput sequence.
Pseudo-random number generator based on a b-adic expansion.
Expand All @@ -584,6 +575,9 @@ def van_der_corput(
seeded with `seed`.
If `seed` is already a ``Generator`` instance then that instance is
used.
workers : int, optional
Number of workers to use for parallel processing. If -1 is
given all CPU threads are used. Default is 1.
Returns
-------
Expand Down Expand Up @@ -612,10 +606,11 @@ def van_der_corput(
for perm in permutations:
rng.shuffle(perm)

return _cy_van_der_corput_scrambled(n, base, start_index, permutations)
return _cy_van_der_corput_scrambled(n, base, start_index,
permutations, workers)

else:
return _cy_van_der_corput(n, base, start_index)
return _cy_van_der_corput(n, base, start_index, workers)


class QMCEngine(ABC):
Expand Down Expand Up @@ -862,25 +857,33 @@ def __init__(
self.base = n_primes(d)
self.scramble = scramble

def random(self, n: IntNumber = 1) -> np.ndarray:
def random(
self, n: IntNumber = 1, *, workers: IntNumber = 1
) -> np.ndarray:
"""Draw `n` in the half-open interval ``[0, 1)``.
Parameters
----------
n : int, optional
Number of samples to generate in the parameter space. Default is 1.
workers : int, optional
Number of workers to use for parallel processing. If -1 is
given all CPU threads are used. Default is 1. It becomes faster
than one worker for `n` greater than :math:`10^3`.
Returns
-------
sample : array_like (n, d)
QMC sample.
"""
workers = _validate_workers(workers)
# Generate a sample using a Van der Corput sequence per dimension.
# important to have ``type(bdim) == int`` for performance reason
sample = [van_der_corput(n, int(bdim), start_index=self.num_generated,
scramble=self.scramble,
seed=copy.deepcopy(self.seed))
seed=copy.deepcopy(self.seed),
workers=workers)
for bdim in self.base]

self.num_generated += n
Expand Down Expand Up @@ -1713,3 +1716,33 @@ def reset(self) -> MultinomialQMC:
super().reset()
self.engine.reset()
return self


def _validate_workers(workers: IntNumber = 1) -> IntNumber:
"""Validate `workers` based on platform and value.
Parameters
----------
workers : int, optional
Number of workers to use for parallel processing. If -1 is
given all CPU threads are used. Default is 1.
Returns
-------
Workers : int
Number of CPU used by the algorithm
"""
workers = int(workers)
if workers == -1:
workers = os.cpu_count() # type: ignore[assignment]
if workers is None:
raise NotImplementedError(
"Cannot determine the number of cpus using os.cpu_count(), "
"cannot use -1 for the number of workers"
)
elif workers <= 0:
raise ValueError(f"Invalid number of workers: {workers}, must be -1 "
"or > 0")

return workers
10 changes: 6 additions & 4 deletions scipy/stats/_qmc_cy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@ from scipy._lib._util import DecimalNumber, IntNumber
def _cy_wrapper_centered_discrepancy(
sample: np.ndarray,
iterative: bool,
workers: int,
workers: IntNumber,
) -> float: ...


def _cy_wrapper_wrap_around_discrepancy(
sample: np.ndarray,
iterative: bool,
workers: int,
workers: IntNumber,
) -> float: ...


def _cy_wrapper_mixture_discrepancy(
sample: np.ndarray,
iterative: bool,
workers: int,
workers: IntNumber,
) -> float: ...


def _cy_wrapper_l2_star_discrepancy(
sample: np.ndarray,
iterative: bool,
workers: int,
workers: IntNumber,
) -> float: ...


Expand All @@ -41,6 +41,7 @@ def _cy_van_der_corput(
n: IntNumber,
base: IntNumber,
start_index: IntNumber,
workers: IntNumber,
) -> np.ndarray: ...


Expand All @@ -49,4 +50,5 @@ def _cy_van_der_corput_scrambled(
base: IntNumber,
start_index: IntNumber,
permutations: np.ndarray,
workers: IntNumber,
) -> np.ndarray: ...
108 changes: 93 additions & 15 deletions scipy/stats/_qmc_cy.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ np.import_array()
cdef extern from "<thread>" namespace "std" nogil:
cdef cppclass thread:
thread()
void thread[A, B, C, D, E, F](A, B, C, D, E, F)
# Since we are using the C++ thread constructor, Cython forces us
# to define how many arguments will take the constructor. Here,
# `_cy_wrapper_van_der_corput_scrambled` takes 7 arguments thus
# the `A, B, C, D, E, F, G` inside the constructor. Since the other
# threaded loops take less arguments, we need to find a workaround
# to pass 7 arguments to the functions. Here we use `_`.
void thread[A, B, C, D, E, F, G](A, B, C, D, E, F, G)
void join()

cdef extern from "<mutex>" namespace "std" nogil:
Expand Down Expand Up @@ -309,7 +315,7 @@ cdef double threaded_loops(func_type loop_func,
n / workers * (tid + 1)) if tid < workers - 1 else n
threads.push_back(
thread(one_thread_loop, loop_func, ref(disc2),
sample_view, istart, istop)
sample_view, istart, istop, None)
)

for tid in range(workers):
Expand All @@ -318,8 +324,12 @@ cdef double threaded_loops(func_type loop_func,
return disc2


cdef void one_thread_loop(func_type loop_func, double& disc, double[:,
::1] sample_view, Py_ssize_t istart, Py_ssize_t istop) nogil:
cdef void one_thread_loop(func_type loop_func,
double& disc,
double[:, ::1] sample_view,
Py_ssize_t istart,
Py_ssize_t istop,
_) nogil:

cdef double tmp = loop_func(sample_view, istart, istop)

Expand All @@ -328,14 +338,50 @@ cdef void one_thread_loop(func_type loop_func, double& disc, double[:,
threaded_sum_mutex.unlock()


def _cy_van_der_corput(long n, long base, long start_index):
sequence = np.zeros(n)
def _cy_van_der_corput(Py_ssize_t n,
long base,
long start_index,
unsigned int workers):
sequence = np.zeros(n, dtype=np.double)

cdef:
long i, quotient, remainder
double[::1] sequence_view = sequence
vector[thread] threads
unsigned int tid
Py_ssize_t istart, istop

if workers <= 1:
_cy_van_der_corput_threaded_loop(0, n, base, start_index,
sequence_view, None)
return sequence

for tid in range(workers):
istart = <Py_ssize_t> (n / workers * tid)
istop = <Py_ssize_t> (
n / workers * (tid + 1)) if tid < workers - 1 else n
threads.push_back(
thread(_cy_van_der_corput_threaded_loop, istart, istop, base,
start_index, sequence_view, None)
)

for tid in range(workers):
threads[tid].join()

return sequence


cdef _cy_van_der_corput_threaded_loop(Py_ssize_t istart,
Py_ssize_t istop,
long base,
long start_index,
double[::1] sequence_view,
_):
cdef:
long quotient, remainder
Py_ssize_t i
double b2r

for i in range(n):
for i in range(istart, istop):
quotient = start_index + i
b2r = 1.0 / base
while quotient > 0:
Expand All @@ -344,18 +390,52 @@ def _cy_van_der_corput(long n, long base, long start_index):
b2r /= base
quotient //= base


def _cy_van_der_corput_scrambled(Py_ssize_t n,
long base,
long start_index,
long[:,::1] permutations,
unsigned int workers):
sequence = np.zeros(n)

cdef:
double[::1] sequence_view = sequence
vector[thread] threads
unsigned int tid
Py_ssize_t istart, istop

if workers <= 1:
_cy_van_der_corput_scrambled_loop(0, n, base, start_index,
permutations, sequence_view)
return sequence

for tid in range(workers):
istart = <Py_ssize_t> (n / workers * tid)
istop = <Py_ssize_t> (
n / workers * (tid + 1)) if tid < workers - 1 else n
threads.push_back(
thread(_cy_van_der_corput_scrambled_loop, istart, istop, base,
start_index, permutations, sequence_view)
)

for tid in range(workers):
threads[tid].join()

return sequence


def _cy_van_der_corput_scrambled(long n, long base, long start_index,
long[:, ::1] permutations):
sequence = np.zeros(n)
cdef _cy_van_der_corput_scrambled_loop(Py_ssize_t istart,
Py_ssize_t istop,
long base,
long start_index,
long[:,::1] permutations,
double[::1] sequence_view):

cdef:
long i, j, quotient, remainder
double[::1] sequence_view = sequence
double b2r

for i in range(n):
for i in range(istart, istop):
quotient = start_index + i
b2r = 1.0 / base
for j in range(permutations.shape[0]):
Expand All @@ -364,5 +444,3 @@ def _cy_van_der_corput_scrambled(long n, long base, long start_index,
sequence_view[i] += remainder * b2r
b2r /= base
quotient //= base

return sequence
Loading

0 comments on commit 2f60c33

Please sign in to comment.