Skip to content

Commit

Permalink
DB::Open() to automatically increase thread pool size if it is smalle…
Browse files Browse the repository at this point in the history
…r than max number of parallel compactions or flushes

Summary:
With the patch, thread pool size will be automatically increased if DB's options ask for more parallelism of compactions or flushes.

Too many users have been confused by the API. Change it to make it harder for users to make mistakes.

Test Plan: Add two unit tests to cover the function.

Reviewers: yhchiang, rven, igor, MarkCallaghan, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27555
  • Loading branch information
siying committed Nov 4, 2014
1 parent 636e57b commit 09899f0
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 6 deletions.
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
### Public API changes
* Introduce 4 new convenient functions for converting Options from string: GetColumnFamilyOptionsFromMap(), GetColumnFamilyOptionsFromString(), GetDBOptionsFromMap(), GetDBOptionsFromString()
* Remove WriteBatchWithIndex.Delete() overloads using SliceParts

* When opening a DB, if options.max_background_compactions is larger than the size of the existing low-priority pool of options.env, the pool is enlarged to match. Similarly, if options.max_background_flushes is larger than the size of the existing high-priority pool of options.env, that pool is enlarged.

## 3.6.0 (10/7/2014)
### Disk format changes
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.info_log = nullptr;
}
}
result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
Env::Priority::HIGH);

if (!result.rate_limiter) {
if (result.bytes_per_sync == 0) {
Expand Down
41 changes: 41 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ class DBTest {
mem_env_(!getenv("MEM_ENV") ? nullptr :
new MockEnv(Env::Default())),
env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())) {
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
dbname_ = test::TmpDir(env_) + "/db_test";
auto options = CurrentOptions();
ASSERT_OK(DestroyDB(dbname_, options));
Expand Down Expand Up @@ -8193,6 +8195,45 @@ TEST(DBTest, TableOptionsSanitizeTest) {
ASSERT_OK(TryReopen(options));
}

// Checks that opening a DB enlarges the env's LOW and HIGH thread pools to
// match options.max_background_compactions / options.max_background_flushes.
// The pool sizes are observed indirectly: blocking tasks are scheduled and
// the number left waiting in each queue is compared with the expected value.
TEST(DBTest, SanitizeNumThreads) {
// attempt 0 opens the DB with the enlarged settings. attempt 1 reopens it
// without raising them and asserts the same queue lengths, i.e. the pools
// are expected to keep the sizes reached in attempt 0 rather than shrink.
for (int attempt = 0; attempt < 2; attempt++) {
const size_t kTotalTasks = 8;
SleepingBackgroundTask sleeping_tasks[kTotalTasks];

Options options = CurrentOptions();
if (attempt == 0) {
options.max_background_compactions = 3;
options.max_background_flushes = 2;
}
options.create_if_missing = true;
DestroyAndReopen(options);

for (size_t i = 0; i < kTotalTasks; i++) {
// Schedule the first 4 tasks on the low priority queue and the remaining
// 4 tasks on the high priority queue.
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i],
(i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
}

// Wait 100 milliseconds so that the tasks have been picked up by the pools.
env_->SleepForMicroseconds(100000);

// Pool size 3, 4 tasks in total: 1 task should still be queued.
ASSERT_EQ(1U, options.env->GetThreadPoolQueueLen(Env::Priority::LOW));
// Pool size 2, 4 tasks in total: 2 tasks should still be queued.
ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH));

// Release every sleeping task and wait for it to finish before moving on.
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}

// Sanity check: the DB still serves writes, reads and a flush afterwards.
ASSERT_OK(Put("abc", "def"));
ASSERT_EQ("def", Get("abc"));
Flush();
ASSERT_EQ("def", Get("abc"));
}
}

TEST(DBTest, DBIteratorBoundTest) {
Options options = CurrentOptions();
options.env = env_;
Expand Down
6 changes: 5 additions & 1 deletion hdfs/env_hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ class HdfsEnv : public Env {
posixEnv->SetBackgroundThreads(number, pri);
}

// Delegates the request to posixEnv, passing 'number' and 'pri' unchanged.
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
}

virtual std::string TimeToString(uint64_t number) {
return posixEnv->TimeToString(number);
}
Expand Down Expand Up @@ -319,7 +323,7 @@ class HdfsEnv : public Env {
std::string* outputpath) {return notsup;}

virtual void SetBackgroundThreads(int number, Priority pri = LOW) {}

// No-op in this implementation: the request is ignored.
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) {}
virtual std::string TimeToString(uint64_t number) { return "";}
};
}
Expand Down
10 changes: 10 additions & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ class Env {
// default number: 1
virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0;

// Enlarge the number of background worker threads of a specific thread pool
// for this environment if it is smaller than specified. Unlike
// SetBackgroundThreads(), 'pri' has no default here and must always be given.
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) = 0;

// Lower IO priority for threads from the specified pool.
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) {}

Expand Down Expand Up @@ -782,6 +787,11 @@ class EnvWrapper : public Env {
void SetBackgroundThreads(int num, Priority pri) {
return target_->SetBackgroundThreads(num, pri);
}

void IncBackgroundThreadsIfNeeded(int num, Priority pri) {
return target_->IncBackgroundThreadsIfNeeded(num, pri);
}

void LowerThreadPoolIOPriority(Priority pool = LOW) override {
target_->LowerThreadPoolIOPriority(pool);
}
Expand Down
19 changes: 17 additions & 2 deletions util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,12 @@ class PosixEnv : public Env {
thread_pools_[pri].SetBackgroundThreads(num);
}

// Allow increasing the number of worker threads.
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
}

virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
assert(pool >= Priority::LOW && pool <= Priority::HIGH);
#ifdef OS_LINUX
Expand Down Expand Up @@ -1642,13 +1648,14 @@ class PosixEnv : public Env {
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
}

void SetBackgroundThreads(int num) {
void SetBackgroundThreadsInternal(int num, bool allow_reduce) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
if (num != total_threads_limit_) {
if (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce)) {
total_threads_limit_ = num;
WakeUpAllThreads();
StartBGThreads();
Expand All @@ -1657,6 +1664,14 @@ class PosixEnv : public Env {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

// Raises the thread limit to 'num' if it is currently lower. A smaller 'num'
// is ignored because allow_reduce is passed as false.
void IncBackgroundThreadsIfNeeded(int num) {
SetBackgroundThreadsInternal(num, false);
}

// Sets the thread limit to 'num', whether that means growing or shrinking
// the pool (allow_reduce is passed as true).
void SetBackgroundThreads(int num) {
SetBackgroundThreadsInternal(num, true);
}

void StartBGThreads() {
// Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) {
Expand Down
38 changes: 36 additions & 2 deletions util/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ TEST(EnvPosixTest, TwoPools) {
std::cout << "Pool " << pool_name_ << ": "
<< num_running_ << " running threads.\n";
// make sure we don't have more than pool_size_ jobs running.
ASSERT_LE(num_running_, pool_size_);
ASSERT_LE(num_running_, pool_size_.load());
}

// sleep for 1 sec
Expand All @@ -162,11 +162,16 @@ TEST(EnvPosixTest, TwoPools) {
return num_finished_;
}

// Prepares this callback object for another round of jobs: records the new
// expected pool size and clears the finished-job counter.
// NOTE(review): num_finished_ is written here without taking mu_; presumably
// this is only called while no jobs are running — confirm against callers.
void Reset(int pool_size) {
pool_size_.store(pool_size);
num_finished_ = 0;
}

private:
port::Mutex mu_;
int num_running_;
int num_finished_;
int pool_size_;
std::atomic<int> pool_size_;
std::string pool_name_;
};

Expand Down Expand Up @@ -205,6 +210,35 @@ TEST(EnvPosixTest, TwoPools) {

ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

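// Call IncBackgroundThreadsIfNeeded on both pools. The LOW request is below
// the current size and must leave that pool unchanged; the HIGH request is
// above the current size and must grow that pool by one thread.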
env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW);
env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH);
high_pool_job.Reset(kHighPoolSize + 1);
low_pool_job.Reset(kLowPoolSize);

// schedule same number of jobs in each pool
for (int i = 0; i < kJobs; i++) {
env_->Schedule(&CB::Run, &low_pool_job);
env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
}
// Wait a short while for the jobs to be dispatched.
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
env_->GetThreadPoolQueueLen());
ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
env_->GetThreadPoolQueueLen(Env::Priority::LOW));
ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)),
env_->GetThreadPoolQueueLen(Env::Priority::HIGH));

// wait for all jobs to finish
while (low_pool_job.NumFinished() < kJobs ||
high_pool_job.NumFinished() < kJobs) {
env_->SleepForMicroseconds(kDelayMicros);
}

env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
}

TEST(EnvPosixTest, DecreaseNumBgThreads) {
Expand Down

0 comments on commit 09899f0

Please sign in to comment.