graph: backend: dnnl: add threadpool runtime support
gyhintel authored and TaoLv committed Mar 12, 2024
1 parent be0cf60 commit f90fcfb
Showing 2 changed files with 61 additions and 32 deletions.
src/graph/backend/dnnl/kernels/sdp.hpp (56 additions, 23 deletions)

@@ -27,6 +27,10 @@
 #include <vector>
 
 #include "common/dnnl_thread.hpp"
+#include "common/utils.hpp"
+#include "cpu/cpu_stream.hpp"
+#include "oneapi/dnnl/dnnl_threadpool.h"
+
 #include "graph/interface/backend.hpp"
 #include "graph/interface/graph.hpp"
@@ -137,19 +141,12 @@ struct sdp_decomp_config_t {
 public:
     // The function is used to check if the configuration of SDP is supported by
     // current implementation of decomp kernel. Currently, this implementation
-    // can only handle 4-dims tensor and limits the layout of the SDP's input.
-    // For better performance, we also limit the numerical relationship between
-    // batch size and thread num.
+    // can only handle 4-dim tensors and limits the numerical relationship
+    // between batch_size, num_head and the thread num.
     // If the check passes, initialize a few members according to the inputs.
     // If not, return an unimplemented status directly and fall back to the large kernel.
-    // TODOs: we have follow to-do tasks in the future:
-    // 1. The batch_size and max_threads conditions need to be further checked
-    // 2. Enable the latency scenario with batch_size = 1
     bool initial_check(const std::shared_ptr<subgraph_t> &sg,
             const std::vector<logical_tensor_t> &inputs) {
-        // Initialize nthr with current threads num
-        nthr = dnnl_get_current_num_threads();
-
         // The order of input logical tensors in inputs is not certain, we need
         // to record the input offset in a certain order of ops.
         record_input_offset(sg, inputs);
@@ -163,7 +160,24 @@
         seq_len = src1_user_dims[2];
         size_per_head = src1_user_dims[3];
 
+#if DNNL_CPU_RUNTIME == DNNL_RUNTIME_OMP
+        // RATIO is an empirical value used to set the numerical relationship
+        // between batch_size, num_head and the thread number that decides
+        // whether to use the decompose kernel. The key to the decompose kernel
+        // is that we parallelize over the batch_size and num_head dimensions.
+        // Therefore, if batch_size or num_head is too small, many threads sit
+        // idle, which hurts efficiency and may be even worse than the original
+        // sequential kernel. We set this ratio based on experimental values to
+        // ensure that users see no regression when using the decompose kernel.
+        // TODO: Refine the inequality based on the relationship between cache
+        // size and the SDP memory footprint requirements.
+#define RATIO 2
+        // Initialize nthr with the current number of threads
+        nthr = dnnl_get_current_num_threads();
+        return batch_size * num_head > RATIO * nthr;
+#else
         return true;
+#endif
     }
 
     // Used to construct all params that SDP need
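To make the heuristic above concrete, here is a standalone sketch of the dispatch condition; use_decompose_kernel and its arguments are illustrative names for this example, not oneDNN API:

// Standalone sketch of the dispatch heuristic; names are illustrative only.
constexpr int kRatio = 2; // mirrors RATIO above

bool use_decompose_kernel(int batch_size, int num_head, int nthr) {
    // The decompose kernel parallelizes over batch_size x num_head, so it
    // only pays off when that product comfortably exceeds the thread count.
    return batch_size * num_head > kRatio * nthr;
}

For example, with 56 threads, batch_size = 1 and num_head = 16 give 16 <= 112 and fall back to the large kernel, while batch_size = 8 gives 128 > 112 and takes the decompose path.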
@@ -532,15 +546,15 @@
                 {sub_mm2_dst.get(), 3}, {sub_scratchpad.get(), 4}};
 
         temporary_registrar.book(mem_key_map[sub_max_src1_src2.get()],
-                sub_max_src1_src2.get_desc().get_size() * nthr);
+                sub_max_src1_src2.get_desc().get_size());
         temporary_registrar.book(mem_key_map[sub_mm1_wei.get()],
-                sub_mm1_wei.get_desc().get_size() * nthr);
+                sub_mm1_wei.get_desc().get_size());
         temporary_registrar.book(mem_key_map[sub_max_dst1_wei2.get()],
-                sub_max_dst1_wei2.get_desc().get_size() * nthr);
+                sub_max_dst1_wei2.get_desc().get_size());
         temporary_registrar.book(mem_key_map[sub_mm2_dst.get()],
-                sub_mm2_dst.get_desc().get_size() * nthr);
+                sub_mm2_dst.get_desc().get_size());
         temporary_registrar.book(mem_key_map[sub_scratchpad.get()],
-                sub_scratchpad.get_desc().get_size() * nthr);
+                sub_scratchpad.get_desc().get_size());
     }
 
     impl::status_t prepare_sdp_scales_zps(const fusion_info_mgr_t &mgr,
@@ -786,45 +800,58 @@ class sdp_decomp_kernel_t : public kernel_base_t {
     }
 
     void prepare_sub_args(const grantor_t &var_grantor, const int id,
+            const size_t block_size,
             std::unordered_map<dnnl_memory_t, std::vector<memory>> &mem_map) {
+        auto size_offset = id * block_size;
         mem_map[sdp_cfg_.sub_mm1_wei.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_mm1_wei.get()])
-                + id * sdp_cfg_.sub_mm1_wei.get_desc().get_size());
+                + size_offset);
         // mm1
         mem_map[sdp_cfg_.sub_mm1_src.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_max_src1_src2.get()])
-                + id * sdp_cfg_.sub_max_src1_src2.get_desc().get_size());
+                + size_offset);
         mem_map[sdp_cfg_.sub_mm1_dst.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_max_dst1_wei2.get()])
-                + id * sdp_cfg_.sub_max_dst1_wei2.get_desc().get_size());
+                + size_offset);
         // softmax
         mem_map[sdp_cfg_.sub_softmax_dst.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_max_src1_src2.get()])
-                + id * sdp_cfg_.sub_max_src1_src2.get_desc().get_size());
+                + size_offset);
         // mm2
         mem_map[sdp_cfg_.sub_mm2_wei.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_max_dst1_wei2.get()])
-                + id * sdp_cfg_.sub_max_dst1_wei2.get_desc().get_size());
+                + size_offset);
         mem_map[sdp_cfg_.sub_mm2_dst.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_mm2_dst.get()])
-                + id * sdp_cfg_.sub_mm2_dst.get_desc().get_size());
+                + size_offset);
         // scratchpad, each thread will have a largest scratchpad.
         mem_map[sdp_cfg_.sub_scratchpad.get()][id].set_data_handle(
                 var_grantor.get(
                         sdp_cfg_.mem_key_map[sdp_cfg_.sub_scratchpad.get()])
-                + id * sdp_cfg_.sub_scratchpad.get_desc().get_size());
+                + size_offset);
     }

     status_t execute_impl(const stream_t *g_stream,
             const std::vector<tensor_t> &inputs,
             const std::vector<tensor_t> &outputs) override {
         dnnl::stream strm = make_dnnl_stream(p_engine_, *g_stream);
 
+#if DNNL_CPU_RUNTIME == DNNL_RUNTIME_THREADPOOL
+        auto *tp_stream
+                = dnnl::impl::utils::downcast<dnnl::impl::cpu::cpu_stream_t *>(
+                        const_cast<stream_t *>(g_stream));
+        tp_stream->before_exec_hook();
+        int thread_num = 1;
+        dnnl_threadpool_interop_get_max_concurrency(&thread_num);
+        sdp_cfg_.nthr = thread_num;
+#endif
+
         // each thread's own local resource
         thread_local_cache_t<sdp_args_set_t> res_cache;
         sdp_args_set_t *res = res_cache.get_or_add(
@@ -842,8 +869,9 @@ class sdp_decomp_kernel_t : public kernel_base_t {
                 = static_cast<char *>(outputs[0].get_data_handle());
 
         // allocate the internal memory
+        size_t block_size = sdp_registry_.size();
         temporary_scratchpad_t scratchpad(
-                sdp_registry_.size(), p_engine_, *g_alloc_);
+                block_size * sdp_cfg_.nthr, p_engine_, *g_alloc_);
         assertm(scratchpad.size() >= sdp_registry_.size(),
                 "no enough scratchpad memory");
         grantor_t var_grantor = sdp_registry_.grantor(scratchpad.get_buffer());
@@ -854,7 +882,7 @@

         const auto loop = [&](int tid, int nthr, dim_t bo, dim_t bi) {
             // prepare execution args and allocate real memory
-            prepare_sub_args(var_grantor, tid, res->mem_map);
+            prepare_sub_args(var_grantor, tid, block_size, res->mem_map);
 
             // reorder0
             auto &sub_src1_tid = res->mem_map[sdp_cfg_.sub_src1.get()][tid];
@@ -924,7 +952,12 @@ class sdp_decomp_kernel_t : public kernel_base_t {
 #if DNNL_CPU_RUNTIME == DNNL_RUNTIME_OMP
         omp_set_num_threads(sdp_cfg_.nthr);
 #endif
+
         parallel_nd_ext(sdp_cfg_.nthr, MBO, MBI, loop);
+
+#if DNNL_CPU_RUNTIME == DNNL_RUNTIME_THREADPOOL
+        tp_stream->after_exec_hook();
+#endif
         return status::success;
     }
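For context on the hooks used above: with the threadpool runtime, the application owns the threads and hands oneDNN a threadpool through the interop API, and before_exec_hook()/after_exec_hook() bracket the kernel so the stream's threadpool is active while the primitives run. A minimal sketch of the application-side setup, using a deliberately trivial sequential pool for illustration and assuming a DNNL_CPU_RUNTIME=THREADPOOL build:

// Application-side sketch: supply oneDNN with a threadpool via the interop
// API. The trivial pool below runs everything inline; a real implementation
// would distribute fn(i, n) across worker threads.
#include <cstdint>
#include <functional>

#include "oneapi/dnnl/dnnl.hpp"
#include "oneapi/dnnl/dnnl_threadpool.hpp"

struct trivial_threadpool : dnnl::threadpool_interop::threadpool_iface {
    int get_num_threads() const override { return 1; }
    bool get_in_parallel() const override { return false; }
    uint64_t get_flags() const override { return 0; } // synchronous execution
    void parallel_for(int n, const std::function<void(int, int)> &fn) override {
        for (int i = 0; i < n; ++i)
            fn(i, n); // run every work item inline
    }
};

int main() {
    dnnl::engine eng(dnnl::engine::kind::cpu, 0);
    trivial_threadpool tp;
    // Streams built this way carry the threadpool that the hooks activate.
    dnnl::stream strm = dnnl::threadpool_interop::make_stream(eng, &tp);
    (void)strm;
    return 0;
}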

src/graph/backend/dnnl/kernels/sdp_base.hpp (5 additions, 9 deletions)

@@ -85,18 +85,14 @@ struct sdp_base_t : public kernel_base_t {
     // to openmp. We also only enable the decompose kernel on machines with
     // specific ISA support. There is also an internal env var to decide whether
     // to use the kernel.
-    // TODOs: we have follow to-do tasks in the future:
-    // 1. Remove CPU runtime check when we extend the support for others
-    // 2. Currently we support machines whose ISA is avx512_core_amx, we will
-    //    need to remove this check once the effectiveness of the algorithm
-    //    is verified on other machines
+    // TODO: Remove the CPU runtime check when we extend support to other runtimes
     bool enable_decomp_kernel() {
-#if DNNL_CPU_RUNTIME != DNNL_RUNTIME_OMP
-        return false;
-#else
-        bool enable_sdp_decomp
-                = graph::utils::getenv_int_internal("ENABLE_SDP_DECOMP", 1) > 0;
-        return enable_sdp_decomp;
-#endif
+#if DNNL_CPU_RUNTIME == DNNL_RUNTIME_OMP \
+        || DNNL_CPU_RUNTIME == DNNL_RUNTIME_THREADPOOL
+        return graph::utils::getenv_int_internal("ENABLE_SDP_DECOMP", 1) > 0;
+#else
+        return false;
+#endif
     }
 
     status_t execute_impl(const stream_t *g_stream,
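The gate now whitelists the two supported CPU runtimes rather than blacklisting everything that is not OpenMP. A standalone sketch of the same pattern, where the MY_* macros and env_enabled are stand-ins for oneDNN's build macros and the internal env-var lookup:

// Whitelist-style runtime gate; MY_* macros and env_enabled are stand-ins
// for DNNL_CPU_RUNTIME / DNNL_RUNTIME_* and getenv_int_internal(...).
#define MY_RT_OMP 1
#define MY_RT_THREADPOOL 2
#define MY_CPU_RUNTIME MY_RT_THREADPOOL // pretend: a threadpool build

bool enable_decomp_kernel_sketch(bool env_enabled) {
#if MY_CPU_RUNTIME == MY_RT_OMP || MY_CPU_RUNTIME == MY_RT_THREADPOOL
    return env_enabled; // decompose kernel available on these runtimes
#else
    return false; // every other runtime falls back to the large fused kernel
#endif
}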
