Adjust nested parallelization to deal with OMP (#8723)
* Adjust parallelization to deal with OMP
cpuhrsch authored Jun 22, 2018
1 parent 54a2e81 commit 0750967
Showing 2 changed files with 78 additions and 46 deletions.
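For context on the commit title: before this change the reduction kernel below could end up calling `_parallel_for` from inside another `_parallel_for` (via `reduce2d`), and with OpenMP a `parallel` region started inside an existing one is, by default, executed by a team of a single thread. The standalone sketch below is not part of the commit; it only illustrates that default behavior (compile with `-fopenmp`).

#include <cstdio>
#include <omp.h>

int main() {
  #pragma omp parallel num_threads(4)
  {
    if (omp_get_thread_num() == 0) {
      // Nested parallelism is disabled by default, so this inner region
      // usually gets a team of one thread rather than another four.
      #pragma omp parallel num_threads(4)
      {
        #pragma omp single
        std::printf("inner team size: %d\n", omp_get_num_threads());
      }
    }
  }
  return 0;
}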
18 changes: 11 additions & 7 deletions aten/src/ATen/Parallel.h
@@ -26,14 +26,18 @@ inline void parallel_for(
const int64_t end,
const int64_t grain_size,
const F f) {
-  if (get_num_threads() == 1) {
-    f(begin, end);
-  } else {
-#pragma omp parallel for if ((end - begin) >= grain_size)
-    for (int64_t i = begin; i < end; i += grain_size) {
-      f(i, i + std::min(end - i, grain_size));
-    }
+#ifdef _OPENMP
+#pragma omp parallel if ((end - begin) >= grain_size)
+  {
+    int64_t num_threads = omp_get_num_threads();
+    int64_t tid = omp_get_thread_num();
+    int64_t chunk_size = divup((end - begin), num_threads);
+    int64_t begin_tid = begin + tid * chunk_size;
+    f(begin_tid, std::min(end, chunk_size + begin_tid));
  }
+#else
+  f(begin, end);
+#endif
}

template <class scalar_t, class F, class SF>
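The rewritten `parallel_for` above no longer relies on `#pragma omp parallel for`; it opens a plain `parallel` region and has each thread compute its own contiguous chunk of `[begin, end)`. A minimal standalone sketch of that chunking arithmetic, assuming `divup(x, y)` is the usual round-up division `(x + y - 1) / y` (the helper itself is not shown in this diff):

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Round-up division; assumed to match the divup used by parallel_for.
inline int64_t divup(int64_t x, int64_t y) { return (x + y - 1) / y; }

// Per-thread chunk computation from the new parallel_for, with an ordinary
// loop standing in for the OpenMP thread team.
void show_chunks(int64_t begin, int64_t end, int64_t num_threads) {
  int64_t chunk_size = divup(end - begin, num_threads);
  for (int64_t tid = 0; tid < num_threads; tid++) {
    int64_t begin_tid = begin + tid * chunk_size;
    int64_t end_tid = std::min(end, chunk_size + begin_tid);
    if (begin_tid < end) {  // guard added here only so the printout skips empty chunks
      std::printf("tid %lld -> [%lld, %lld)\n",
                  (long long)tid, (long long)begin_tid, (long long)end_tid);
    }
  }
}

int main() {
  show_chunks(0, 10, 4);  // prints [0,3) [3,6) [6,9) [9,10)
  return 0;
}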
106 changes: 67 additions & 39 deletions aten/src/ATen/native/cpu/ReduceOpsKernel.cpp
@@ -47,11 +47,11 @@ struct Reduction {
using ReduceScalar = Op<scalar_t>;

static void apply(Tensor& res, const Tensor& self, at::optional<int64_t> dim) {
-    auto out = res.data<scalar_t>();
-    auto data = self.data<scalar_t>();
+    auto out_ = res.data<scalar_t>();
+    auto data_ = self.data<scalar_t>();
auto numel = self.numel();
if (!dim.has_value()) {
-      *out = reduce_all(data, numel);
+      *out_ = reduce_all(data_, numel);
return;
}

@@ -67,13 +67,70 @@ }
}
int64_t batch = numel / (n * stride);
bool paralellize = batch * n > internal::GRAIN_SIZE;
-    _parallel_for(batch, 1, paralellize, [=](int64_t b) {
-      if (stride == 1) {
-        out[b] = reduce_all(&data[b * n], n);
-      } else {
-        reduce2d(&data[b * n * stride], &out[b * stride], n, stride, stride);
-      }
-    });
+    if (stride == 1) {
+      parallel_for(0, batch, 1, [=](int64_t begin, int64_t end) {
+        for (int64_t b = begin; b < end; b++) {
+          const scalar_t* data = &data_[b * n];
+          scalar_t* out = &out_[b];
+          scalar_t buf[WIDTH] = {0};
+          std::fill(buf, buf + WIDTH, ident);
+          int64_t cols_rounded = n / WIDTH;
+          reduce128(data, buf, cols_rounded, WIDTH);
+          scalar_t result = ident;
+          for (int64_t i = 0; i < WIDTH; i++) {
+            result = ReduceScalar()(result, buf[i]);
+          }
+          for (int64_t col = cols_rounded * WIDTH; col != n; col++) {
+            result = ReduceScalar()(result, data[col]);
+          }
+          out_[b] = result;
+        }
+      });
+    } else {
+      int64_t rows = n;
+      int64_t cols = stride;
+      int64_t cols_rounded = round_down(cols, WIDTH);
+      int64_t size = cols_rounded;
+      parallel_for(
+          0,
+          batch * (size / WIDTH),
+          1,
+          [out_, data_, n, stride, rows, cols, cols_rounded, size](
+              int64_t begin, int64_t end) {
+            for (int64_t bi = begin; bi < end; bi++) {
+              int64_t b = bi / (size / WIDTH);
+              int64_t i = bi % (size / WIDTH);
+              int64_t k = i * WIDTH;
+              reduce128(
+                  &data_[b * n * stride + k],
+                  &out_[b * stride + k],
+                  rows,
+                  stride);
+            }
+          });
+
+      _parallel_for(batch, 1, paralellize, [=](int64_t b) {
+        const scalar_t* data = &data_[b * n * stride];
+        scalar_t* out = &out_[b * stride];
+        int64_t rows = n;
+        int64_t cols = stride;
+
+        int64_t cols_rounded = round_down(cols, WIDTH);
+        if (cols_rounded != cols) {
+          scalar_t buf[WIDTH] = {0};
+          std::fill(buf, buf + WIDTH, ident);
+          for (int64_t row = 0; row != rows; row++) {
+            for (int64_t j = 0; j != cols - cols_rounded; j++) {
+              auto val = data[row * stride + j + cols_rounded];
+              buf[j] = ReduceScalar()(buf[j], val);
+            }
+          }
+          for (int64_t j = 0; j != cols - cols_rounded; j++) {
+            out[j + cols_rounded] = buf[j];
+          }
+        }
+      });
+    }
}

static scalar_t reduce_all(const scalar_t* data, int64_t size) {
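In the strided (`stride != 1`) branch above, the parallelism that used to be nested — batches in `apply`, column blocks inside `reduce2d` — is flattened into a single `parallel_for` over `batch * (cols_rounded / WIDTH)` work items, and each flat index is decomposed back into a batch and a column-block offset. A small standalone sketch of that decomposition with made-up sizes (the real kernel hands each block to `reduce128`):

#include <cstdint>
#include <cstdio>

int main() {
  // Illustrative sizes only; not taken from the commit.
  const int64_t WIDTH = 4;                               // columns per block
  const int64_t batch = 3;                               // outer batches
  const int64_t stride = 10;                             // reduced outputs per batch
  const int64_t cols_rounded = (stride / WIDTH) * WIDTH; // round_down(stride, WIDTH)
  const int64_t nblocks = cols_rounded / WIDTH;          // "size / WIDTH" in the kernel

  // One flat loop replaces "for each batch { for each column block }".
  for (int64_t bi = 0; bi < batch * nblocks; bi++) {
    int64_t b = bi / nblocks;           // which batch
    int64_t k = (bi % nblocks) * WIDTH; // first column of the block
    std::printf("work item %lld -> batch %lld, columns [%lld, %lld)\n",
                (long long)bi, (long long)b, (long long)k, (long long)(k + WIDTH));
  }
  // Columns [cols_rounded, stride) are the tail handled by the separate
  // scalar pass in the kernel.
  return 0;
}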
@@ -112,35 +169,6 @@ struct Reduction {
acc[j].store(&out[j * Vec::size]);
}
}

-  // Reduce a 2d matrix down each column. Stores the results in out[0 ... cols-1]
-  static void reduce2d(const scalar_t* data, scalar_t* out, int64_t rows, int64_t cols, int64_t stride) {
-    int64_t cols_rounded = round_down(cols, WIDTH);
-    bool paralellize = cols * rows > internal::GRAIN_SIZE;
-    _parallel_for(cols_rounded, WIDTH, paralellize, [=](int64_t col) {
-      reduce128(&data[col], &out[col], rows, stride);
-    });
-
-    if (cols_rounded != cols) {
-#if !defined(__APPLE__)
-#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
-#endif
-      scalar_t buf[WIDTH];
-
-      // Initializes the entire (tiny) array to avoid uninitialized warnings
-      std::fill(std::begin(buf), std::end(buf), ident);
-
-      for (int64_t row = 0; row != rows; row++) {
-        for (int64_t j = 0; j != cols - cols_rounded; j++) {
-          auto val = data[row * stride + j + cols_rounded];
-          buf[j] = ReduceScalar()(buf[j], val);
-        }
-      }
-      for (int64_t j = 0; j != cols - cols_rounded; j++) {
-        out[j + cols_rounded] = buf[j];
-      }
-    }
-  }
};

static void sum_kernel_impl(Tensor& result, const Tensor& self, at::optional<int64_t> dim) {
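With `reduce2d` removed above, its leftover-column handling now lives directly in `apply` (the `_parallel_for(batch, ...)` block in the earlier hunk), while the contiguous `stride == 1` branch reduces each row by accumulating WIDTH partial results via `reduce128`, folding them with `ReduceScalar`, and finishing the tail with a scalar loop. A scalar-only sketch of that accumulate-fold-tail pattern for a sum (no Vec256 vectorization; names and WIDTH value are illustrative):

#include <cstdint>
#include <vector>

// Scalar-only illustration of the "WIDTH partial accumulators + tail" pattern;
// the real kernel uses the vectorized reduce128 instead of this plain loop.
float reduce_sum(const float* data, int64_t n) {
  constexpr int64_t WIDTH = 8;     // stand-in for the kernel's WIDTH constant
  float buf[WIDTH] = {0};          // partial accumulators (ident = 0 for sum)
  int64_t rounded = (n / WIDTH) * WIDTH;
  for (int64_t i = 0; i < rounded; i += WIDTH) {
    for (int64_t j = 0; j < WIDTH; j++) {
      buf[j] += data[i + j];       // lane j of each WIDTH-wide block
    }
  }
  float result = 0;
  for (int64_t j = 0; j < WIDTH; j++) {
    result += buf[j];              // fold the partials
  }
  for (int64_t i = rounded; i < n; i++) {
    result += data[i];             // scalar tail, as in the kernel
  }
  return result;
}

int main() {
  std::vector<float> v(100, 1.0f);
  return reduce_sum(v.data(), static_cast<int64_t>(v.size())) == 100.0f ? 0 : 1;
}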
