KUDU-2187. Don't hold threadpool lock while creating threads
This changes the threadpool implementation so that if a task submission
needs to start a thread, it does so after releasing the lock. This
enables other worker threads to continue to process items even if thread
creation is slow.

This is meant to address various cases where I've seen stalled writes with
high 'mutex_wait_us' metrics. Looking at the logs, these stalls sometimes
correlate with a single slow pthread creation.
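
To illustrate the idea, here is a minimal, self-contained C++ sketch of the deferred
thread-creation pattern described above. It is not the Kudu implementation: the class
SketchPool and its members (pending_starts_, max_threads_, etc.) are illustrative names,
and shutdown/join handling is omitted for brevity.

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class SketchPool {
 public:
  void Submit(std::function<void()> task) {
    bool need_a_thread = false;
    {
      std::lock_guard<std::mutex> guard(lock_);
      queue_.push_back(std::move(task));
      // Decide under the lock whether another worker is needed, and reserve
      // the slot so that concurrent submitters don't all start threads.
      if (workers_.size() + pending_starts_ < max_threads_) {
        need_a_thread = true;
        ++pending_starts_;
      }
    }  // lock released here
    cond_.notify_one();
    if (need_a_thread) {
      // The potentially slow part (thread creation) runs without the lock,
      // so existing workers keep draining the queue in the meantime.
      std::thread t(&SketchPool::WorkerLoop, this);
      std::lock_guard<std::mutex> guard(lock_);
      --pending_starts_;
      workers_.push_back(std::move(t));
    }
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> guard(lock_);
        cond_.wait(guard, [this] { return !queue_.empty(); });
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();  // run the task outside the lock
    }
  }

  std::mutex lock_;
  std::condition_variable cond_;
  std::deque<std::function<void()>> queue_;
  std::vector<std::thread> workers_;
  size_t pending_starts_ = 0;
  size_t max_threads_ = 4;
};

The essential point is that the pending_starts_ reservation is made while the lock is
still held, so concurrent submitters account for the in-flight worker, while the
potentially slow thread creation itself runs unlocked.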

Change-Id: If91cb032db25ed539ec8a952f302cf4501b3c240
Reviewed-on: http://gerrit.cloudera.org:8080/8256
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
toddlipcon committed Oct 17, 2017
1 parent 57b8b8f commit f5e1203
Showing 4 changed files with 164 additions and 56 deletions.
17 changes: 15 additions & 2 deletions src/kudu/util/thread.cc
@@ -37,6 +37,7 @@

#include <boost/bind.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/gutil/atomicops.h"
@@ -46,18 +47,20 @@
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/errno.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/os-util.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/url-coding.h"
#include "kudu/util/trace.h"
#include "kudu/util/url-coding.h"
#include "kudu/util/web_callback_registry.h"

using boost::bind;
@@ -105,6 +108,11 @@ METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches,
"Total involuntary context switches",
kudu::EXPOSE_AS_COUNTER);

DEFINE_int32(thread_inject_start_latency_ms, 0,
"Number of ms to sleep when starting a new thread. (For tests).");
TAG_FLAG(thread_inject_start_latency_ms, hidden);
TAG_FLAG(thread_inject_start_latency_ms, unsafe);

namespace kudu {

static uint64_t GetCpuUTime() {
@@ -521,6 +529,11 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
// Temporary reference for the duration of this function.
scoped_refptr<Thread> t(new Thread(category, name, functor));

if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start";
SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms));
}

{
SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread");
SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250);
89 changes: 74 additions & 15 deletions src/kudu/util/threadpool-test.cc
@@ -31,6 +31,7 @@

#include <boost/bind.hpp> // IWYU pragma: keep
#include <boost/smart_ptr/shared_ptr.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

@@ -65,6 +66,8 @@ using std::vector;

using strings::Substitute;

DECLARE_int32(thread_inject_start_latency_ms);

namespace kudu {

static const char* kDefaultPoolName = "test";
@@ -186,26 +189,26 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
.set_idle_timeout(MonoDelta::FromMilliseconds(1))));

// There are no threads to start with.
ASSERT_TRUE(pool_->num_threads_ == 0);
ASSERT_TRUE(pool_->num_threads() == 0);
// We get up to 3 threads when submitting work.
CountDownLatch latch(1);
auto cleanup = MakeScopedCleanup([&]() {
latch.CountDown();
});
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(2, pool_->num_threads_);
ASSERT_EQ(2, pool_->num_threads());
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
ASSERT_EQ(3, pool_->num_threads());
// The 4th piece of work gets queued.
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
ASSERT_EQ(3, pool_->num_threads());
// Finish all work
latch.CountDown();
pool_->Wait();
ASSERT_EQ(0, pool_->active_threads_);
pool_->Shutdown();
ASSERT_EQ(0, pool_->num_threads_);
ASSERT_EQ(0, pool_->num_threads());
}

TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
@@ -226,7 +229,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
for (int i = 0; i < kNumCPUs * 2; i++) {
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
}
ASSERT_EQ((kNumCPUs * 2), pool_->num_threads_);
ASSERT_EQ((kNumCPUs * 2), pool_->num_threads());

// Submit tasks on two tokens. Only two threads should be created.
unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
Expand All @@ -235,13 +238,13 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
}
ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads_);
ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads());

// Submit more tokenless tasks. Each should create a new thread.
for (int i = 0; i < kNumCPUs; i++) {
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
}
ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads_);
ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads());

latch.CountDown();
pool_->Wait();
@@ -277,26 +280,26 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
.set_idle_timeout(MonoDelta::FromMilliseconds(1))));

// There is 1 thread to start with.
ASSERT_EQ(1, pool_->num_threads_);
ASSERT_EQ(1, pool_->num_threads());
// We get up to 4 threads when submitting work.
CountDownLatch latch(1);
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(1, pool_->num_threads_);
ASSERT_EQ(1, pool_->num_threads());
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(2, pool_->num_threads_);
ASSERT_EQ(2, pool_->num_threads());
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
ASSERT_EQ(3, pool_->num_threads());
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(4, pool_->num_threads_);
ASSERT_EQ(4, pool_->num_threads());
// The 5th piece of work gets queued.
ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(4, pool_->num_threads_);
ASSERT_EQ(4, pool_->num_threads());
// Finish all work
latch.CountDown();
pool_->Wait();
ASSERT_EQ(0, pool_->active_threads_);
pool_->Shutdown();
ASSERT_EQ(0, pool_->num_threads_);
ASSERT_EQ(0, pool_->num_threads());
}

TEST_F(ThreadPoolTest, TestMaxQueueSize) {
@@ -337,6 +340,62 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
pool_->Shutdown();
}

// Regression test for KUDU-2187:
//
// If a threadpool thread is slow to start up, it shouldn't block progress of
// other tasks on the same pool.
TEST_F(ThreadPoolTest, TestSlowThreadStart) {
// Start a pool of threads from which we'll submit tasks.
gscoped_ptr<ThreadPool> submitter_pool;
ASSERT_OK(ThreadPoolBuilder("submitter")
.set_min_threads(5)
.set_max_threads(5)
.Build(&submitter_pool));

// Start the actual test pool, which starts with one thread
// but will start a second one on-demand.
ASSERT_OK(RebuildPoolWithMinMax(1, 2));
// Ensure that the second thread will take a long time to start.
FLAGS_thread_inject_start_latency_ms = 3000;

// Now submit 10 tasks to the 'submitter' pool, each of which
// submits a single task to 'pool_'. The 'pool_' task sleeps
// for 10ms.
//
// Because the 'submitter' tasks submit faster than they can be
// processed on a single thread (due to the sleep), we expect that
// this will trigger 'pool_' to start up its second worker thread.
// The thread startup will have some latency injected.
//
// We expect that the thread startup will block only one of the
// tasks in the 'submitter' pool after it submits its task. Other
// tasks will continue to be processed by the other (already-running)
// thread on 'pool_'.
std::atomic<int32_t> total_queue_time_ms(0);
for (int i = 0; i < 10; i++) {
ASSERT_OK(submitter_pool->SubmitFunc([&]() {
auto submit_time = MonoTime::Now();
CHECK_OK(pool_->SubmitFunc([&,submit_time]() {
auto queue_time = MonoTime::Now() - submit_time;
total_queue_time_ms += queue_time.ToMilliseconds();
SleepFor(MonoDelta::FromMilliseconds(10));
}));
}));
}
submitter_pool->Wait();
pool_->Wait();

// Since the total amount of work submitted was only 100ms, we expect
// that the performance would be equivalent to a single-threaded
// threadpool. So, we expect the total queue time to be approximately
// 0 + 10 + 20 ... + 80 + 90 = 450ms.
//
// If, instead, throughput had been blocked while starting threads,
// we'd get something closer to 18000ms (3000ms delay * 5 submitter threads).
ASSERT_GE(total_queue_time_ms, 400);
ASSERT_LE(total_queue_time_ms, 10000);
}

// Test that setting a promise from another thread yields
// a value on the current thread.
TEST_F(ThreadPoolTest, TestPromises) {
86 changes: 53 additions & 33 deletions src/kudu/util/threadpool.cc
@@ -317,6 +317,7 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
no_threads_cond_(&lock_),
not_empty_(&lock_),
num_threads_(0),
num_threads_pending_start_(0),
active_threads_(0),
total_queued_tasks_(0),
tokenless_(NewToken(ExecutionMode::CONCURRENT)),
@@ -341,13 +342,13 @@ ThreadPool::~ThreadPool() {
}

Status ThreadPool::Init() {
MutexLock unique_lock(lock_);
if (!pool_status_.IsUninitialized()) {
return Status::NotSupported("The thread pool is already initialized");
}
pool_status_ = Status::OK();
num_threads_pending_start_ = min_threads_;
for (int i = 0; i < min_threads_; i++) {
Status status = CreateThreadUnlocked();
Status status = CreateThread();
if (!status.ok()) {
Shutdown();
return status;
@@ -400,7 +401,7 @@ void ThreadPool::Shutdown() {
// while others will exit after they finish executing an outstanding task.
total_queued_tasks_ = 0;
not_empty_.Broadcast();
while (num_threads_ > 0) {
while (num_threads_ + num_threads_pending_start_ > 0) {
no_threads_cond_.Wait();
}

@@ -473,34 +474,34 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
if (capacity_remaining < 1) {
return Status::ServiceUnavailable(
Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
num_threads_, max_threads_, total_queued_tasks_, max_queue_size_));
num_threads_ + num_threads_pending_start_, max_threads_,
total_queued_tasks_, max_queue_size_));
}

// Should we create another thread?

// We assume that each current inactive thread will grab one item from the
// queue. If it seems like we'll need another thread, we create one.
//
// Rather than creating the thread here, while holding the lock, we defer
// it to down below. This is because thread creation can be rather slow
// (hundreds of milliseconds in some cases) and we'd like to allow the
// existing threads to continue to process tasks while we do so.
//
// In theory, a currently active thread could finish immediately after this
// calculation. This would mean we created a thread we didn't really need.
// However, this race is unavoidable, since we don't do the work under a lock.
// It's also harmless.
// calculation but before our new worker starts running. This would mean we
// created a thread we didn't really need. However, this race is unavoidable
// and harmless.
//
// Of course, we never create more than max_threads_ threads no matter what.
int threads_from_this_submit =
token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
int inactive_threads = num_threads_ - active_threads_;
int inactive_threads = num_threads_ + num_threads_pending_start_ - active_threads_;
int additional_threads = (queue_.size() + threads_from_this_submit) - inactive_threads;
if (additional_threads > 0 && num_threads_ < max_threads_) {
Status status = CreateThreadUnlocked();
if (!status.ok()) {
if (num_threads_ == 0) {
// If we have no threads, we can't do any work.
return status;
}
// If we failed to create a thread, but there are still some other
// worker threads, log a warning message and continue.
LOG(ERROR) << "Thread pool failed to create thread: "
<< status.ToString();
}
bool need_a_thread = false;
if (additional_threads > 0 && num_threads_ + num_threads_pending_start_ < max_threads_) {
need_a_thread = true;
num_threads_pending_start_++;
}

Task task;
@@ -528,6 +529,7 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
int length_at_submit = total_queued_tasks_++;

guard.Unlock();

not_empty_.Signal();

if (metrics_.queue_length_histogram) {
Expand All @@ -537,6 +539,23 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
token->metrics_.queue_length_histogram->Increment(length_at_submit);
}

if (need_a_thread) {
Status status = CreateThread();
if (!status.ok()) {
guard.Lock();
num_threads_pending_start_--;
if (num_threads_ + num_threads_pending_start_ == 0) {
// If we have no threads, we can't do any work.
return status;
}
// If we failed to create a thread, but there are still some other
// worker threads, log a warning message and continue.
LOG(ERROR) << "Thread pool failed to create thread: "
<< status.ToString();
}
}


return Status::OK();
}

@@ -563,8 +582,16 @@ bool ThreadPool::WaitFor(const MonoDelta& delta) {
return true;
}

void ThreadPool::DispatchThread(bool permanent) {
void ThreadPool::DispatchThread() {
MutexLock unique_lock(lock_);
InsertOrDie(&threads_, Thread::current_thread());
DCHECK_GT(num_threads_pending_start_, 0);
num_threads_++;
num_threads_pending_start_--;
// If we are one of the first 'min_threads_' to start, we must be
// a "permanent" thread.
bool permanent = num_threads_ <= min_threads_;

while (true) {
// Note: Status::Aborted() is used to indicate normal shutdown.
if (!pool_status_.ok()) {
Expand Down Expand Up @@ -679,7 +706,8 @@ void ThreadPool::DispatchThread(bool permanent) {
CHECK(unique_lock.OwnsLock());

CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
if (--num_threads_ == 0) {
num_threads_--;
if (num_threads_ + num_threads_pending_start_ == 0) {
no_threads_cond_.Broadcast();

// Sanity check: if we're the last thread exiting, the queue ought to be
Expand All @@ -689,17 +717,9 @@ void ThreadPool::DispatchThread(bool permanent) {
}
}

Status ThreadPool::CreateThreadUnlocked() {
// The first few threads are permanent, and do not time out.
bool permanent = (num_threads_ < min_threads_);
scoped_refptr<Thread> t;
Status s = kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
&ThreadPool::DispatchThread, this, permanent, &t);
if (s.ok()) {
InsertOrDie(&threads_, t.get());
num_threads_++;
}
return s;
Status ThreadPool::CreateThread() {
return kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
&ThreadPool::DispatchThread, this, nullptr);
}

void ThreadPool::CheckNotPoolThreadUnlocked() {
