diff --git a/etc/evergreen.yml b/etc/evergreen.yml index ea9f488bb3e1e..19e2a4cbaad2c 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -2434,15 +2434,6 @@ tasks: resmoke_args: --suites=core --storageEngine=mmapv1 run_multiple_jobs: true -- <<: *task_template - name: jsCore_async - commands: - - func: "do setup" - - func: "run tests" - vars: - resmoke_args: --suites=core --storageEngine=wiredTiger --serviceExecutor=fixedForTesting - run_multiple_jobs: true - - <<: *task_template name: jsCore_inMem commands: @@ -2919,16 +2910,6 @@ tasks: vars: resmoke_args: --suites=parallel --storageEngine=wiredTiger -- <<: *task_template - name: parallel_async - depends_on: - - name: jsCore_async - commands: - - func: "do setup" - - func: "run tests" - vars: - resmoke_args: --suites=parallel --storageEngine=wiredTiger --serviceExecutor=fixedForTesting - - <<: *task_template name: parallel_compatibility depends_on: @@ -3264,15 +3245,6 @@ tasks: resmoke_args: --suites=ssl run_multiple_jobs: true -- <<: *task_template - name: ssl_async - commands: - - func: "do setup" - - func: "run tests" - vars: - resmoke_args: --suites=ssl --serviceExecutor=fixedForTesting - run_multiple_jobs: true - - <<: *task_template name: sslSpecial commands: @@ -3282,15 +3254,6 @@ tasks: resmoke_args: --suites=ssl_special run_multiple_jobs: true -- <<: *task_template - name: sslSpecial_async - commands: - - func: "do setup" - - func: "run tests" - vars: - resmoke_args: --suites=ssl_special --serviceExecutor=fixedForTesting - run_multiple_jobs: true - - <<: *task_template name: tool commands: @@ -5609,11 +5572,12 @@ buildvariants: num_jobs_available: $(grep -c ^processor /proc/cpuinfo) ext: zip use_scons_cache: true + test_flags: --serviceExecutor=adaptive tasks: - name: compile distros: - windows-64-vs2015-large - - name: jsCore_async + - name: jsCore - name: enterprise-windows-64-2k8-inmem display_name: Enterprise Windows 2008R2 (inMemory) @@ -9287,12 +9251,137 @@ buildvariants: num_jobs_available: $(($(grep -c ^processor /proc/cpuinfo) / 3)) # Avoid starting too many mongod's under {A,UB}SAN build. build_mongoreplay: false hang_analyzer_dump_core: false + test_flags: --serviceExecutor=adaptive tasks: - name: compile - - name: jsCore_async - - name: parallel_async - - name: ssl_async - - name: sslSpecial_async + - name: compile_all + - name: aggregation + - name: aggregation_WT + - name: aggregation_WT_ese + - name: aggregation_auth + - name: aggregation_facet_unwind_passthrough_WT + - name: aggregation_read_concern_majority_passthrough_WT + - name: aggregation_sharded_collections_passthrough_WT +# TODO Put back when SERVER-30471 is done +# - name: audit +# - name: audit_WT +# - name: auth +# - name: auth_WT +# - name: auth_audit +# - name: auth_audit_WT + - name: bulk_gle_passthrough + - name: bulk_gle_passthrough_WT + - name: causally_consistent_jscore_passthrough_WT + - name: causally_consistent_jscore_passthrough_auth_WT + - name: concurrency + - name: concurrency_WT + - name: concurrency_replication + - name: concurrency_replication_WT + - name: concurrency_sharded + - name: concurrency_sharded_WT + - name: concurrency_simultaneous + - name: concurrency_simultaneous_WT + - name: dbtest + - name: dbtest_WT + - name: disk + - name: disk_WT + - name: dur_jscore_passthrough + - name: durability + - name: ese_WT + - name: failpoints + - name: failpoints_auth + - name: gle_auth + - name: gle_auth_WT + - name: gle_auth_basics_passthrough + - name: gle_auth_basics_passthrough_WT + - name: gle_auth_basics_passthrough_write_cmd + - name: gle_auth_basics_passthrough_write_cmd_WT + - name: gle_auth_write_cmd + - name: gle_auth_write_cmd_WT + - name: integration_tests_replset + - name: integration_tests_sharded + - name: integration_tests_standalone + - name: jsCore + - name: jsCore_WT + - name: jsCore_WT_ese + - name: jsCore_inMem + - name: jsCore_auth + - name: jsCore_compatibility + - name: jsCore_compatibility_WT + - name: jsCore_decimal + - name: jsCore_decimal_WT + - name: jsCore_minimum_batch_size_WT + - name: jsCore_op_query + - name: jsCore_op_query_WT + - name: mmap + - name: mongosTest + - name: multiversion + - name: multiversion_auth + - name: multiversion_WT + - name: multiversion_multistorage_engine + - name: noPassthrough + - name: noPassthroughWithMongod + - name: noPassthroughWithMongod_WT + - name: noPassthrough_WT + - name: parallel + - name: parallel_WT + - name: parallel_compatibility + - name: parallel_compatibility_WT + - name: read_concern_linearizable_passthrough + - name: read_concern_linearizable_passthrough_WT + - name: read_concern_majority_passthrough_WT + - name: read_only + - name: read_only_WT + - name: read_only_sharded + - name: read_only_sharded_WT +# TODO Put back when SERVER-30471 is done +# - name: replica_sets +# - name: replica_sets_WT +# - name: replica_sets_WT_ese +# - name: replica_sets_auth +# - name: replica_sets_pv0 + - name: replica_sets_jscore_passthrough + - name: replica_sets_jscore_passthrough_WT + - name: replica_sets_initsync_jscore_passthrough + - name: replica_sets_initsync_jscore_passthrough_WT + - name: replica_sets_initsync_static_jscore_passthrough + - name: replica_sets_initsync_static_jscore_passthrough_WT + - name: replica_sets_resync_static_jscore_passthrough + - name: replica_sets_resync_static_jscore_passthrough_WT + - name: replica_sets_kill_secondaries_jscore_passthrough_WT + - name: master_slave + - name: master_slave_WT + - name: master_slave_auth + - name: master_slave_jscore_passthrough + - name: master_slave_jscore_passthrough_WT + - name: sasl + - name: sharded_collections_jscore_passthrough + - name: sharded_collections_jscore_passthrough_WT + - name: sharding + - name: sharding_WT + - name: sharding_WT_ese + - name: sharding_auth + - name: sharding_auth_audit_WT + - name: sharding_gle_auth_basics_passthrough + - name: sharding_gle_auth_basics_passthrough_WT + - name: sharding_gle_auth_basics_passthrough_write_cmd + - name: sharding_gle_auth_basics_passthrough_write_cmd_WT + - name: sharding_last_stable_mongos_and_mixed_shards + - name: sharding_jscore_passthrough + - name: sharding_jscore_passthrough_WT + - name: sharding_jscore_op_query_passthrough_WT + - name: sharding_jscore_passthrough_wire_ops_WT + - name: sharding_op_query_WT + - name: slow1 + - name: slow1_WT + - name: serial_run + - name: serial_run_WT + - name: snmp + - name: snmp_WT + - name: ssl + - name: sslSpecial + - name: tool + - name: tool_WT - name: enterprise-ubuntu-dynamic-1604-64-bit display_name: "* Shared Library Enterprise Ubuntu 16.04" diff --git a/jstests/noPassthrough/transportlayer_boot_cmdline.js b/jstests/noPassthrough/transportlayer_boot_cmdline.js index 29064893f9951..bbb5054be06ee 100644 --- a/jstests/noPassthrough/transportlayer_boot_cmdline.js +++ b/jstests/noPassthrough/transportlayer_boot_cmdline.js @@ -9,18 +9,14 @@ var baseDir = "jstests_transportlayer_boot_cmdline"; var dbpath = MongoRunner.dataPath + baseDir + "/"; - var m = MongoRunner.runMongod({dbpath: dbpath, transportLayer: 'legacy'}); - assert(m, 'MongoDB with transportLayer=legacy failed to start up'); - MongoRunner.stopMongod(m); - - m = MongoRunner.runMongod( + var m = MongoRunner.runMongod( {dbpath: dbpath, transportLayer: 'legacy', serviceExecutor: 'synchronous'}); assert(m, 'MongoDB with transportLayer=legacy and serviceExecutor=synchronous failed to start up'); MongoRunner.stopMongod(m); m = MongoRunner.runMongod( - {dbpath: dbpath, transportLayer: 'legacy', serviceExecutor: 'fixedForTesting'}); + {dbpath: dbpath, transportLayer: 'legacy', serviceExecutor: 'adaptive'}); assert.isnull( m, 'MongoDB with transportLayer=legacy and serviceExecutor=fixedForTesting managed to startup which is an unsupported combination'); diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp index 7cbd6013d32e8..753acfd38c13d 100644 --- a/src/mongo/db/server_options_helpers.cpp +++ b/src/mongo/db/server_options_helpers.cpp @@ -825,7 +825,7 @@ Status storeServerOptions(const moe::Environment& params) { "must be \"synchronous\""}; } } else { - const auto valid = {"synchronous"_sd, "fixedForTesting"_sd}; + const auto valid = {"synchronous"_sd, "adaptive"_sd}; if (std::find(valid.begin(), valid.end(), value) == valid.end()) { return {ErrorCodes::BadValue, "Unsupported value for serviceExecutor"}; } diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index db486a7285742..06c7892c50447 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -40,7 +40,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" -#include "mongo/transport/service_executor_base.h" +#include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" #include "mongo/util/clock_source.h" #include "mongo/util/decorable.h" diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index 995dfd993cf7b..a1b9acb38a5e0 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -1058,7 +1058,7 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro if (jsTest.options().serviceExecutor && (!programVersion || (parseInt(programVersion.split(".")[0]) >= 3 && parseInt(programVersion.split(".")[1]) >= 5))) { - if (!argArrayContains("serviceExecutor")) { + if (!argArrayContains("--serviceExecutor")) { argArray.push(...["--serviceExecutor", jsTest.options().serviceExecutor]); } } diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 5fe57bfa9e5a6..760108e737733 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -29,30 +29,32 @@ env.Library( ) env.Library( - target='transport_layer_manager', + target='transport_layer_mock', source=[ - 'transport_layer_manager.cpp', + 'transport_layer_mock.cpp', ], LIBDEPS=[ - 'transport_layer', - ], - LIBDEPS_PRIVATE=[ - 'service_executor', + 'transport_layer_common', ], ) -env.Library( - target='transport_layer_mock', +tlEnv = env.Clone() +tlEnv.InjectThirdPartyIncludePaths(libraries=['asio']) + +tlEnv.Library( + target='transport_layer_manager', source=[ - 'transport_layer_mock.cpp', + 'transport_layer_manager.cpp', ], LIBDEPS=[ - 'transport_layer_common', + 'transport_layer', + ], + LIBDEPS_PRIVATE=[ + 'service_executor', + '$BUILD_DIR/third_party/shim_asio', ], ) -tlEnv = env.Clone() -tlEnv.InjectThirdPartyIncludePaths(libraries=['asio']) tlEnv.Library( target='transport_layer', source=[ @@ -67,6 +69,8 @@ tlEnv.Library( '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/counters', + ], + LIBDEPS_PRIVATE=[ '$BUILD_DIR/third_party/shim_asio', ], ) @@ -74,8 +78,7 @@ tlEnv.Library( tlEnv.Library( target='service_executor', source=[ - 'service_executor_base.cpp', - 'service_executor_fixed.cpp', + 'service_executor_adaptive.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context', @@ -86,6 +89,20 @@ tlEnv.Library( ], ) +tlEnv.CppUnitTest( + target='service_executor_adaptive_test', + source=[ + 'service_executor_adaptive_test.cpp', + ], + LIBDEPS=[ + 'service_executor', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/unittest/unittest', + '$BUILD_DIR/mongo/util/clock_source_mock', + '$BUILD_DIR/third_party/shim_asio', + ], +) + env.Library( target='service_entry_point_test_suite', source=[ diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index fc2d442eed51e..b4570d3e2b661 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -45,6 +45,10 @@ class ServiceExecutor { public: virtual ~ServiceExecutor() = default; using Task = stdx::function; + enum ScheduleFlags { + EmptyFlags = 0, + DeferredTask = 1, + }; /* * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled. @@ -56,8 +60,11 @@ class ServiceExecutor { * * This is guaranteed to unwind the stack before running the task, although the task may be * run later in the same thread. + * + * If defer is true, then the executor may defer execution of this Task until an available + * thread is available. */ - virtual Status schedule(Task task) = 0; + virtual Status schedule(Task task, ScheduleFlags flags) = 0; /* * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp new file mode 100644 index 0000000000000..9ef23266c2a51 --- /dev/null +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -0,0 +1,491 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_executor_adaptive.h" + +#include + +#include "mongo/db/server_parameters.h" +#include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/concurrency/threadlocal.h" +#include "mongo/util/log.h" +#include "mongo/util/processinfo.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/stringutils.h" + +#include + +namespace mongo { +namespace transport { +namespace { +// The executor will always keep this many number of threads around. If the value is -1, +// (the default) then it will be set to number of cores / 2. +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorReservedThreads, int, -1); + +// Each worker thread will allow ASIO to run for this many milliseconds before checking +// whether it should exit +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorRunTimeMillis, int, 5000); + +// The above parameter will be offset of some random value between -runTimeJitters/ +// +runTimeJitters so that not all threads are starting/stopping execution at the same time +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorRunTimeJitterMillis, int, 500); + +// This is the maximum amount of time the controller thread will sleep before doing any +// stuck detection +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorStuckThreadTimeoutMillis, int, 250); + +// The maximum allowed latency between when a task is scheduled and a thread is started to +// service it. +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorMaxQueueLatencyMicros, int, 50); + +// Threads will exit themselves if they spent less than this percentage of the time they ran +// doing actual work. +MONGO_EXPORT_SERVER_PARAMETER(adaptiveServiceExecutorIdlePctThreshold, int, 60); + +MONGO_TRIVIALLY_CONSTRUCTIBLE_THREAD_LOCAL TickSource::Tick ticksSpentExecuting = 0; +MONGO_TRIVIALLY_CONSTRUCTIBLE_THREAD_LOCAL TickSource::Tick ticksSpentScheduled = 0; +MONGO_TRIVIALLY_CONSTRUCTIBLE_THREAD_LOCAL int tasksExecuted = 0; + +constexpr auto kTotalScheduled = "totalScheduled"_sd; +constexpr auto kTotalExecuted = "totalExecuted"_sd; +constexpr auto kQueueDepth = "queueDepth"_sd; +constexpr auto kTotalTimeExecutingUs = "totaltimeExecutingMicros"_sd; +constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd; +constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd; +constexpr auto kTasksExecuting = "tasksExecutng"; +constexpr auto kThreadsRunning = "threadsRunning"; +constexpr auto kThreadsPending = "threadsPending"; +constexpr auto kExecutorLabel = "executor"; +constexpr auto kExecutorName = "adaptive"; + +int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) { + invariant(tickSource->getTicksPerSecond() > 1000000); + static const auto ticksPerMicro = tickSource->getTicksPerSecond() / 1000000; + return ticks / ticksPerMicro; +} + +struct ServerParameterOptions : public ServiceExecutorAdaptive::Options { + int reservedThreads() const final { + int value = adaptiveServiceExecutorReservedThreads.load(); + if (value == -1) { + ProcessInfo pi; + value = pi.getNumAvailableCores().value_or(pi.getNumCores()) / 2; + value = std::max(value, 2); + adaptiveServiceExecutorReservedThreads.store(value); + log() << "No thread count configured for executor. Using number of cores / 2: " + << value; + } + return value; + } + + Milliseconds workerThreadRunTime() const final { + return Milliseconds{adaptiveServiceExecutorRunTimeMillis.load()}; + } + + int runTimeJitter() const final { + return adaptiveServiceExecutorRunTimeJitterMillis.load(); + } + + Milliseconds stuckThreadTimeout() const final { + return Milliseconds{adaptiveServiceExecutorStuckThreadTimeoutMillis.load()}; + } + + Microseconds maxQueueLatency() const final { + return Microseconds{adaptiveServiceExecutorMaxQueueLatencyMicros.load()}; + } + + int idlePctThreshold() const final { + return adaptiveServiceExecutorIdlePctThreshold.load(); + } +}; + + +} // namespace + +ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, + std::shared_ptr ioCtx) + : ServiceExecutorAdaptive(ctx, std::move(ioCtx), stdx::make_unique()) {} + +ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, + std::shared_ptr ioCtx, + std::unique_ptr config) + : _ioContext(std::move(ioCtx)), + _config(std::move(config)), + _tickSource(ctx->getTickSource()), + _lastScheduleTimer(_tickSource) {} + +ServiceExecutorAdaptive::~ServiceExecutorAdaptive() { + invariant(!_isRunning.load()); +} + +Status ServiceExecutorAdaptive::start() { + invariant(!_isRunning.load()); + _isRunning.store(true); + _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this); + for (auto i = 0; i < _config->reservedThreads(); i++) { + _startWorkerThread(); + } + + return Status::OK(); +} + +Status ServiceExecutorAdaptive::shutdown() { + if (!_isRunning.load()) + return Status::OK(); + + _isRunning.store(false); + + _scheduleCondition.notify_one(); + _controllerThread.join(); + + stdx::unique_lock lk(_threadsMutex); + _ioContext->stop(); + _deathCondition.wait(lk, [&] { return _threads.empty(); }); + + return Status::OK(); +} + +Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, ScheduleFlags flags) { + auto scheduleTime = _tickSource->getTicks(); + auto pending = _tasksPending.addAndFetch(1); + + _ioContext->post([ this, task = std::move(task), scheduleTime ] { + _tasksPending.subtractAndFetch(1); + auto start = _tickSource->getTicks(); + ticksSpentScheduled += (start - scheduleTime); + _tasksExecuting.addAndFetch(1); + + const auto guard = MakeGuard([this, start] { + _tasksExecuting.subtractAndFetch(1); + ++tasksExecuted; + auto thisSpentExecuting = _tickSource->getTicks() - start; + ticksSpentExecuting += thisSpentExecuting; + _totalExecuted.addAndFetch(1); + }); + + task(); + }); + + _lastScheduleTimer.reset(); + _totalScheduled.addAndFetch(1); + + if (_isStarved(pending) && !(flags & DeferredTask)) { + _scheduleCondition.notify_one(); + } + + return Status::OK(); +} + +bool ServiceExecutorAdaptive::_isStarved(int pending) const { + // If threads are still starting, then assume we won't be starved pretty soon, return false + if (_threadsPending.load() > 0) + return false; + + if (pending == -1) { + pending = _tasksPending.load(); + } + // If there are no pending tasks, then we definitely aren't starved + if (pending == 0) + return false; + + // The available threads is the number that are running - the number that are currently + // executing + auto available = _threadsRunning.load() - _tasksExecuting.load(); + if (pending <= available) + return false; + + return (pending > available); +} + +void ServiceExecutorAdaptive::_controllerThreadRoutine() { + setThreadName("worker-controller"_sd); + // The scheduleCondition needs a lock to wait on. + stdx::mutex fakeMutex; + stdx::unique_lock fakeLk(fakeMutex); + + TickTimer sinceLastControlRound(_tickSource); + TickSource::Tick lastSpentExecuting = _totalSpentExecuting.load(); + TickSource::Tick lastSpentRunning = _totalSpentRunning.load(); + + while (_isRunning.load()) { + // Make sure that the timer gets reset whenever this loop completes + const auto timerResetGuard = + MakeGuard([&sinceLastControlRound] { sinceLastControlRound.reset(); }); + + _scheduleCondition.wait_for(fakeLk, _config->stuckThreadTimeout().toSystemDuration()); + + // If the executor has stopped, then stop the controller altogether + if (!_isRunning.load()) + break; + + double utilizationPct; + { + auto spentExecuting = _totalSpentExecuting.load(); + auto spentRunning = _totalSpentRunning.load(); + auto diffExecuting = spentExecuting - lastSpentExecuting; + auto diffRunning = spentRunning - lastSpentRunning; + + // If no threads have run yet, then don't update anything + if (spentRunning == 0 || diffRunning == 0) + utilizationPct = 0.0; + else { + lastSpentExecuting = spentExecuting; + lastSpentRunning = spentRunning; + + utilizationPct = diffExecuting / static_cast(diffRunning); + utilizationPct *= 100; + } + } + + // If the wait timed out then either the executor is idle or stuck + if (sinceLastControlRound.sinceStart() >= _config->stuckThreadTimeout()) { + // Each call to schedule updates the last schedule ticks so we know the last time a + // task was scheduled + Milliseconds sinceLastSchedule = _lastScheduleTimer.sinceStart(); + + // If the number of tasks executing is the number of threads running (that is all + // threads are currently busy), and the last time a task was able to be scheduled was + // longer than our wait timeout, then we can assume all threads are stuck. + // + // In that case we should start the reserve number of threads so fully unblock the + // thread pool. + // + if ((_tasksExecuting.load() == _threadsRunning.load()) && + (sinceLastSchedule >= _config->stuckThreadTimeout())) { + log() << "Detected blocked worker threads, " + << "starting new reserve threads to unblock service executor"; + for (int i = 0; i < _config->reservedThreads(); i++) { + _startWorkerThread(); + } + } + continue; + } + + auto threadsRunning = _threadsRunning.load(); + if (threadsRunning < _config->reservedThreads()) { + log() << "Starting " << _config->reservedThreads() - threadsRunning + << " to replenish reserved worker threads"; + while (_threadsRunning.load() < _config->reservedThreads()) { + _startWorkerThread(); + } + } + + // If the utilization percentage is lower than our idle threshold, then the threads we + // already have aren't saturated and we shouldn't consider adding new threads at this + // time. + if (utilizationPct < _config->idlePctThreshold()) { + continue; + } + + // While there are threads pending sleep for 50 microseconds (this is our thread latency + // perf budget). + // + // If waiting for pending threads takes longer than the stuckThreadTimeout, then the + // pending threads may be stuck and we should loop back around. + do { + stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration()); + } while ((_threadsPending.load() > 0) && + (sinceLastControlRound.sinceStart() < _config->stuckThreadTimeout())); + + + // If the number of pending tasks is greater than the number of running threads minus the + // number of tasks executing (the number of free threads), then start a new worker to + // avoid starvation. + if (_isStarved()) { + log() << "Starting worker thread to avoid starvation."; + _startWorkerThread(); + } + } +} + +void ServiceExecutorAdaptive::_startWorkerThread() { + stdx::unique_lock lk(_threadsMutex); + auto it = _threads.emplace(_threads.begin()); + auto num = _threads.size(); + + _threadsPending.addAndFetch(1); + _threadsRunning.addAndFetch(1); + *it = stdx::thread(&ServiceExecutorAdaptive::_workerThreadRoutine, this, num, it); +} + +Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const { + static stdx::mutex jitterMutex; + static std::default_random_engine randomEngine = [] { + std::random_device seed; + return std::default_random_engine(seed()); + }(); + + auto jitterParam = _config->runTimeJitter(); + if (jitterParam == 0) + return Milliseconds{0}; + + std::uniform_int_distribution<> jitterDist(-jitterParam, jitterParam); + + stdx::lock_guard lk(jitterMutex); + auto jitter = jitterDist(randomEngine); + if (jitter < _config->workerThreadRunTime().count()) + jitter = 0; + + return Milliseconds{jitter}; +} + +void ServiceExecutorAdaptive::_workerThreadRoutine( + int threadId, ServiceExecutorAdaptive::ThreadList::iterator it) { + + { + std::string threadName = str::stream() << "worker-" << threadId; + setThreadName(threadName); + } + + log() << "Starting new database worker thread " << threadId; + + // Whether a thread is "pending" reflects whether its had a chance to do any useful work. + // When a thread is pending, it will only try to run one task through ASIO, and report back + // as soon as possible so that the thread controller knows not to keep starting threads while + // the threads it's already created are finishing starting up. + bool stillPending = true; + + const auto guard = MakeGuard([this, &stillPending, it] { + if (stillPending) + _threadsPending.subtractAndFetch(1); + _threadsRunning.subtractAndFetch(1); + + { + stdx::lock_guard lk(_threadsMutex); + it->detach(); + _threads.erase(it); + } + _deathCondition.notify_one(); + }); + + auto jitter = _getThreadJitter(); + + while (_isRunning.load()) { + // We don't want all the threads to start/stop running at exactly the same time, so the + // jitter setParameter adds/removes a random small amount of time to the runtime. + Milliseconds runTime = _config->workerThreadRunTime() + jitter; + dassert(runTime.count() > 0); + + size_t handlersRun; + // Reset our thread-local counters + tasksExecuted = 0; + ticksSpentExecuting = 0; + ticksSpentScheduled = 0; + + auto startRun = _tickSource->getTicks(); + try { + asio::io_context::work work(*_ioContext); + // If we're still "pending" only try to run one task, that way the controller will + // know that it's okay to start adding threads to avoid starvation again. + if (stillPending) { + handlersRun = _ioContext->run_one_for(runTime.toSystemDuration()); + } else { // Otherwise, just run for the full run period + handlersRun = _ioContext->run_for(runTime.toSystemDuration()); + } + + // _ioContext->run_one() will return when all the scheduled handlers are completed, and + // you must call restart() to call run_one() again or else it will return immediately. + // In the case where the server has just started and there has been no work yet, this + // means this loop will spin until the first client connect. Thsi call to restart avoids + // that. + if (_ioContext->stopped()) + _ioContext->restart(); + // If an exceptione escaped from ASIO, then break from this thread and start a new one. + } catch (std::exception& e) { + log() << "Exception escaped worker thread: " << e.what() + << " Starting new worker thread."; + _startWorkerThread(); + break; + } catch (...) { + log() << "Unknown exception escaped worker thread. Starting new worker thread."; + _startWorkerThread(); + break; + } + auto spentRunning = _tickSource->getTicks() - startRun; + _totalSpentRunning.addAndFetch(spentRunning); + _totalSpentExecuting.addAndFetch(ticksSpentExecuting); + _totalSpentScheduled.addAndFetch(ticksSpentScheduled); + + // If we're still pending, let the controller know and go back around for another go + // + // Otherwise we can think about exiting if the last call to run_for() wasn't very + // productive + if (stillPending) { + _threadsPending.subtractAndFetch(1); + stillPending = false; + } else if (_threadsRunning.load() > _config->reservedThreads()) { + // If we executed NO tasks, than just exit. + if (tasksExecuted == 0) { + log() << "Thread was idle for the past " << runTime << ". Exiting thread."; + break; + } + + // If we spent less than our idle threshold actually running tasks then exit the thread. + // This time measurement doesn't include time spent running network callbacks, so the + // threshold is lower than you'd expect. + dassert(ticksSpentExecuting < spentRunning); + dassert(spentRunning < std::numeric_limits::max()); + + // First get the ratio of ticks spent executing to ticks spent running. We expect this + // to be <= 1.0 + double executingToRunning = ticksSpentExecuting / static_cast(spentRunning); + + // Multiply that by 100 to get the percentage of time spent executing tasks. We expect + // this to be <= 100. + executingToRunning *= 100; + dassert(executingToRunning <= 100); + + int pctExecuting = static_cast(executingToRunning); + if (pctExecuting < _config->idlePctThreshold()) { + log() << "Thread was only executing tasks " << pctExecuting << "% over the last " + << runTime << ". Exiting thread."; + break; + } + } + } +} + +void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const { + BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); + section << kExecutorLabel << kExecutorName // + << kTotalScheduled << _totalScheduled.load() << kTotalExecuted << _totalExecuted.load() + << kQueueDepth << _tasksPending.load() << kTasksExecuting << _tasksExecuting.load() + << kTotalTimeRunningUs << ticksToMicros(_totalSpentRunning.load(), _tickSource) + << kTotalTimeExecutingUs << ticksToMicros(_totalSpentExecuting.load(), _tickSource) + << kTotalTimeQueuedUs << ticksToMicros(_totalSpentScheduled.load(), _tickSource) + << kThreadsRunning << _threadsRunning.load() << kThreadsPending + << _threadsPending.load(); + section.doneFast(); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h new file mode 100644 index 0000000000000..1ff32831d168a --- /dev/null +++ b/src/mongo/transport/service_executor_adaptive.h @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include + +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_executor.h" +#include "mongo/util/tick_source.h" + +#include + +namespace mongo { +namespace transport { + +/** + * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck + * or deadlocked longer that its configured timeout and that idle threads will terminate themselves + * if they spend more than its configure idle threshold idle. + */ +class ServiceExecutorAdaptive : public ServiceExecutor { +public: + struct Options { + virtual ~Options() = default; + // The minimum number of threads the executor will keep running to service tasks. + virtual int reservedThreads() const = 0; + + // The amount of time each worker thread runs before considering exiting because of + // idleness. + virtual Milliseconds workerThreadRunTime() const = 0; + + // workerThreadRuntime() is offset by a random value between -jitter and +jitter to prevent + // thundering herds + virtual int runTimeJitter() const = 0; + + // The amount of time the controller thread will wait before checking for stuck threads + // to guarantee forward progress + virtual Milliseconds stuckThreadTimeout() const = 0; + + // The maximum allowed latency between when a task is scheduled and a thread is started to + // service it. + virtual Microseconds maxQueueLatency() const = 0; + + // Threads that spend less than this threshold doing work during their workerThreadRunTime + // period will exit + virtual int idlePctThreshold() const = 0; + }; + + explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr ioCtx); + explicit ServiceExecutorAdaptive(ServiceContext* ctx, + std::shared_ptr ioCtx, + std::unique_ptr config); + + ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default; + ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default; + virtual ~ServiceExecutorAdaptive(); + + Status start() final; + Status shutdown() final; + Status schedule(Task task, ScheduleFlags flags) final; + + void appendStats(BSONObjBuilder* bob) const final; + + int threadsRunning() { + return _threadsRunning.load(); + } + +private: + using ThreadList = stdx::list; + class TickTimer { + public: + explicit TickTimer(TickSource* tickSource) + : _tickSource(tickSource), + _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000), + _start(_tickSource->getTicks()) { + invariant(_ticksPerMillisecond > 0); + } + + TickSource::Tick sinceStartTicks() const { + return _tickSource->getTicks() - _start.load(); + } + + Milliseconds sinceStart() const { + return Milliseconds{sinceStartTicks() / _ticksPerMillisecond}; + } + + void reset() { + _start.store(_tickSource->getTicks()); + } + + private: + TickSource* const _tickSource; + const TickSource::Tick _ticksPerMillisecond; + AtomicWord _start; + }; + + void _startWorkerThread(); + void _workerThreadRoutine(int threadId, ThreadList::iterator it); + void _controllerThreadRoutine(); + bool _isStarved(int pending = -1) const; + Milliseconds _getThreadJitter() const; + TickSource::Tick _getCurrentThreadsRunningTime() const; + + std::shared_ptr _ioContext; + + std::unique_ptr _config; + + stdx::mutex _threadsMutex; + ThreadList _threads; + stdx::thread _controllerThread; + + TickSource* const _tickSource; + AtomicWord _isRunning{false}; + + // These counters are used to detect stuck threads and high task queuing. + AtomicWord _threadsRunning{0}; + AtomicWord _threadsPending{0}; + AtomicWord _tasksExecuting{0}; + AtomicWord _tasksPending{0}; + TickTimer _lastScheduleTimer; + AtomicWord _totalSpentExecuting{0}; + AtomicWord _totalSpentRunning{0}; + + mutable stdx::mutex _threadsRunningTimersMutex; + std::list _threadsRunningTimers; + + // These counters are only used for reporting in serverStatus. + AtomicWord _totalScheduled{0}; + AtomicWord _totalExecuted{0}; + AtomicWord _totalSpentScheduled{0}; + + // Threads signal this condition variable when they exit so we can gracefully shutdown + // the executor. + stdx::condition_variable _deathCondition; + + // Tasks should signal this condition variable if they want the thread controller to + // track their progress and do fast stuck detection + stdx::condition_variable _scheduleCondition; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp new file mode 100644 index 0000000000000..9500458d20c71 --- /dev/null +++ b/src/mongo/transport/service_executor_adaptive_test.cpp @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault; + +#include "mongo/platform/basic.h" + +#include "boost/optional.hpp" + +#include "mongo/db/service_context_noop.h" +#include "mongo/transport/service_executor_adaptive.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +#include + +namespace mongo { +namespace { +using namespace transport; + +struct TestOptions : public ServiceExecutorAdaptive::Options { + int reservedThreads() const final { + return 1; + } + + Milliseconds workerThreadRunTime() const final { + return Milliseconds{1000}; + } + + int runTimeJitter() const final { + return 0; + } + + Milliseconds stuckThreadTimeout() const final { + return Milliseconds{100}; + } + + Microseconds maxQueueLatency() const final { + return duration_cast(Milliseconds{5}); + } + + int idlePctThreshold() const final { + return 0; + } +}; + +class ServiceExecutorAdaptiveFixture : public unittest::Test { +protected: + void setUp() override { + auto scOwned = stdx::make_unique(); + setGlobalServiceContext(std::move(scOwned)); + asioIoCtx = std::make_shared(); + } + + std::shared_ptr asioIoCtx; + + stdx::mutex mutex; + int waitFor = -1; + stdx::condition_variable cond; + stdx::function notifyCallback = [this] { + stdx::unique_lock lk(mutex); + invariant(waitFor != -1); + waitFor--; + cond.notify_one(); + log() << "Ran callback"; + }; + + void waitForCallback(int expected, boost::optional timeout = boost::none) { + stdx::unique_lock lk(mutex); + invariant(waitFor != -1); + if (timeout) { + ASSERT_TRUE(cond.wait_for( + lk, timeout->toSystemDuration(), [&] { return waitFor == expected; })); + } else { + cond.wait(lk, [&] { return waitFor == expected; }); + } + } + + ServiceExecutorAdaptive::Options* config; + std::unique_ptr makeAndStartExecutor() { + auto configOwned = stdx::make_unique(); + config = configOwned.get(); + auto exec = stdx::make_unique( + getGlobalServiceContext(), asioIoCtx, std::move(configOwned)); + + ASSERT_OK(exec->start()); + log() << "wait for executor to finish starting"; + waitFor = 1; + ASSERT_OK(exec->schedule(notifyCallback, ServiceExecutor::EmptyFlags)); + waitForCallback(0); + ASSERT_EQ(exec->threadsRunning(), 1); + + return exec; + } +}; + +/* + * This tests that the executor will launch a new thread if the current threads are blocked, and + * that those threads retire when they become idle. + */ +TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) { + stdx::mutex blockedMutex; + stdx::unique_lock blockedLock(blockedMutex); + + auto exec = makeAndStartExecutor(); + auto guard = MakeGuard([&] { + if (blockedLock) + blockedLock.unlock(); + ASSERT_OK(exec->shutdown()); + }); + + log() << "Scheduling blocked task"; + waitFor = 3; + ASSERT_OK(exec->schedule( + [this, &blockedMutex] { + notifyCallback(); + stdx::unique_lock lk(blockedMutex); + notifyCallback(); + }, + ServiceExecutor::EmptyFlags)); + + log() << "Scheduling task stuck on blocked task"; + ASSERT_OK(exec->schedule(notifyCallback, ServiceExecutor::EmptyFlags)); + + log() << "Waiting for second thread to start"; + waitForCallback(1); + ASSERT_EQ(exec->threadsRunning(), 2); + + log() << "Waiting for unstuck task to run"; + blockedLock.unlock(); + waitForCallback(0); + ASSERT_EQ(exec->threadsRunning(), 2); + + log() << "Waiting for second thread to idle out"; + stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 1.5); + ASSERT_EQ(exec->threadsRunning(), 1); +} + +/* + * This tests that the executor will start a new batch of reserved threads if it detects that + * all + * threads are running a task for longer than the stuckThreadTimeout. + */ +TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) { + stdx::mutex blockedMutex; + stdx::unique_lock blockedLock(blockedMutex); + + auto exec = makeAndStartExecutor(); + auto guard = MakeGuard([&] { + if (blockedLock) + blockedLock.unlock(); + ASSERT_OK(exec->shutdown()); + }); + + auto blockedTask = [this, &blockedMutex] { + log() << "waiting on blocked mutex"; + notifyCallback(); + stdx::unique_lock lk(blockedMutex); + notifyCallback(); + }; + + waitFor = 6; + log() << "Scheduling " << waitFor << " blocked tasks"; + for (auto i = 0; i < waitFor / 2; i++) { + ASSERT_OK(exec->schedule(blockedTask, ServiceExecutor::EmptyFlags)); + } + + log() << "Waiting for executor to start new threads"; + waitForCallback(3); + stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() * 2); + +// TODO The timing of this test is broken on windows, re-enable this test in SERVER-30475 +#ifndef _WIN32 + ASSERT_EQ(exec->threadsRunning(), waitFor + 1); +#endif + + log() << "Waiting for unstuck task to run"; + blockedLock.unlock(); + waitForCallback(0); +} + +/* + * This tests that deferred tasks don't cause a new thread to be created, and they don't + * interfere + * with new normal tasks + */ +TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) { + stdx::mutex blockedMutex; + stdx::unique_lock blockedLock(blockedMutex); + + auto exec = makeAndStartExecutor(); + auto guard = MakeGuard([&] { + if (blockedLock) + blockedLock.unlock(); + ASSERT_OK(exec->shutdown()); + }); + + waitFor = 3; + log() << "Scheduling a blocking task"; + ASSERT_OK(exec->schedule( + [this, &blockedMutex] { + stdx::unique_lock lk(blockedMutex); + notifyCallback(); + }, + ServiceExecutor::EmptyFlags)); + + log() << "Scheduling deferred task"; + ASSERT_OK(exec->schedule(notifyCallback, ServiceExecutor::DeferredTask)); + + ASSERT_THROWS(waitForCallback(1, config->stuckThreadTimeout()), + unittest::TestAssertionFailureException); + + log() << "Scheduling non-deferred task"; + ASSERT_OK(exec->schedule(notifyCallback, ServiceExecutor::EmptyFlags)); + waitForCallback(1, config->stuckThreadTimeout()); + ASSERT_GT(exec->threadsRunning(), 1); + + blockedLock.unlock(); + waitForCallback(0); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/transport/service_executor_base.cpp b/src/mongo/transport/service_executor_base.cpp deleted file mode 100644 index b2284c13f23ac..0000000000000 --- a/src/mongo/transport/service_executor_base.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor_base.h" - -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/service_context.h" - -namespace mongo { -namespace transport { -namespace { - -constexpr auto kTotalScheduled = "totalScheduled"_sd; -constexpr auto kTotalExecuted = "totalExecuted"_sd; -constexpr auto kQueueDepth = "queueDepth"_sd; -constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd; -constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd; - -int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) { - invariant(tickSource->getTicksPerSecond() > 1000000); - static const auto ticksPerMicro = tickSource->getTicksPerSecond() / 1000000; - return ticks / ticksPerMicro; -} - -} // namespace - -ServiceExecutorBase::ServiceExecutorBase(ServiceContext* ctx) : _tickSource{ctx->getTickSource()} {} - -Status ServiceExecutorBase::schedule(ServiceExecutorBase::Task task) { - - const auto scheduledTime = _tickSource->getTicks(); - auto wrapped = [ this, task = std::move(task), scheduledTime ] { - auto start = _tickSource->getTicks(); - task(); - auto end = _tickSource->getTicks(); - _outstandingTasks.subtractAndFetch(1); - _tasksExecuted.addAndFetch(1); - _ticksRunning.addAndFetch(end - start); - _ticksQueued.addAndFetch(start - scheduledTime); - }; - - auto ret = _schedule(std::move(wrapped)); - if (ret.isOK()) { - _tasksScheduled.addAndFetch(1); - _outstandingTasks.addAndFetch(1); - } - - return ret; -} - -ServiceExecutorBase::Stats ServiceExecutorBase::getStats() const { - return {_ticksRunning.load(), - _ticksQueued.load(), - _tasksExecuted.load(), - _tasksScheduled.load(), - _outstandingTasks.load()}; -} - -void ServiceExecutorBase::appendStats(BSONObjBuilder* bob) const { - const auto stats = getStats(); - - BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); - section << kTotalScheduled << static_cast(stats.tasksScheduled) << kTotalExecuted - << static_cast(stats.tasksExecuted) << kQueueDepth << stats.outstandingTasks - << kTotalTimeRunningUs << ticksToMicros(stats.ticksRunning, _tickSource) - << kTotalTimeQueuedUs << ticksToMicros(stats.ticksQueued, _tickSource); - section.doneFast(); -} - -TickSource* ServiceExecutorBase::tickSource() const { - return _tickSource; -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_base.h b/src/mongo/transport/service_executor_base.h deleted file mode 100644 index 0cc024efaa88a..0000000000000 --- a/src/mongo/transport/service_executor_base.h +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/platform/atomic_word.h" -#include "mongo/transport/service_executor.h" -#include "mongo/util/tick_source.h" - -namespace mongo { -namespace transport { -/* - * This is the base class of ServiceExecutors. - * - * Service executors should derive from this class and implement scheduleImpl(). They may - * get timing/counter statistics by calling getStats(). - */ -class ServiceExecutorBase : public ServiceExecutor { -public: - Status schedule(Task task) final; - - struct Stats { - TickSource::Tick ticksRunning; // Total number of ticks spent running tasks - TickSource::Tick ticksQueued; // Total number of ticks tasks have spent waiting to run - int64_t tasksExecuted; // Total number of tasks executed - int64_t tasksScheduled; // Total number of tasks scheduled - int64_t outstandingTasks; // Current number of tasks waiting to be run - }; - - Stats getStats() const; - void appendStats(BSONObjBuilder* bob) const final; - -protected: - explicit ServiceExecutorBase(ServiceContext* ctx); - - TickSource* tickSource() const; - -private: - // Sub-classes should implement this function to actually schedule the task. It will be called - // by schedule() with a wrapped task that does all the necessary stats/timing tracking. - virtual Status _schedule(Task task) = 0; - - TickSource* _tickSource; - AtomicWord _ticksRunning{0}; - AtomicWord _ticksQueued{0}; - AtomicWord _tasksExecuted{0}; - AtomicWord _tasksScheduled{0}; - AtomicWord _outstandingTasks{0}; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp deleted file mode 100644 index cd299a75ac4a9..0000000000000 --- a/src/mongo/transport/service_executor_fixed.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor_fixed.h" - -#include "mongo/db/server_parameters.h" -#include "mongo/util/log.h" -#include "mongo/util/processinfo.h" - -#include - -namespace mongo { -namespace transport { -namespace { -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(fixedServiceExecutorNumThreads, int, -1); - -} // namespace - -ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, - std::shared_ptr ioCtx) - : ServiceExecutorBase(ctx), _ioContext(std::move(ioCtx)) {} - -ServiceExecutorFixed::~ServiceExecutorFixed() { - invariant(!_isRunning.load()); -} - -Status ServiceExecutorFixed::start() { - invariant(!_isRunning.load()); - - auto threadCount = fixedServiceExecutorNumThreads; - if (threadCount == -1) { - ProcessInfo pi; - threadCount = pi.getNumAvailableCores().value_or(pi.getNumCores()); - log() << "No thread count configured for fixed executor. Using number of cores: " - << threadCount; - } - - _isRunning.store(true); - for (auto i = 0; i < threadCount; i++) { - _threads.push_back(stdx::thread([this, i] { - auto threadId = i + 1; - LOG(3) << "Starting worker thread " << threadId; - asio::io_context::work work(*_ioContext); - while (_isRunning.load()) { - _ioContext->run(); - } - LOG(3) << "Exiting worker thread " << threadId; - })); - } - - return Status::OK(); -} - -Status ServiceExecutorFixed::shutdown() { - invariant(_isRunning.load()); - - _isRunning.store(false); - _ioContext->stop(); - for (auto& thread : _threads) { - thread.join(); - } - _threads.clear(); - - return Status::OK(); -} - -Status ServiceExecutorFixed::_schedule(ServiceExecutorFixed::Task task) { - _ioContext->post(std::move(task)); - return Status::OK(); -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h deleted file mode 100644 index 14203cd794fc2..0000000000000 --- a/src/mongo/transport/service_executor_fixed.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include - -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_executor_base.h" -#include "mongo/transport/transport_layer_asio.h" - -namespace mongo { -namespace transport { - -/** - * This is an ASIO-based fixed-size ServiceExecutor. It should only be used for testing because - * it won't add any threads if all threads become blocked. - */ -class ServiceExecutorFixed : public ServiceExecutorBase { -public: - explicit ServiceExecutorFixed(ServiceContext* ctx, std::shared_ptr ioCtx); - virtual ~ServiceExecutorFixed(); - - Status start() final; - Status shutdown() final; - -private: - Status _schedule(Task task) final; - - std::shared_ptr _ioContext; - std::vector _threads; - AtomicWord _isRunning{false}; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 9c2d335cee735..b520bcc6956ab 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -90,6 +90,7 @@ bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { } // namespace using transport::TransportLayer; +using transport::ServiceExecutor; /* * This class wraps up the logic for swapping/unswapping the Client during runNext(). @@ -162,6 +163,12 @@ class ServiceStateMachine::ThreadGuard { return _okayToRunNext; } + // Returns whether the thread guard is the owner of the SSM's state or not. Callers can use this + // to determine whether their callchain is recursive. + bool isOwner() const { + return _haveTakenOwnership; + } + private: ServiceStateMachine* _ssm; bool _haveTakenOwnership; @@ -200,7 +207,8 @@ void ServiceStateMachine::_sourceCallback(Status status) { // runNext() so that this thread can do other useful work with its timeslice instead of going // to sleep while waiting for the SSM to be released. if (!guard) { - return _scheduleFunc([this, status] { _sourceCallback(status); }); + return _scheduleFunc([this, status] { _sourceCallback(status); }, + ServiceExecutor::DeferredTask); } // Make sure we just called sourceMessage(); @@ -213,7 +221,12 @@ void ServiceStateMachine::_sourceCallback(Status status) { // Since we know that we're going to process a message, call scheduleNext() immediately // to schedule the call to processMessage() on the serviceExecutor (or just unwind the // stack) - return scheduleNext(); + + // If this callback doesn't own the ThreadGuard, then we're being called recursively, + // and the executor shouldn't start a new thread to process the message - it can use this + // one just after this returns. + auto flags = guard.isOwner() ? ServiceExecutor::EmptyFlags : ServiceExecutor::DeferredTask; + return scheduleNext(flags); } else if (ErrorCodes::isInterruption(status.code()) || ErrorCodes::isNetworkError(status.code())) { LOG(2) << "Session from " << remote << " encountered a network error during SourceMessage"; @@ -241,7 +254,8 @@ void ServiceStateMachine::_sinkCallback(Status status) { // runNext() so that this thread can do other useful work with its timeslice instead of going // to sleep while waiting for the SSM to be released. if (!guard) { - return _scheduleFunc([this, status] { _sinkCallback(status); }); + return _scheduleFunc([this, status] { _sinkCallback(status); }, + ServiceExecutor::DeferredTask); } invariant(state() == State::SinkWait); @@ -266,7 +280,11 @@ void ServiceStateMachine::_sinkCallback(Status status) { if (state() == State::EndSession) { _runNextInGuard(guard); } else { // Otherwise scheduleNext to unwind the stack and run the next step later - scheduleNext(); + // If this callback doesn't own the ThreadGuard, then we're being called recursively, + // and the executor shouldn't start a new thread to process the message - it can use this + // one just after this returns. + auto flags = guard.isOwner() ? ServiceExecutor::EmptyFlags : ServiceExecutor::DeferredTask; + return scheduleNext(flags); } } @@ -337,7 +355,7 @@ void ServiceStateMachine::_processMessage(ThreadGuard& guard) { } else { _state.store(State::Source); _inMessage.reset(); - return scheduleNext(); + return scheduleNext(ServiceExecutor::DeferredTask); } } @@ -349,7 +367,7 @@ void ServiceStateMachine::runNext() { // runNext() so that this thread can do other useful work with its timeslice instead of going // to sleep while waiting for the SSM to be released. if (!guard) { - return scheduleNext(); + return scheduleNext(ServiceExecutor::DeferredTask); } return _runNextInGuard(guard); } @@ -420,8 +438,8 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard& guard) { _cleanupSession(guard); } -void ServiceStateMachine::scheduleNext() { - _maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); }); +void ServiceStateMachine::scheduleNext(ServiceExecutor::ScheduleFlags flags) { + _maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); }, flags); } void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) { diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index bd56bf374b47c..620d957572088 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -37,10 +37,11 @@ #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point.h" +#include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" namespace mongo { -class ServiceEntryPoint; /* * The ServiceStateMachine holds the state of a single client connection and represents the @@ -105,7 +106,8 @@ class ServiceStateMachine : public std::enable_shared_from_this - void _maybeScheduleFunc(Executor* svcExec, Func&& func) { + void _maybeScheduleFunc(Executor* svcExec, + Func&& func, + transport::ServiceExecutor::ScheduleFlags flags) { if (svcExec) { uassertStatusOK(svcExec->schedule( - [ func = std::move(func), anchor = shared_from_this() ] { func(); })); + [ func = std::move(func), anchor = shared_from_this() ] { func(); }, flags)); } } template - void _scheduleFunc(Func&& func) { + void _scheduleFunc(Func&& func, transport::ServiceExecutor::ScheduleFlags flags) { auto svcExec = _serviceContext->getServiceExecutor(); invariant(svcExec); - _maybeScheduleFunc(svcExec, func); + _maybeScheduleFunc(svcExec, func, flags); } /* diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index e28115c1e787b..c841cf7993c31 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -34,7 +34,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" -#include "mongo/transport/service_executor_fixed.h" +#include "mongo/transport/service_executor_adaptive.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/transport/transport_layer_legacy.h" @@ -154,9 +154,9 @@ std::unique_ptr TransportLayerManager::createWithConfig( auto transportLayerASIO = stdx::make_unique(opts, sep); - if (config->serviceExecutor == "fixedForTesting") { - ctx->setServiceExecutor( - stdx::make_unique(ctx, transportLayerASIO->getIOContext())); + if (config->serviceExecutor == "adaptive") { + ctx->setServiceExecutor(stdx::make_unique( + ctx, transportLayerASIO->getIOContext())); } transportLayer = std::move(transportLayerASIO); } else if (serverGlobalParams.transportLayer == "legacy") {