From bcb589f9f30191f68e5f17faa78e9da537f0d027 Mon Sep 17 00:00:00 2001 From: kernelai Date: Thu, 7 Nov 2019 19:56:33 +0800 Subject: [PATCH] feature:sync window config (#795) --- conf/pika.conf | 2 ++ include/pika_conf.h | 11 ++++++++++- include/pika_rm.h | 5 ++--- src/pika_admin.cc | 20 +++++++++++++++++++- src/pika_conf.cc | 12 ++++++++++++ src/pika_rm.cc | 17 ++++++++++------- 6 files changed, 55 insertions(+), 12 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 80c4d753b3..15bdb4e099 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -85,6 +85,8 @@ slave-priority : 100 # server-id for hub server-id : 1 +# the size of flow control window while sync binlog between master and slave.Default is 9000 and the maximum is 90000. +sync-window-size : 9000 ################### ## Critical Settings diff --git a/include/pika_conf.h b/include/pika_conf.h index 3012367d99..4161d44b0c 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -18,6 +18,9 @@ #include "include/pika_define.h" #include "include/pika_meta.h" +#define kBinlogReadWinDefaultSize 9000 +#define kBinlogReadWinMaxSize 90000 + typedef slash::RWLock RWLock; // global class, class members well initialized @@ -82,6 +85,7 @@ class PikaConf : public slash::BaseConf { int slowlog_slower_than() { return slowlog_log_slower_than_.load(); } int slowlog_max_len() { RWLock L(&rwlock_, false); return slowlog_max_len_; } std::string network_interface() { RWLock l(&rwlock_, false); return network_interface_; } + int sync_window_size() { return sync_window_size_.load(); } // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } @@ -176,7 +180,7 @@ class PikaConf : public slash::BaseConf { slash::StringToLower(item); } } - void SetExpireLogsNums(const int value){ + void SetExpireLogsNums(const int value) { RWLock l(&rwlock_, true); TryPushDiffCommands("expire-logs-nums", std::to_string(value)); expire_logs_nums_ = value; @@ -226,6 +230,10 @@ class PikaConf : public slash::BaseConf { TryPushDiffCommands("compact-interval", value); compact_interval_ = value; } + void SetSyncWindowSize(const int &value) { + TryPushDiffCommands("sync-window-size", std::to_string(value)); + sync_window_size_.store(value); + } Status TablePartitionsSanityCheck(const std::string& table_name, const std::set& partition_ids, @@ -296,6 +304,7 @@ class PikaConf : public slash::BaseConf { bool cache_index_and_filter_blocks_; bool optimize_filters_for_hits_; bool level_compaction_dynamic_level_bytes_; + std::atomic sync_window_size_; std::string network_interface_; diff --git a/include/pika_rm.h b/include/pika_rm.h index dbae12d719..cb20a8b250 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -18,9 +18,8 @@ #include "include/pika_repl_client.h" #include "include/pika_repl_server.h" -#define kBinlogSendPacketNum 30 +#define kBinlogSendPacketNum 40 #define kBinlogSendBatchNum 100 -#define kBinlogReadWinSize 3000 // unit seconds #define kSendKeepAliveTimeout (10 * 1000000) @@ -66,7 +65,7 @@ class SyncWindow { } private: // TODO(whoiami) ring buffer maybe - std::vector win_; + std::deque win_; }; // role master use diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 41788c9c77..8424a6e8c0 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1506,6 +1506,12 @@ void ConfigCmd::ConfigGet(std::string &ret) { EncodeInt32(&config_body, g_pika_conf->slave_priority()); } + if (slash::stringmatch(pattern.data(), "sync-window-size", 1)) { + elements += 2; + EncodeString(&config_body, "sync-window-size"); + EncodeInt32(&config_body, g_pika_conf->sync_window_size()); + } + std::stringstream resp; resp << "*" << std::to_string(elements) << "\r\n" << config_body; ret = resp.str(); @@ -1515,7 +1521,7 @@ void ConfigCmd::ConfigGet(std::string &ret) { void ConfigCmd::ConfigSet(std::string& ret) { std::string set_item = config_args_v_[1]; if (set_item == "*") { - ret = "*22\r\n"; + ret = "*23\r\n"; EncodeString(&ret, "timeout"); EncodeString(&ret, "requirepass"); EncodeString(&ret, "masterauth"); @@ -1538,6 +1544,7 @@ void ConfigCmd::ConfigSet(std::string& ret) { EncodeString(&ret, "compact-cron"); EncodeString(&ret, "compact-interval"); EncodeString(&ret, "slave-priority"); + EncodeString(&ret, "sync-window-size"); return; } long int ival; @@ -1740,6 +1747,17 @@ void ConfigCmd::ConfigSet(std::string& ret) { g_pika_conf->SetCompactInterval(value); ret = "+OK\r\n"; } + } else if (set_item == "sync-window-size") { + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + return; + } + if (ival <= 0 || ival > kBinlogReadWinMaxSize) { + ret = "-ERR Argument exceed range \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + return; + } + g_pika_conf->SetSyncWindowSize(ival); + ret = "+OK\r\n"; } else { ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n"; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 3fe9866f20..88e98e88c2 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -415,6 +415,17 @@ int PikaConf::Load() // slaveof slaveof_ = ""; GetConfStr("slaveof", &slaveof_); + + // sync window size + int tmp_sync_window_size = kBinlogReadWinDefaultSize; + GetConfInt("sync-window-size", &tmp_sync_window_size); + if (tmp_sync_window_size <= 0) { + sync_window_size_.store(kBinlogReadWinDefaultSize); + } else if (tmp_sync_window_size > kBinlogReadWinMaxSize) { + sync_window_size_.store(kBinlogReadWinMaxSize); + } else { + sync_window_size_.store(tmp_sync_window_size); + } return ret; } @@ -451,6 +462,7 @@ int PikaConf::ConfigRewrite() { SetConfStr("compact-cron", compact_cron_); SetConfStr("compact-interval", compact_interval_); SetConfInt("slave-priority", slave_priority_); + SetConfInt("sync-window-size", sync_window_size_.load()); // slaveof config item is special SetConfStr("slaveof", slaveof_); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 120ba4276b..b0e5a23ce5 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -693,8 +693,9 @@ void SyncWindow::Push(const SyncWinItem& item) { win_.push_back(item); } -bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_item, BinlogOffset* acked_offset) { - size_t start_pos = kBinlogReadWinSize, end_pos = kBinlogReadWinSize; +bool SyncWindow::Update(const SyncWinItem& start_item, + const SyncWinItem& end_item, BinlogOffset* acked_offset) { + size_t start_pos = win_.size(), end_pos = win_.size(); for (size_t i = 0; i < win_.size(); ++i) { if (win_[i] == start_item) { start_pos = i; @@ -704,8 +705,9 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it break; } } - if (start_pos == kBinlogReadWinSize || end_pos == kBinlogReadWinSize) { - LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString() << + if (start_pos == win_.size() || end_pos == win_.size()) { + LOG(WARNING) << "Ack offset Start: " << + start_item.ToString() << "End: " << end_item.ToString() << " not found in binlog controller window." << std::endl << "window status "<< std::endl << ToStringStatus(); return false; @@ -716,7 +718,7 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it while (!win_.empty()) { if (win_[0].acked_) { *acked_offset = win_[0].offset_; - win_.erase(win_.begin()); + win_.pop_front(); } else { break; } @@ -725,13 +727,14 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it } int SyncWindow::Remainings() { - return kBinlogReadWinSize - win_.size(); + std::size_t remaining_size = g_pika_conf->sync_window_size() - win_.size(); + return remaining_size > 0? remaining_size:0 ; } /* PikaReplicaManger */ PikaReplicaManager::PikaReplicaManager() - : last_meta_sync_timestamp_(0){ + : last_meta_sync_timestamp_(0) { std::set ips; ips.insert("0.0.0.0"); int port = g_pika_conf->port() + kPortShiftReplServer;