Skip to content

Commit

Permalink
kernel: Remove dependency on CScheduler
Browse files Browse the repository at this point in the history
By defining a virtual interface class for the scheduler client, users of
the kernel can now define their own event consuming infrastructure,
without having to spawn threads or rely on the scheduler design.

Removing CScheduler also allows removing the thread and
exception modules from the kernel library.
  • Loading branch information
TheCharlatan committed Feb 16, 2024
1 parent 06069b3 commit d5228ef
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 37 deletions.
4 changes: 1 addition & 3 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
Expand Down Expand Up @@ -975,7 +976,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
Expand All @@ -992,7 +992,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
Expand All @@ -1003,7 +1002,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
Expand Down
15 changes: 2 additions & 13 deletions src/bitcoin-chainstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>

Expand Down Expand Up @@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));


// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });

ValidationSignals validation_signals{scheduler};

// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};

class KernelNotifications : public kernel::Notifications
{
Expand Down Expand Up @@ -288,7 +278,6 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();

validation_signals.FlushBackgroundCallbacks();
Expand Down
2 changes: 1 addition & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}, std::chrono::minutes{5});

assert(!node.validation_signals);
node.validation_signals = std::make_unique<ValidationSignals>(scheduler);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
auto& validation_signals = *node.validation_signals;

// Create client interfaces for wallets that are supposed to be loaded
Expand Down
13 changes: 9 additions & 4 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
#include <util/task_runner.h>

#include <chrono>
#include <condition_variable>
Expand Down Expand Up @@ -120,12 +121,16 @@ class CScheduler
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
class SerialTaskRunner
class SerialTaskRunner : public util::TaskRunnerInterface
{
private:
CScheduler& m_scheduler;

Mutex m_callbacks_mutex;

// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;

Expand All @@ -141,15 +146,15 @@ class SerialTaskRunner
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
void insert(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
void flush() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);

size_t size() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};

#endif // BITCOIN_SCHEDULER_H
2 changes: 1 addition & 1 deletion src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
// from blocking due to queue overrun.
m_node.scheduler = std::make_unique<CScheduler>();
m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); });
m_node.validation_signals = std::make_unique<ValidationSignals>(*m_node.scheduler);
m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler));

m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES);
m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node));
Expand Down
52 changes: 52 additions & 0 deletions src/util/task_runner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_UTIL_TASK_RUNNER_H
#define BITCOIN_UTIL_TASK_RUNNER_H

#include <cstddef>
#include <functional>

namespace util {

/** @file
* This header provides an interface and simple implementation for a task
* runner. Another threaded, serial implementation using a queue is available in
* the scheduler module's SerialTaskRunner.
*/

class TaskRunnerInterface
{
public:
virtual ~TaskRunnerInterface() {}

/**
* The callback can either be queued for later/asynchronous/threaded
* processing, or be executed immediately for synchronous processing.
*/

virtual void insert(std::function<void()> func) = 0;

/**
* Forces the processing of all pending events.
*/
virtual void flush() = 0;

/**
* Returns the number of currently pending events.
*/
virtual size_t size() = 0;
};

class ImmediateTaskRunner : public TaskRunnerInterface
{
public:
void insert(std::function<void()> func) override { func(); }
void flush() override {}
size_t size() override { return 0; }
};

} // namespace util

#endif // BITCOIN_UTIL_TASK_RUNNER_H
24 changes: 11 additions & 13 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

#include <validationinterface.h>

#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
#include <kernel/mempool_entry.h>
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <scheduler.h>
#include <util/check.h>
#include <util/task_runner.h>

#include <future>
#include <unordered_map>
Expand Down Expand Up @@ -42,12 +42,10 @@ class ValidationSignalsImpl
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);

public:
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SerialTaskRunner m_task_runner;
std::unique_ptr<util::TaskRunnerInterface> m_task_runner;

explicit ValidationSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {}
explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner)
: m_task_runner{std::move(Assert(task_runner))} {}

void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
Expand Down Expand Up @@ -94,19 +92,19 @@ class ValidationSignalsImpl
}
};

ValidationSignals::ValidationSignals(CScheduler& scheduler)
: m_internals{std::make_unique<ValidationSignalsImpl>(scheduler)} {}
ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner)
: m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {}

ValidationSignals::~ValidationSignals() {}

void ValidationSignals::FlushBackgroundCallbacks()
{
m_internals->m_task_runner.flush();
m_internals->m_task_runner->flush();
}

size_t ValidationSignals::CallbacksPending()
{
return m_internals->m_task_runner.size();
return m_internals->m_task_runner->size();
}

void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
Expand Down Expand Up @@ -140,7 +138,7 @@ void ValidationSignals::UnregisterAllValidationInterfaces()

void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
m_internals->m_task_runner.insert(std::move(func));
m_internals->m_task_runner->insert(std::move(func));
}

void ValidationSignals::SyncWithValidationInterfaceQueue()
Expand All @@ -162,7 +160,7 @@ void ValidationSignals::SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
m_internals->m_task_runner.insert([=] { \
m_internals->m_task_runner->insert([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \
Expand Down
10 changes: 8 additions & 2 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
#include <memory>
#include <vector>

namespace util {
class TaskRunnerInterface;
} // namespace util

class BlockValidationState;
class CBlock;
class CBlockIndex;
struct CBlockLocator;
class CScheduler;
enum class MemPoolRemovalReason;
struct RemovedMempoolTransactionInfo;
struct NewMempoolTransactionInfo;
Expand Down Expand Up @@ -160,7 +163,10 @@ class ValidationSignals {
std::unique_ptr<ValidationSignalsImpl> m_internals;

public:
ValidationSignals(CScheduler& scheduler LIFETIMEBOUND);
// The task runner will block validation if it calls its insert method's
// func argument synchronously. In this class func contains a loop that
// dispatches a single validation event to all subscribers sequentially.
explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner);

~ValidationSignals();

Expand Down

0 comments on commit d5228ef

Please sign in to comment.