Skip to content

Commit

Permalink
Merge pull request ClickHouse#55591 from ClickHouse/fix-keeper-context
Browse files Browse the repository at this point in the history
Apply Context changes to standalone Keeper
  • Loading branch information
antonio2368 authored Oct 16, 2023
2 parents 6d16a85 + 6706884 commit c69838e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 52 deletions.
107 changes: 66 additions & 41 deletions src/Coordination/Standalone/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/Macros.h>
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>

#include <Core/ServerSettings.h>

Expand All @@ -14,6 +15,7 @@
namespace ProfileEvents
{
extern const Event ContextLock;
extern const Event ContextLockWaitMicroseconds;
}

namespace CurrentMetrics
Expand All @@ -39,8 +41,8 @@ struct ContextSharedPart : boost::noncopyable
: macros(std::make_unique<Macros>())
{}

/// For access of most of shared objects. Recursive mutex.
mutable std::recursive_mutex mutex;
/// For access of most of shared objects.
mutable SharedMutex mutex;

mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
Expand All @@ -50,27 +52,31 @@ struct ContextSharedPart : boost::noncopyable
String path; /// Path to the data directory, with a slash at the end.
ConfigurationPtr config; /// Global configuration settings.
MultiVersion<Macros> macros; /// Substitutions extracted from config.
OnceFlag schedule_pool_initialized;
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
///

mutable OnceFlag readers_initialized;
mutable std::unique_ptr<IAsynchronousReader> asynchronous_remote_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> asynchronous_local_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> synchronous_local_fs_reader;

mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;

mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads
mutable ThrottlerPtr remote_write_throttler; /// A server-wide throttler for remote IO writes

mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads
mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes

};

ContextData::ContextData() = default;
ContextData::ContextData(const ContextData &) = default;

Context::Context() = default;
Context::Context(const Context & rhs) : ContextData(rhs), std::enable_shared_from_this<Context>(rhs) {}
Context::~Context() = default;
Context::Context(const Context &) = default;
Context & Context::operator=(const Context &) = default;

SharedContextHolder::SharedContextHolder(SharedContextHolder &&) noexcept = default;
SharedContextHolder & SharedContextHolder::operator=(SharedContextHolder &&) noexcept = default;
Expand All @@ -87,10 +93,10 @@ void Context::makeGlobalContext()
global_context = shared_from_this();
}

ContextMutablePtr Context::createGlobal(ContextSharedPart * shared)
ContextMutablePtr Context::createGlobal(ContextSharedPart * shared_part)
{
auto res = std::shared_ptr<Context>(new Context);
res->shared = shared;
res->shared = shared_part;
return res;
}

Expand All @@ -105,29 +111,63 @@ SharedContextHolder Context::createShared()
return SharedContextHolder(std::make_unique<ContextSharedPart>());
}


ContextMutablePtr Context::getGlobalContext() const
{
auto ptr = global_context.lock();
if (!ptr) throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no global context or global context has expired");
return ptr;
}

std::unique_lock<std::recursive_mutex> Context::getLock() const
std::unique_lock<SharedMutex> Context::getGlobalLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::unique_lock(shared->mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}

std::shared_lock<SharedMutex> Context::getGlobalSharedLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::shared_lock(shared->mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}

std::unique_lock<SharedMutex> Context::getLocalLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
return std::unique_lock(shared->mutex);
Stopwatch watch;
auto lock = std::unique_lock(mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}

std::shared_lock<SharedMutex> Context::getLocalSharedLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::shared_lock(mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}

String Context::getPath() const
{
auto lock = getLock();
auto lock = getGlobalSharedLock();
return shared->path;
}

void Context::setPath(const String & path)
{
auto lock = getLock();
auto lock = getGlobalLock();
shared->path = path;
}

Expand All @@ -143,15 +183,13 @@ void Context::setMacros(std::unique_ptr<Macros> && macros)

BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
{
callOnce(shared->schedule_pool_initialized, [&] {
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool");
}
});

return *shared->schedule_pool;
}
Expand All @@ -168,30 +206,21 @@ const RemoteHostFilter & Context::getRemoteHostFilter() const

IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
auto lock = getLock();
callOnce(shared->readers_initialized, [&] {
const auto & config = getConfigRef();
shared->asynchronous_remote_fs_reader = createThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER, config);
shared->asynchronous_local_fs_reader = createThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER, config);
shared->synchronous_local_fs_reader = createThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER, config);
});

switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());

return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());

return *shared->synchronous_local_fs_reader;
}
}
}

Expand All @@ -207,38 +236,34 @@ std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetche

void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
auto lock = getGlobalLock();
shared->config = config;
}

const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
auto lock = getGlobalSharedLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
}

std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
{
auto lock = getLock();
auto lock = getLocalLock();
if (!async_read_counters)
async_read_counters = std::make_shared<AsyncReadCounters>();
return async_read_counters;
}

ThreadPool & Context::getThreadPoolWriter() const
{
const auto & config = getConfigRef();

auto lock = getLock();

if (!shared->threadpool_writer)
{
callOnce(shared->threadpool_writer_initialized, [&] {
const auto & config = getConfigRef();
auto pool_size = config.getUInt(".threadpool_writer_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_writer_queue_size", 1000000);

shared->threadpool_writer = std::make_unique<ThreadPool>(
CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, pool_size, pool_size, queue_size);
}
});

return *shared->threadpool_writer;
}
Expand Down
39 changes: 28 additions & 11 deletions src/Coordination/Standalone/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <Common/MultiVersion.h>
#include <Common/RemoteHostFilter.h>
#include <Common/SharedMutex.h>

#include <Disks/IO/getThreadPoolReader.h>

Expand Down Expand Up @@ -44,17 +45,9 @@ struct SharedContextHolder
std::unique_ptr<ContextSharedPart> shared;
};


class Context : public std::enable_shared_from_this<Context>
class ContextData
{
private:
/// Use copy constructor or createGlobal() instead
Context();
Context(const Context &);
Context & operator=(const Context &);

std::unique_lock<std::recursive_mutex> getLock() const;

protected:
ContextWeakMutablePtr global_context;
inline static ContextPtr global_context_instance;
ContextSharedPart * shared;
Expand All @@ -63,9 +56,33 @@ class Context : public std::enable_shared_from_this<Context>
mutable std::shared_ptr<AsyncReadCounters> async_read_counters;

Settings settings; /// Setting for query execution.

public:
/// Use copy constructor or createGlobal() instead
ContextData();
ContextData(const ContextData &);
};

class Context : public ContextData, public std::enable_shared_from_this<Context>
{
private:
/// ContextData mutex
mutable SharedMutex mutex;

Context();
Context(const Context &);

std::unique_lock<SharedMutex> getGlobalLock() const;

std::shared_lock<SharedMutex> getGlobalSharedLock() const;

std::unique_lock<SharedMutex> getLocalLock() const;

std::shared_lock<SharedMutex> getLocalSharedLock() const;

public:
/// Create initial Context with ContextShared and etc.
static ContextMutablePtr createGlobal(ContextSharedPart * shared);
static ContextMutablePtr createGlobal(ContextSharedPart * shared_part);
static SharedContextHolder createShared();

ContextMutablePtr getGlobalContext() const;
Expand Down

0 comments on commit c69838e

Please sign in to comment.