Skip to content

Commit

Permalink
feature:sync window config (OpenAtomFoundation#795)
Browse files Browse the repository at this point in the history
  • Loading branch information
kernelai authored and whoiami committed Nov 7, 2019
1 parent 0146681 commit bcb589f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 12 deletions.
2 changes: 2 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ slave-priority : 100

# server-id for hub
server-id : 1
# the size of flow control window while sync binlog between master and slave.Default is 9000 and the maximum is 90000.
sync-window-size : 9000

###################
## Critical Settings
Expand Down
11 changes: 10 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "include/pika_define.h"
#include "include/pika_meta.h"

#define kBinlogReadWinDefaultSize 9000
#define kBinlogReadWinMaxSize 90000

typedef slash::RWLock RWLock;

// global class, class members well initialized
Expand Down Expand Up @@ -82,6 +85,7 @@ class PikaConf : public slash::BaseConf {
int slowlog_slower_than() { return slowlog_log_slower_than_.load(); }
int slowlog_max_len() { RWLock L(&rwlock_, false); return slowlog_max_len_; }
std::string network_interface() { RWLock l(&rwlock_, false); return network_interface_; }
int sync_window_size() { return sync_window_size_.load(); }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
Expand Down Expand Up @@ -176,7 +180,7 @@ class PikaConf : public slash::BaseConf {
slash::StringToLower(item);
}
}
void SetExpireLogsNums(const int value){
void SetExpireLogsNums(const int value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("expire-logs-nums", std::to_string(value));
expire_logs_nums_ = value;
Expand Down Expand Up @@ -226,6 +230,10 @@ class PikaConf : public slash::BaseConf {
TryPushDiffCommands("compact-interval", value);
compact_interval_ = value;
}
void SetSyncWindowSize(const int &value) {
TryPushDiffCommands("sync-window-size", std::to_string(value));
sync_window_size_.store(value);
}

Status TablePartitionsSanityCheck(const std::string& table_name,
const std::set<uint32_t>& partition_ids,
Expand Down Expand Up @@ -296,6 +304,7 @@ class PikaConf : public slash::BaseConf {
bool cache_index_and_filter_blocks_;
bool optimize_filters_for_hits_;
bool level_compaction_dynamic_level_bytes_;
std::atomic<int> sync_window_size_;

std::string network_interface_;

Expand Down
5 changes: 2 additions & 3 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"

#define kBinlogSendPacketNum 30
#define kBinlogSendPacketNum 40
#define kBinlogSendBatchNum 100
#define kBinlogReadWinSize 3000

// unit seconds
#define kSendKeepAliveTimeout (10 * 1000000)
Expand Down Expand Up @@ -66,7 +65,7 @@ class SyncWindow {
}
private:
// TODO(whoiami) ring buffer maybe
std::vector<SyncWinItem> win_;
std::deque<SyncWinItem> win_;
};

// role master use
Expand Down
20 changes: 19 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&config_body, g_pika_conf->slave_priority());
}

if (slash::stringmatch(pattern.data(), "sync-window-size", 1)) {
elements += 2;
EncodeString(&config_body, "sync-window-size");
EncodeInt32(&config_body, g_pika_conf->sync_window_size());
}

std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
ret = resp.str();
Expand All @@ -1515,7 +1521,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*22\r\n";
ret = "*23\r\n";
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
Expand All @@ -1538,6 +1544,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "compact-cron");
EncodeString(&ret, "compact-interval");
EncodeString(&ret, "slave-priority");
EncodeString(&ret, "sync-window-size");
return;
}
long int ival;
Expand Down Expand Up @@ -1740,6 +1747,17 @@ void ConfigCmd::ConfigSet(std::string& ret) {
g_pika_conf->SetCompactInterval(value);
ret = "+OK\r\n";
}
} else if (set_item == "sync-window-size") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n";
return;
}
if (ival <= 0 || ival > kBinlogReadWinMaxSize) {
ret = "-ERR Argument exceed range \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n";
return;
}
g_pika_conf->SetSyncWindowSize(ival);
ret = "+OK\r\n";
} else {
ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n";
}
Expand Down
12 changes: 12 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,17 @@ int PikaConf::Load()
// slaveof
slaveof_ = "";
GetConfStr("slaveof", &slaveof_);

// sync window size
int tmp_sync_window_size = kBinlogReadWinDefaultSize;
GetConfInt("sync-window-size", &tmp_sync_window_size);
if (tmp_sync_window_size <= 0) {
sync_window_size_.store(kBinlogReadWinDefaultSize);
} else if (tmp_sync_window_size > kBinlogReadWinMaxSize) {
sync_window_size_.store(kBinlogReadWinMaxSize);
} else {
sync_window_size_.store(tmp_sync_window_size);
}
return ret;
}

Expand Down Expand Up @@ -451,6 +462,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfInt("slave-priority", slave_priority_);
SetConfInt("sync-window-size", sync_window_size_.load());
// slaveof config item is special
SetConfStr("slaveof", slaveof_);

Expand Down
17 changes: 10 additions & 7 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,9 @@ void SyncWindow::Push(const SyncWinItem& item) {
win_.push_back(item);
}

bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_item, BinlogOffset* acked_offset) {
size_t start_pos = kBinlogReadWinSize, end_pos = kBinlogReadWinSize;
bool SyncWindow::Update(const SyncWinItem& start_item,
const SyncWinItem& end_item, BinlogOffset* acked_offset) {
size_t start_pos = win_.size(), end_pos = win_.size();
for (size_t i = 0; i < win_.size(); ++i) {
if (win_[i] == start_item) {
start_pos = i;
Expand All @@ -704,8 +705,9 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it
break;
}
}
if (start_pos == kBinlogReadWinSize || end_pos == kBinlogReadWinSize) {
LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString() <<
if (start_pos == win_.size() || end_pos == win_.size()) {
LOG(WARNING) << "Ack offset Start: " <<
start_item.ToString() << "End: " << end_item.ToString() <<
" not found in binlog controller window." <<
std::endl << "window status "<< std::endl << ToStringStatus();
return false;
Expand All @@ -716,7 +718,7 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it
while (!win_.empty()) {
if (win_[0].acked_) {
*acked_offset = win_[0].offset_;
win_.erase(win_.begin());
win_.pop_front();
} else {
break;
}
Expand All @@ -725,13 +727,14 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it
}

int SyncWindow::Remainings() {
return kBinlogReadWinSize - win_.size();
std::size_t remaining_size = g_pika_conf->sync_window_size() - win_.size();
return remaining_size > 0? remaining_size:0 ;
}

/* PikaReplicaManger */

PikaReplicaManager::PikaReplicaManager()
: last_meta_sync_timestamp_(0){
: last_meta_sync_timestamp_(0) {
std::set<std::string> ips;
ips.insert("0.0.0.0");
int port = g_pika_conf->port() + kPortShiftReplServer;
Expand Down

0 comments on commit bcb589f

Please sign in to comment.