add set_async in background thread (pytorch#3036)
Summary:
Pull Request resolved: pytorch#3036

X-link: facebookresearch/FBGEMM#134

insert L2-evicted emb_idx and embeddings back into the storage tier

Reviewed By: q10

Differential Revision: D61418055

fbshipit-source-id: 03143c815465d65fea2b9609c2b073b39b0e714a
duduyi2013 authored and facebook-github-bot committed Sep 4, 2024
1 parent 9a0e7b0 commit 65ba8c2

Showing 6 changed files with 66 additions and 43 deletions.
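
At a high level, the commit moves L2-cache filling into a background thread: the thread drains weights_to_fill_queue_, inserts each item into the L2 cache, and writes any (index, embedding) pairs evicted by that insert back to the storage tier via set_kv_db_async. Below is a minimal, self-contained sketch of this peek-process-dequeue pattern; Item, Cache, Storage, and BackgroundFiller are simplified stand-ins rather than the FBGEMM types, which use a folly queue with try_peek/dequeue, CacheLibCache, and folly coroutines.

// Illustrative sketch only: Item, Cache, and Storage stand in for FBGEMM's
// (indices, weights, count) tensor tuple, CacheLibCache, and the RocksDB tier.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

struct Item {
  std::vector<int64_t> indices;
  std::vector<float> weights;
};

struct Cache {
  // Insert into L2; returns the evicted pairs, if any (simplified).
  std::optional<Item> set(const Item&) { return std::nullopt; }
};

struct Storage {
  void set(const Item&) { /* persist to the storage tier */ }
};

class BackgroundFiller {
 public:
  BackgroundFiller(Cache* cache, Storage* storage)
      : cache_(cache), storage_(storage), thread_([this] { loop(); }) {}

  ~BackgroundFiller() {
    stop_ = true;  // same shutdown protocol as EmbeddingKVDB's destructor
    thread_.join();
  }

  void enqueue(Item item) {
    std::lock_guard<std::mutex> g(mu_);
    queue_.push_back(std::move(item));
  }

 private:
  void loop() {
    while (!stop_) {
      std::optional<Item> item;
      {
        std::lock_guard<std::mutex> g(mu_);
        if (!queue_.empty()) {
          item = queue_.front();  // peek: the item stays queued while written
        }
      }
      if (!item) {
        // Only sleep when the queue is empty (the commit leaves the
        // interval as a TODO; 10 ms mirrors the hard-coded value).
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        continue;
      }
      if (cache_) {
        // Fill L2 first; anything it evicts goes back to the storage tier.
        if (auto evicted = cache_->set(*item)) {
          storage_->set(*evicted);
        }
      } else {
        storage_->set(*item);  // no L2 configured: write straight through
      }
      std::lock_guard<std::mutex> g(mu_);
      queue_.pop_front();  // dequeue only after the item is fully handled
    }
  }

  Cache* cache_;
  Storage* storage_;
  std::atomic<bool> stop_{false};
  std::mutex mu_;
  std::deque<Item> queue_;
  std::thread thread_;  // declared last so stop_/queue_ exist before it runs
};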
2 changes: 1 addition & 1 deletion fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py
@@ -137,7 +137,7 @@ def __init__(
         use_passed_in_path: int = True,
         gather_ssd_cache_stats: Optional[bool] = False,
         stats_reporter_config: Optional[TBEStatsReporterConfig] = None,
-        l2_cache_size: int = 1,
+        l2_cache_size: int = 0,
         # Set to True to enable pipeline prefetching
         prefetch_pipeline: bool = False,
         # Set to True to alloc a UVM tensor using malloc+cudaHostRegister.
@@ -45,6 +45,59 @@ std::tuple<at::Tensor, at::Tensor, at::Tensor> tensor_copy(
   return std::make_tuple(new_indices, new_weights, count.clone());
 }
 
+EmbeddingKVDB::EmbeddingKVDB(
+    int64_t num_shards,
+    int64_t max_D,
+    int64_t cache_size_gb,
+    int64_t unique_id)
+    : unique_id_(unique_id),
+      num_shards_(num_shards),
+      max_D_(max_D),
+      executor_tp_(std::make_unique<folly::CPUThreadPoolExecutor>(num_shards)) {
+  assert(num_shards > 0);
+  l2_cache_ = cache_size_gb > 0
+      ? std::make_unique<l2_cache::CacheLibCache>(
+            cache_size_gb * 1024 * 1024 * 1024, num_shards_)
+      : nullptr;
+  cache_filling_thread_ = std::make_unique<std::thread>([=] {
+    while (!stop_) {
+      auto filling_item_ptr = weights_to_fill_queue_.try_peek();
+      if (!filling_item_ptr) {
+        // TODO: make it tunable interval for background thread
+        // only sleep on empty queue
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        continue;
+      }
+      if (stop_) {
+        return;
+      }
+      auto& indices = std::get<0>(*filling_item_ptr);
+      auto& weights = std::get<1>(*filling_item_ptr);
+      auto& count = std::get<2>(*filling_item_ptr);
+
+      if (l2_cache_) {
+        auto evicted_pairs_opt = set_cache(indices, weights, count);
+        if (evicted_pairs_opt.has_value()) {
+          auto& evicted_indices = evicted_pairs_opt->first;
+          auto& evicted_weights = evicted_pairs_opt->second;
+
+          folly::coro::blockingWait(
+              set_kv_db_async(evicted_indices, evicted_weights, count));
+        }
+      } else {
+        folly::coro::blockingWait(set_kv_db_async(indices, weights, count));
+      }
+
+      weights_to_fill_queue_.dequeue();
+    }
+  });
+}
+
+EmbeddingKVDB::~EmbeddingKVDB() {
+  stop_ = true;
+  cache_filling_thread_->join();
+}
+
 void EmbeddingKVDB::get_cuda(
     const at::Tensor& indices,
     const at::Tensor& weights,
@@ -283,7 +336,9 @@ folly::Optional<std::pair<at::Tensor, at::Tensor>> EmbeddingKVDB::set_cache(
     const at::Tensor& indices,
     const at::Tensor& weights,
     const at::Tensor& count) {
-  // caller's responsibility to make sure l2_cache_ exists
+  if (l2_cache_ == nullptr) {
+    return folly::none;
+  }
 
   // TODO: consider whether need to reconstruct indices/weights/count and free
   // the original tensor since most of the tensor elem will be invalid,
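
Two details of the loop above are easy to miss. First, the worker peeks rather than pops: an item stays on weights_to_fill_queue_ while it is being written, and dequeue() runs only after both the L2 insert and any evicted write-back complete. Second, the 10 ms sleep happens only when the queue is empty, so a backlog drains without extra latency. When l2_cache_ is null (the new l2_cache_size_gb = 0 default), every queued item is written straight to the storage tier.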
@@ -76,43 +76,9 @@ class EmbeddingKVDB : public std::enable_shared_from_this<EmbeddingKVDB> {
       int64_t num_shards,
       int64_t max_D,
       int64_t cache_size_gb = 0,
-      int64_t unique_id = 0)
-      : unique_id_(unique_id),
-        num_shards_(num_shards),
-        max_D_(max_D),
-        executor_tp_(
-            std::make_unique<folly::CPUThreadPoolExecutor>(num_shards)) {
-    assert(num_shards > 0);
-    l2_cache_ = cache_size_gb > 0
-        ? std::make_unique<l2_cache::CacheLibCache>(
-              cache_size_gb * 1024 * 1024 * 1024, num_shards_)
-        : nullptr;
-    cache_filling_thread_ = std::make_unique<std::thread>([=] {
-      while (!stop_) {
-        auto filling_item_ptr = weights_to_fill_queue_.try_peek();
-        if (!filling_item_ptr) {
-          // TODO: make it tunable interval for background thread
-          // only sleep on empty queue
-          std::this_thread::sleep_for(std::chrono::milliseconds(10));
-          continue;
-        }
-        if (stop_) {
-          return;
-        }
-        auto& indices = std::get<0>(*filling_item_ptr);
-        auto& weights = std::get<1>(*filling_item_ptr);
-        auto& count = std::get<2>(*filling_item_ptr);
-        set_cache(indices, weights, count);
-        // TODO: add logic to kick off spilled item back to rocksdb
+      int64_t unique_id = 0);
 
-        weights_to_fill_queue_.dequeue();
-      }
-    });
-  }
-  virtual ~EmbeddingKVDB() {
-    stop_ = true;
-    cache_filling_thread_->join();
-  }
+  virtual ~EmbeddingKVDB();
 
   /// Insert non-negative elements in <indices> and its paired embeddings
   /// from <weights> for the first <count> elements in the tensor.
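
With the constructor and destructor bodies now defined in the .cpp file (first hunk above), the header keeps only the declarations; the class interface is otherwise unchanged.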
@@ -239,7 +239,7 @@ class EmbeddingRocksDBWrapper : public torch::jit::CustomClassHolder {
       int64_t cache_size = 0,
       bool use_passed_in_path = false,
       int64_t tbe_unique_id = 0,
-      int64_t l2_cache_size_gb = 1)
+      int64_t l2_cache_size_gb = 0)
       : impl_(std::make_shared<ssd::EmbeddingRocksDB>(
             path,
             num_shards,
@@ -354,7 +354,7 @@ static auto embedding_rocks_db_wrapper =
             torch::arg("cache_size"),
             torch::arg("use_passed_in_path") = true,
             torch::arg("tbe_unique_id") = 0,
-            torch::arg("l2_cache_size_gb") = 1,
+            torch::arg("l2_cache_size_gb") = 0,
         })
         .def(
             "set_cuda",
@@ -146,7 +146,7 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
       int64_t cache_size = 0,
       bool use_passed_in_path = false,
       int64_t tbe_unqiue_id = 0,
-      int64_t l2_cache_size_gb = 1)
+      int64_t l2_cache_size_gb = 0)
       : kv_db::EmbeddingKVDB(
             num_shards,
             max_D,
@@ -369,8 +369,7 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
             (2 * (count_ + dbs_.size() - 1) / dbs_.size()) *
             (sizeof(int64_t) + sizeof(scalar_t) * D));
         for (auto i = 0; i < count_; ++i) {
-          // TODO: Check whether this is OK
-          if (indices_acc[i] == -1) {
+          if (indices_acc[i] < 0) {
             continue;
           }
           if (kv_db_utils::hash_shard(
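
The second hunk widens the skip condition from the exact -1 sentinel to any negative index. A small stand-alone illustration of the guard, under the assumption (suggested but not stated by the surrounding code) that every negative index marks an invalid or padded slot:

#include <cstdint>
#include <vector>

// Stand-in for the per-shard write loop: keep only valid (non-negative)
// indices, skipping every negative sentinel rather than just -1.
std::vector<int64_t> filter_valid_indices(const std::vector<int64_t>& indices) {
  std::vector<int64_t> valid;
  valid.reserve(indices.size());
  for (const int64_t idx : indices) {
    if (idx < 0) {
      continue;  // previously `idx == -1`, which let other negatives through
    }
    valid.push_back(idx);
  }
  return valid;
}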
3 changes: 3 additions & 0 deletions fbgemm_gpu/test/tbe/ssd/ssd_split_tbe_inference_test.py
@@ -8,6 +8,7 @@
 # pyre-ignore-all-errors[3,6,56]
 
 import random
+import time
 import unittest
 
 import hypothesis.strategies as st
@@ -246,6 +247,7 @@ def test_nbit_ssd_forward(
             rtol=1.0e-2,
             equal_nan=True,
         )
+        time.sleep(0.1)
 
     @given(
         T=st.integers(min_value=1, max_value=10),
@@ -454,3 +456,4 @@ def test_nbit_ssd_cache(
             atol=1.0e-2,
             rtol=1.0e-2,
         )
+        time.sleep(0.1)
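
The added time.sleep(0.1) calls appear intended to give the background cache-filling thread time to drain its queue before each test returns and tears the module down.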
