KUDU-1913: cap number of threads on server-wide pools
The last piece of work is to establish an upper bound on the number of
threads that may be started in the Raft and Prepare server-wide threadpools.
Such caps will make it easier for admins to reason about appropriate
values for the Kudu processes' RLIMIT_NPROC resource limit.

KUDU-1913 proposed a cap of "number of cores + number of disks", but a
lively Slack discussion yielded a better solution: set the cap at some
percentage of the process' RLIMIT_NPROC value. Given that the rest of Kudu
generally uses a constant number of threads, this should prevent thread count
spikes (such as those caused by an election storm) from ever exceeding
RLIMIT_NPROC and crashing the server. This patch implements a cap of 10% per
pool and also provides a new
gflag as an "escape hatch" (in case we were horribly wrong).

Note: it's still possible for a massive number of "hot" replicas to exceed
RLIMIT_NPROC by virtue of each replica's log append thread, but the server
is more likely to run out of memory for MemRowSets before that happens.

Change-Id: I194907a7f8a483c9cba71eba8caed6bc6090f618
Reviewed-on: http://gerrit.cloudera.org:8080/9522
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <[email protected]>
Reviewed-by: Todd Lipcon <[email protected]>
adembo committed Mar 8, 2018
1 parent ede0cf0 commit debcb8e
Showing 1 changed file with 57 additions and 11 deletions.
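
To make the sizing policy above concrete, here is a minimal standalone
sketch, not the Kudu implementation (which goes through Kudu's Env
abstraction, as the diff below shows); the ComputePoolCap helper, its
fallback behavior, and the direct getrlimit()/setrlimit() calls are
illustrative assumptions only:

    // Illustrative sketch only (not Kudu source): raise the soft RLIMIT_NPROC
    // to its hard maximum once, then cap each pool at 10% of the limit unless
    // an override is given, mirroring the policy described in the commit
    // message above.
    #include <sys/resource.h>

    #include <cstdint>
    #include <iostream>
    #include <mutex>

    // 'override_value' plays the role of the escape-hatch gflag: -1 means
    // "use 10% of the running-thread limit"; any other non-zero value wins.
    int64_t ComputePoolCap(int64_t override_value) {
      if (override_value != -1) {
        return override_value;
      }
      // Best-effort bump of the soft limit up to the hard limit, done once per
      // process (the patch does this via Env::IncreaseResourceLimit()).
      static std::once_flag once;
      std::call_once(once, [] {
        struct rlimit rl;
        if (getrlimit(RLIMIT_NPROC, &rl) == 0 && rl.rlim_cur < rl.rlim_max) {
          rl.rlim_cur = rl.rlim_max;
          setrlimit(RLIMIT_NPROC, &rl);  // Ignore failure; keep the old limit.
        }
      });
      struct rlimit rl;
      if (getrlimit(RLIMIT_NPROC, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY) {
        return INT64_MAX;  // Unknown or unlimited; effectively no cap.
      }
      return static_cast<int64_t>(rl.rlim_cur) / 10;
    }

    int main() {
      std::cout << "per-pool thread cap: " << ComputePoolCap(-1) << std::endl;
      return 0;
    }

Raising the soft limit to the hard limit first means the 10% cap is computed
against the largest thread budget the process is allowed to claim.
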
68 changes: 57 additions & 11 deletions src/kudu/kserver/kserver.cc
@@ -17,17 +17,44 @@

#include "kudu/kserver/kserver.h"

#include <limits>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"

DEFINE_int64(server_thread_pool_max_thread_count, -1,
             "Maximum number of threads to allow in each server-wide thread "
             "pool. If -1, Kudu will use 10% of its running thread per "
             "effective uid resource limit as per getrlimit(). It is an error "
             "to use a value of 0.");
TAG_FLAG(server_thread_pool_max_thread_count, advanced);
TAG_FLAG(server_thread_pool_max_thread_count, evolving);

static bool ValidateThreadPoolThreadLimit(const char* /*flagname*/, int64_t value) {
  if (value == 0) {
    LOG(ERROR) << "Invalid thread pool thread limit: cannot be 0";
    return false;
  }
  return true;
}
DEFINE_validator(server_thread_pool_max_thread_count, &ValidateThreadPoolThreadLimit);

using std::string;
using strings::Substitute;

namespace kudu {

@@ -56,6 +83,30 @@ METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
                        "that operations consist of very large batches.",
                        10000000, 2);

namespace {

int64_t GetThreadPoolThreadLimit(Env* env) {
  // Maximize this process' running thread limit first, if possible.
  static std::once_flag once;
  std::call_once(once, [&]() {
    env->IncreaseResourceLimit(Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);
  });

  int64_t rlimit = env->GetResourceLimit(Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);
  // See server_thread_pool_max_thread_count.
  if (FLAGS_server_thread_pool_max_thread_count == -1) {
    return rlimit / 10;
  }
  LOG_IF(FATAL, FLAGS_server_thread_pool_max_thread_count > rlimit) <<
      Substitute(
          "Configured server-wide thread pool running thread limit "
          "(server_thread_pool_max_thread_count) $0 exceeds euid running "
          "thread limit (ulimit) $1",
          FLAGS_server_thread_pool_max_thread_count, rlimit);
  return FLAGS_server_thread_pool_max_thread_count;
}

} // anonymous namespace

KuduServer::KuduServer(string name,
                       const ServerBaseOptions& options,
@@ -75,20 +126,15 @@ Status KuduServer::Init() {
                    .set_metrics(std::move(metrics))
                    .Build(&tablet_apply_pool_));

  // These pools are shared by all replicas hosted by this server.
  //
  // Submitted tasks use blocking IO (raft_pool_) or acquire long-held locks
  // (tablet_prepare_pool_) so we configure no upper bound on the maximum
  // number of threads in each pool (otherwise the default value of "number of
  // CPUs" may cause blocking tasks to starve other "fast" tasks). However, the
  // effective upper bound is the number of replicas as each will submit its
  // own tasks via a dedicated token.
  // These pools are shared by all replicas hosted by this server, and thus
  // are capped at a portion of the overall per-euid thread resource limit.
  int64_t server_wide_pool_limit = GetThreadPoolThreadLimit(fs_manager_->env());
  RETURN_NOT_OK(ThreadPoolBuilder("prepare")
                    .set_max_threads(std::numeric_limits<int>::max())
                    .set_max_threads(server_wide_pool_limit)
                    .Build(&tablet_prepare_pool_));
  RETURN_NOT_OK(ThreadPoolBuilder("raft")
                    .set_trace_metric_prefix("raft")
                    .set_max_threads(std::numeric_limits<int>::max())
                    .set_max_threads(server_wide_pool_limit)
                    .Build(&raft_pool_));

  return Status::OK();
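
As a usage note, the new escape hatch follows the standard gflags
define-and-validate pattern visible in the diff above: 0 is rejected, -1
means "derive the cap from getrlimit()", and any other value is used as-is
(and the server FATALs at startup if it exceeds the euid running-thread
limit). The sketch below is a standalone illustration with a hypothetical
flag name (pool_max_threads), not the real Kudu flag; an operator would pass
the real flag to a Kudu daemon in the same way, e.g.
--server_thread_pool_max_thread_count=256.

    // Illustrative sketch only (not Kudu source): define an int64 gflag with a
    // validator that rejects 0 but accepts -1 (auto) and positive caps.
    #include <cstdint>
    #include <iostream>

    #include <gflags/gflags.h>

    DEFINE_int64(pool_max_threads, -1,
                 "Maximum threads per pool. -1 means derive the cap from the "
                 "process' resource limits; 0 is invalid.");

    static bool ValidatePoolMaxThreads(const char* /*flagname*/, int64_t value) {
      return value != 0;  // -1 (auto) and positive caps are accepted.
    }
    DEFINE_validator(pool_max_threads, &ValidatePoolMaxThreads);

    int main(int argc, char** argv) {
      // Passing --pool_max_threads=0 fails validation and gflags exits with an
      // error before execution continues past this call.
      gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
      std::cout << "pool_max_threads = " << FLAGS_pool_max_threads << std::endl;
      return 0;
    }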
