diff --git a/conf/pika.conf b/conf/pika.conf index 5763e2aaca..7f71964c9d 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -12,6 +12,10 @@ log-path : ./log/ db-path : ./db/ # Pika write-buffer-size write-buffer-size : 268435456 +# size of one block in arena memory allocation. +# If <= 0, a proper value is automatically calculated +# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB) +arena-block-size : # Pika timeout timeout : 60 # Requirepass @@ -115,6 +119,12 @@ small-compaction-threshold : 5000 # the limit, a flush will be triggered in the next DB to which the next write # is issued. max-write-buffer-size : 10737418240 +# The maximum number of write buffers that are built up in memory for one ColumnFamily in DB. +# The default and the minimum number is 2, so that when 1 write buffer +# is being flushed to storage, new writes can continue to the other write buffer. +# If max-write-buffer-number > 3, writing will be slowed down +# if we are writing to the last write buffer allowed. +max-write-buffer-number : 2 # Limit some command response size, like Scan, Keys* max-client-response-size : 1073741824 # Compression type supported [snappy, zlib, lz4, zstd] diff --git a/include/pika_conf.h b/include/pika_conf.h index 948d8664ad..98b711272c 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -44,7 +44,9 @@ class PikaConf : public slash::BaseConf { std::string compact_cron() { RWLock l(&rwlock_, false); return compact_cron_; } std::string compact_interval() { RWLock l(&rwlock_, false); return compact_interval_; } int64_t write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; } + int64_t arena_block_size() { RWLock l(&rwlock_, false); return arena_block_size_; } int64_t max_write_buffer_size() { RWLock l(&rwlock_, false); return max_write_buffer_size_; } + int max_write_buffer_number() { RWLock l(&rwlock_, false); return max_write_buffer_num_; } int64_t max_client_response_size() { RWLock L(&rwlock_, false); return max_client_response_size_;} int timeout() { RWLock l(&rwlock_, false); return timeout_; } std::string server_id() { RWLock l(&rwlock_, false); return server_id_; } @@ -242,6 +244,31 @@ class PikaConf : public slash::BaseConf { TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value)); max_conn_rbuf_size_.store(value); } + void SetMaxCacheFiles(const int& value) { + RWLock l(&rwlock_, true); + TryPushDiffCommands("max-cache-files", std::to_string(value)); + max_cache_files_ = value; + } + void SetMaxBackgroudCompactions(const int& value) { + RWLock l(&rwlock_, true); + TryPushDiffCommands("max-background-compactions", std::to_string(value)); + max_background_compactions_ = value; + } + void SetWriteBufferSize(const int& value) { + RWLock l(&rwlock_, true); + TryPushDiffCommands("write-buffer-size", std::to_string(value)); + write_buffer_size_ = value; + } + void SetMaxWriteBufferNumber(const int& value) { + RWLock l(&rwlock_, true); + TryPushDiffCommands("max-write-buffer-number", std::to_string(value)); + max_write_buffer_num_ = value; + } + void SetArenaBlockSize(const int& value) { + RWLock l(&rwlock_, true); + TryPushDiffCommands("arena-block-size", std::to_string(value)); + arena_block_size_ = value; + } Status TablePartitionsSanityCheck(const std::string& table_name, const std::set& partition_ids, @@ -276,7 +303,9 @@ class PikaConf : public slash::BaseConf { std::string compact_cron_; std::string compact_interval_; int64_t write_buffer_size_; + int64_t arena_block_size_; int64_t max_write_buffer_size_; + int max_write_buffer_num_; int64_t max_client_response_size_; bool daemonize_; int timeout_; diff --git a/include/pika_server.h b/include/pika_server.h index 4fa31e75d2..f555d3eb89 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -299,6 +299,12 @@ class PikaServer { // info debug use void ServerStatus(std::string* info); + /* + * BlackwidowOptions used + */ + blackwidow::Status RewriteBlackwidowOptions(const blackwidow::OptionType& option_type, + const std::unordered_map& options); + friend class Cmd; friend class InfoCmd; friend class PkClusterAddSlotsCmd; @@ -322,6 +328,7 @@ class PikaServer { int port_; time_t start_time_s_; + pthread_rwlock_t bw_options_rw_; blackwidow::BlackwidowOptions bw_options_; void InitBlackwidowOptions(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 15ec1aedeb..c0d3f4882f 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1245,6 +1245,18 @@ void ConfigCmd::ConfigGet(std::string &ret) { EncodeInt64(&config_body, g_pika_conf->write_buffer_size()); } + if (slash::stringmatch(pattern.data(), "arena-block-size", 1)) { + elements += 2; + EncodeString(&config_body, "arena-block-size"); + EncodeInt64(&config_body, g_pika_conf->arena_block_size()); + } + + if (slash::stringmatch(pattern.data(), "max-write-buffer-number", 1)) { + elements += 2; + EncodeString(&config_body, "max-write-buffer-number"); + EncodeInt32(&config_body, g_pika_conf->max_write_buffer_number()); + } + if (slash::stringmatch(pattern.data(), "timeout", 1)) { elements += 2; EncodeString(&config_body, "timeout"); @@ -1561,7 +1573,7 @@ void ConfigCmd::ConfigGet(std::string &ret) { void ConfigCmd::ConfigSet(std::string& ret) { std::string set_item = config_args_v_[1]; if (set_item == "*") { - ret = "*23\r\n"; + ret = "*28\r\n"; EncodeString(&ret, "timeout"); EncodeString(&ret, "requirepass"); EncodeString(&ret, "masterauth"); @@ -1585,6 +1597,14 @@ void ConfigCmd::ConfigSet(std::string& ret) { EncodeString(&ret, "compact-interval"); EncodeString(&ret, "slave-priority"); EncodeString(&ret, "sync-window-size"); + // Options for storage engine + // MutableDBOptions + EncodeString(&ret, "max-cache-files"); + EncodeString(&ret, "max-background-compactions"); + // MutableColumnFamilyOptions + EncodeString(&ret, "write-buffer-size"); + EncodeString(&ret, "max-write-buffer-number"); + EncodeString(&ret, "arena-block-size"); return; } long int ival; @@ -1798,6 +1818,71 @@ void ConfigCmd::ConfigSet(std::string& ret) { } g_pika_conf->SetSyncWindowSize(ival); ret = "+OK\r\n"; + } else if (set_item == "max-cache-files") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n"; + return; + } + std::unordered_map options_map{{"max_open_files", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map); + if (!s.ok()) { + ret = "-ERR Set max-cache-files wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetMaxCacheFiles(ival); + ret = "+OK\r\n"; + } else if (set_item == "max-background-compactions") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-compactions'\r\n"; + return; + } + std::unordered_map options_map{{"max_background_compactions", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map); + if (!s.ok()) { + ret = "-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetMaxBackgroudCompactions(ival); + ret = "+OK\r\n"; + } else if (set_item == "write-buffer-size") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'write-buffer-size'\r\n"; + return; + } + std::unordered_map options_map{{"write_buffer_size", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + ret = "-ERR Set write-buffer-size wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetWriteBufferSize(ival); + ret = "+OK\r\n"; + } else if (set_item == "max-write-buffer-number") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-write-buffer-number'\r\n"; + return; + } + std::unordered_map options_map{{"max_write_buffer_number", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + ret = "-ERR Set max-write-buffer-number wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetMaxWriteBufferNumber(ival); + ret = "+OK\r\n"; + } else if (set_item == "arena-block-size") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n"; + return; + } + std::unordered_map options_map{{"arena_block_size", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + if (!s.ok()) { + ret = "-ERR Set arena-block-size wrong: " + s.ToString() + "\r\n"; + return; + } + g_pika_conf->SetArenaBlockSize(ival); + ret = "+OK\r\n"; } else { ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n"; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index a46be71aa8..4b58698021 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -363,12 +363,25 @@ int PikaConf::Load() write_buffer_size_ = 268435456; // 256Mb } + // arena_block_size + GetConfInt64("arena-block-size", &arena_block_size_); + if (arena_block_size_ <= 0) { + arena_block_size_ = write_buffer_size_ >> 3; // 1/8 of the write_buffer_size_ + } + // max_write_buffer_size GetConfInt64("max-write-buffer-size", &max_write_buffer_size_); if (max_write_buffer_size_ <= 0) { max_write_buffer_size_ = 10737418240; // 10Gb } + // max_write_buffer_num + max_write_buffer_num_ = 2; + GetConfInt("max-write-buffer-num", &max_write_buffer_num_); + if (max_write_buffer_num_ <= 0) { + max_write_buffer_num_ = 2; // 1 for immutable memtable, 1 for mutable memtable + } + // max_client_response_size GetConfInt64("max-client-response-size", &max_client_response_size_); if (max_client_response_size_ <= 0) { @@ -545,6 +558,12 @@ int PikaConf::ConfigRewrite() { SetConfInt("sync-window-size", sync_window_size_.load()); SetConfInt("consensus-level", consensus_level_.load()); SetConfInt("replication-num", replication_num_.load()); + // options for storage engine + SetConfInt("max-cache-files", max_cache_files_); + SetConfInt("max-background-compactions", max_background_compactions_); + SetConfInt("max-write-buffer-number", max_write_buffer_num_); + SetConfInt64("write-buffer-size", write_buffer_size_); + SetConfInt64("arena-block-size", arena_block_size_); // slaveof config item is special SetConfStr("slaveof", slaveof_); diff --git a/src/pika_server.cc b/src/pika_server.cc index c8d7e5f0e8..ff65b1ffe8 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -65,6 +65,12 @@ PikaServer::PikaServer() : LOG(FATAL) << "ServerInit iotcl error"; } + pthread_rwlockattr_t bw_options_rw_attr; + pthread_rwlockattr_init(&bw_options_rw_attr); + pthread_rwlockattr_setkind_np(&bw_options_rw_attr, + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + pthread_rwlock_init(&bw_options_rw_, &bw_options_rw_attr); + InitBlackwidowOptions(); pthread_rwlockattr_t tables_rw_attr; @@ -408,6 +414,7 @@ void PikaServer::SetDispatchQueueLimit(int queue_limit) { } blackwidow::BlackwidowOptions PikaServer::bw_options() { + slash::RWLock rwl(&bw_options_rw_, false); return bw_options_; } @@ -1587,6 +1594,7 @@ void PikaServer::AutoKeepAliveRSync() { } void PikaServer::InitBlackwidowOptions() { + slash::RWLock rwl(&bw_options_rw_, true); // For rocksdb::Options bw_options_.options.create_if_missing = true; @@ -1596,8 +1604,12 @@ void PikaServer::InitBlackwidowOptions() { bw_options_.options.write_buffer_size = g_pika_conf->write_buffer_size(); + bw_options_.options.arena_block_size = + g_pika_conf->arena_block_size(); bw_options_.options.write_buffer_manager.reset( new rocksdb::WriteBufferManager(g_pika_conf->max_write_buffer_size())); + bw_options_.options.max_write_buffer_number = + g_pika_conf->max_write_buffer_number(); bw_options_.options.target_file_size_base = g_pika_conf->target_file_size_base(); bw_options_.options.max_background_flushes = @@ -1651,6 +1663,23 @@ void PikaServer::InitBlackwidowOptions() { g_pika_conf->small_compaction_threshold(); } +blackwidow::Status PikaServer::RewriteBlackwidowOptions(const blackwidow::OptionType& option_type, + const std::unordered_map& options_map) { + blackwidow::Status s; + for (const auto& table_item : tables_) { + slash::RWLock partition_rwl(&table_item.second->partitions_rw_, true); + for (const auto& partition_item: table_item.second->partitions_) { + partition_item.second->DbRWLockWriter(); + s = partition_item.second->db()->SetOptions(option_type, blackwidow::ALL_DB, options_map); + partition_item.second->DbRWUnLock(); + if (!s.ok()) return s; + } + } + slash::RWLock rwl(&bw_options_rw_, true); + s = bw_options_.ResetOptions(option_type, options_map); + return s; +} + void PikaServer::ServerStatus(std::string* info) { std::stringstream tmp_stream; size_t q_size = ClientProcessorThreadPoolCurQueueSize(); diff --git a/third/blackwidow b/third/blackwidow index ae38f5b4c5..7565ce9f09 160000 --- a/third/blackwidow +++ b/third/blackwidow @@ -1 +1 @@ -Subproject commit ae38f5b4c5c01c7f8b9deec58db752e056659264 +Subproject commit 7565ce9f09f9528b90984feefe6ad850bf87fe90