Simple parallelism, add -openmp flags and omp parallel for Acc16/32 Unit Test (pytorch#14)

Summary:
Pull Request resolved: pytorch#14

This diff triggered a concurrency bug in the unit test.

It is odd that no errors are reported for "SpMDMTest", while errors are reported for "NoRequantizeTest".

Update 1:
There might be a problem with the "memCopy" function. I changed "Cint32_buffer.data()" to "Cint32_fb.data()" (see my inline comment) so that the accumulation buffer and the output buffer are the same. With that change we get the correct result.

I had a discussion with Daya. Now I understand the reason for the failure of this unit test:
- For the purpose of this unit test, we should just use the same buffer "Cint32_fb.data()" for both accumulation and output. Not sure why this issue was not found in the original code.
- If the thread count is not 1 and we use different buffers ("Cint32_buffer" for accumulation and "Cint32_fb" for output), then the pointer "Cint32_buffer.data()" is shared by all threads. When doing the accumulation inside "ExecuteKernelU8S8.cc", different threads will write to the same memory location. Check the code below:
    int32_t* C_buffer_row_start = C_buffer_ +
        ((C_buffer_ == reinterpret_cast<int32_t*>(matC_)) ? row_start_A * ldc_
                                                          : 0);
- If the thread count is not 1 and we use the same buffer "Cint32_fb.data()" for both accumulation and output, then, according to the above code, different threads will write to different memory locations.

Update 2:
I added a new test case "{1024, 512, 258}" to the Acc16 and Acc32 unit tests. "PackedRequantizeAcc16Test" runs well, but "PackedRequantizeTest" is broken.

Update 3:
I changed the above code snippet to
    int32_t* C_buffer_row_start = C_buffer_ + row_start_A * ldc_;

Finally, both the Acc16 and Acc32 tests pass. Now different threads always write to different memory locations.

Update 4:

Jongsoo comments that reusing the first row block of C_buffer_ is mostly a cache optimization, not an optimization of memory allocation size (this was making a big difference in xray ocr perf; don't remember the exact number). The right thing to do is to have each thread use a different portion of C_buffer_.

So I optimized the above code snippet to

    // If the accumulation buffer C_buffer_ is the same as matC_ (in-place
    // output processing), then each thread uses a different part of the
    // output buffer matC_;
    // otherwise, each thread uses a different portion of the accumulation
    // buffer C_buffer_. Note that each thread can use at most an MC * n
    // portion of C_buffer_. If the number of threads is 1, the only thread
    // (thread 0) will always reuse the first row block of C_buffer_.
    int32_t* C_buffer_row_start = C_buffer_ +
        ((C_buffer_ == reinterpret_cast<int32_t*>(matC_))
             ? row_start_A * ldc_
             : std::min(thread_id_ * mbSize_ * ldc_, row_start_A * ldc_));

Note that `thread_id` and `num_threads` are passed as arguments into `ExecuteKernel`.

Update 5:
Rebase; also add parts of D12937408 to remove the dependency.

Reviewed By: jspark1105

Differential Revision: D13001149

fbshipit-source-id: b16c20863dc467de6faaefcaf1134cf1036f8a65
jianyuh authored and facebook-github-bot committed Nov 20, 2018
1 parent 3b7936e commit 7346431
Showing 7 changed files with 336 additions and 211 deletions.
10 changes: 10 additions & 0 deletions CMakeLists.txt
@@ -51,6 +51,16 @@ if(NOT COMPILER_SUPPORTS_AVX512)
message(FATAL_ERROR "A compiler with AVX512 support is required.")
endif()

# Check if the compiler supports OpenMP
find_package(OpenMP)
if (OPENMP_FOUND)
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
else()
message(WARNING "OpenMP is not supported by the compiler")
endif()

#All the source files that use avx512 instructions statically
set(FBGEMM_AVX512_SRCS src/Utils_avx512.cc)

4 changes: 3 additions & 1 deletion src/ExecuteKernelGeneric.h
@@ -39,7 +39,9 @@ class ExecuteKernel : public CodeGenBase<
cT* matC,
typename packingBMatrix::accType* C_buffer,
int32_t ldc,
const processOutputType& outputProcess);
const processOutputType& outputProcess,
int thread_id,
int num_threads);
void execute(int kBlock);

private:
25 changes: 19 additions & 6 deletions src/ExecuteKernelU8S8.cc
@@ -32,14 +32,18 @@ ExecuteKernel<
cT* matC,
int32_t* C_buffer,
int32_t ldc,
const processOutputType& outputProcess)
const processOutputType& outputProcess,
int thread_id,
int num_threads)
: packedA_(packA),
packedB_(packB),
kBlock_(kBlock),
matC_(matC),
C_buffer_(C_buffer),
ldc_(ldc),
outputProcess_(outputProcess) {
outputProcess_(outputProcess),
thread_id_(thread_id),
num_threads_(num_threads) {
if (cpuinfo_has_x86_avx512f()) {
mbSize_ = PackingTraits<
int8_t,
@@ -125,12 +129,21 @@ void ExecuteKernel<
// prefetch addr of the next packed block of B matrix
bBuf_pf = packedB_.getBuf(jb == bColBlocks - 1 ? jb : jb + 1, kBlock);

// Reuse the first rowblock of C_buffer_ unless when C_buffer_ is same as
// matC_ (inplace output processing)
// If the accumulation buffer C_buffer_ is the same as matC_ (in-place output
// processing), then each thread uses a different part of the output buffer
// matC_;
// otherwise, each thread uses a different portion of the accumulation
// buffer C_buffer_. If m is large enough (m >= nthreads * MC), we only need
// an (nthreads * MC) x n portion of C_buffer_, and each thread computes its
// C_buffer_row_start as tid * MC * ldc_; when m is very small, we just use
// the whole m x n C_buffer_, with each thread using a different portion.
int32_t* C_buffer_row_start = C_buffer_ +
((C_buffer_ == reinterpret_cast<int32_t*>(matC_))
((C_buffer_ == reinterpret_cast<int32_t*>(matC_) ||
num_threads_ * mbSize_ > packedA_.numRows())
? row_start_A * ldc_ + NDim * group
: 0);
: thread_id_ * mbSize_ * ldc_ + NDim * group);

int32_t* C_buffer_start = C_buffer_row_start + jb * nbSize_;
int32_t leadingDim = ldc_;
if (packedB_.isThereColRemainder() && (jb == bColBlocks - 1)) {
6 changes: 5 additions & 1 deletion src/ExecuteKernelU8S8.h
@@ -43,7 +43,9 @@ class ExecuteKernel<
cT* matC,
int32_t* C_buffer,
int32_t ldc,
const processOutputType& outputProcess);
const processOutputType& outputProcess,
int thread_id,
int num_threads);
void execute(int kBlock);

~ExecuteKernel() {
@@ -63,6 +65,8 @@
int32_t ldc_; ///< the leading dimension of matrix C.
const processOutputType& outputProcess_; ///< output processing function for
///< matrix C in the macro-kernel.
int thread_id_; ///< the thread id.
int num_threads_; ///< the total number of threads
int32_t* C_tile_; ///< buffer for the last N block when NCB is not an exact
///< multiple of N.
int mbSize_; ///< block size in the m dimension.
20 changes: 16 additions & 4 deletions src/Fbgemm.cc
@@ -38,7 +38,7 @@ void fbgemmPacked(
uint32_t ldc,
const processOutputType& outProcess,
int thread_id,
int /* num_threads */) {
int num_threads) {
static_assert(
std::is_same<
typename packingAMatrix::accType,
@@ -107,11 +107,23 @@
t_very_start = std::chrono::high_resolution_clock::now();
#endif

// ToDo: thread based work division
for (int g = 0; g < packA.numGroups(); ++g) {
int i_per_thread = (mBlocks + num_threads - 1) / num_threads;
int i_begin = std::min(thread_id * i_per_thread, mBlocks);
int i_end = std::min(i_begin + i_per_thread, mBlocks);

ExecuteKernel<packingAMatrix, packingBMatrix, cT, processOutputType>
exeKernelObj(packA, packB, 0, C, C_buffer, ldc, outProcess);
for (int i = 0; i < mBlocks; ++i) {
exeKernelObj(
packA,
packB,
0,
C,
C_buffer,
ldc,
outProcess,
thread_id,
num_threads);
for (int i = i_begin; i < i_end; ++i) {
mc = (i != mBlocks - 1 || _mc == 0) ? MCB : _mc;
for (int k = 0; k < kBlocks; ++k) {
kc = (k != kBlocks - 1 || _kc == 0) ? KCB : _kc;
