Skip to content

Commit

Permalink
Add configuration replication-num (OpenAtomFoundation#869)
Browse files Browse the repository at this point in the history
Replication-num defines how many followers in this single raft group
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent 70c6fd0 commit be45f4c
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
5 changes: 4 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ slave-priority : 100
# if the freesize/disksize > 60%. NOTICE:compact-interval is prior than compact-cron;
#compact-interval :


# replication num defines how many followers in a single raft group, only [0, 1, 2, 3, 4] is valid
replication-num : 0
# consensus level defines how many confirms does leader get, before commit this log to client,
# only [0, 1, 2] is valid
# only [0, ...replicaiton-num] is valid
consensus-level : 0

# the size of flow control window while sync binlog between master and slave.Default is 9000 and the maximum is 90000.
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class PikaConf : public slash::BaseConf {
int sync_window_size() { return sync_window_size_.load(); }
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
int consensus_level() { return consensus_level_.load(); }
int replication_num() { return replication_num_.load(); }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
Expand Down Expand Up @@ -318,6 +319,7 @@ class PikaConf : public slash::BaseConf {
std::atomic<int> sync_window_size_;
std::atomic<int> max_conn_rbuf_size_;
std::atomic<int> consensus_level_;
std::atomic<int> replication_num_;

std::string network_interface_;

Expand Down
5 changes: 5 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,11 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&config_body, g_pika_conf->max_conn_rbuf_size());
}

if (slash::stringmatch(pattern.data(), "replication-num", 1)) {
elements += 2;
EncodeString(&config_body, "replication-num");
EncodeInt32(&config_body, g_pika_conf->replication_num());
}
if (slash::stringmatch(pattern.data(), "consensus-level", 1)) {
elements += 2;
EncodeString(&config_body, "consensus-level");
Expand Down
19 changes: 14 additions & 5 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,21 @@ int PikaConf::Load()
GetConfInt("slowlog-log-slower-than", &tmp_slowlog_log_slower_than);
slowlog_log_slower_than_.store(tmp_slowlog_log_slower_than);

int tmp_replication_num = 0;
GetConfInt("replication-num", &tmp_replication_num);
if (tmp_replication_num > 4 || tmp_replication_num < 0) {
LOG(FATAL) << "replication-num " << tmp_replication_num <<
"is invalid, please pick from [0...4]";
}
replication_num_.store(tmp_replication_num);


int tmp_consensus_level = 0;
GetConfInt("consensus-level", &tmp_consensus_level);
if (tmp_consensus_level != 0 &&
tmp_consensus_level != 1 &&
tmp_consensus_level != 2) {
LOG(FATAL) << "consensus-level " << tmp_consensus_level <<
" is invalid, please pick one of [0, 1, 2]";
if (tmp_consensus_level < 0 ||
tmp_consensus_level > replication_num_.load()) {
LOG(FATAL) << "consensus-level " << tmp_consensus_level
<< " is invalid, please pick from [0..." << replication_num_ << "]";
}
consensus_level_.store(tmp_consensus_level);

Expand Down Expand Up @@ -530,6 +538,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("slave-priority", slave_priority_);
SetConfInt("sync-window-size", sync_window_size_.load());
SetConfInt("consensus-level", consensus_level_.load());
SetConfInt("replication-num", replication_num_.load());
// slaveof config item is special
SetConfStr("slaveof", slaveof_);

Expand Down
25 changes: 18 additions & 7 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,25 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) {
}

if (pre_success && req->has_consensus_meta()) {
const InnerMessage::ConsensusMeta& meta = req->consensus_meta();
// need to response to outdated pb, new follower count on this response to update term
if (meta.term() > partition->ConsensusTerm()) {
LOG(INFO) << "Update " << partition_name
<< " term from " << partition->ConsensusTerm() << " to " << meta.term();
partition->ConsensusUpdateTerm(meta.term());
if (partition->GetNumberOfSlaveNode() >= g_pika_conf->replication_num()) {
LOG(WARNING) << "Current replication num: "
<< partition->GetNumberOfSlaveNode()
<< " hits configuration replication-num "
<< g_pika_conf->replication_num() << " stop trysync.";
pre_success = false;
}
if (pre_success) {
const InnerMessage::ConsensusMeta& meta = req->consensus_meta();
// need to response to outdated pb, new follower count on this response to update term
if (meta.term() > partition->ConsensusTerm()) {
LOG(INFO) << "Update " << partition_name
<< " term from " << partition->ConsensusTerm() << " to " << meta.term();
partition->ConsensusUpdateTerm(meta.term());
}
}
if (pre_success) {
pre_success = TrySyncConsensusOffsetCheck(partition, req->consensus_meta(), &response, try_sync_response);
}
pre_success = TrySyncConsensusOffsetCheck(partition, req->consensus_meta(), &response, try_sync_response);
} else if (pre_success) {
pre_success = TrySyncOffsetCheck(partition, try_sync_request, try_sync_response);
}
Expand Down

0 comments on commit be45f4c

Please sign in to comment.