Skip to content

Commit

Permalink
Dev consensus term logic (OpenAtomFoundation#854)
Browse files Browse the repository at this point in the history
Sending a trysync request increases the current term number by 1
The leader will not commit a log whose term is not equal to its current term
The leader and follower will drop a packet if the received term is smaller than the current term
The leader and follower will update the current term if the received term is larger than the current term
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent f08bf12 commit f6ac111
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 12 deletions.
1 change: 1 addition & 0 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ConsensusCoordinator {
Status AddSlaveNode(const std::string& ip, int port, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);
void UpdateTerm(uint32_t term);
uint32_t term();
Status CheckEnoughFollower();

// invoked by follower
Expand Down
1 change: 0 additions & 1 deletion include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class PikaReplBgWorker {

BinlogItem binlog_item_;
pink::RedisParser redis_parser_;
LogOffset offset_;
std::string ip_port_;
std::string table_name_;
uint32_t partition_id_;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class SyncMasterPartition : public SyncPartition {
Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit);
LogOffset ConsensusCommittedIndex();
LogOffset ConsensusLastIndex();
uint32_t ConsensusTerm();
void ConsensusUpdateTerm(uint32_t term);
Status ConsensusUpdateAppliedIndex(const LogOffset& offset);
Status ConsensusLeaderNegotiate(const LogOffset& f_last_offset,
bool* reject, std::vector<LogOffset>* hints);
Expand Down
10 changes: 10 additions & 0 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port,
return Status::OK();
}

// do not commit log which is not current term log
if (committed_index.l_offset.term != term()) {
return Status::OK();
}

LogOffset updated_committed_index;
bool need_update = false;
{
Expand Down Expand Up @@ -516,6 +521,11 @@ void ConsensusCoordinator::UpdateTerm(uint32_t term) {
stable_logger_->Logger()->Unlock();
}

// Returns the coordinator's current term_. The value is read while a
// slash::RWLock built on term_rwlock_ is alive.
// NOTE(review): the `false` argument presumably requests a shared (read)
// lock rather than an exclusive one — confirm against slash::RWLock.
uint32_t ConsensusCoordinator::term() {
  slash::RWLock term_guard(&term_rwlock_, false);
  const uint32_t current_term = term_;
  return current_term;
}

// Reports whether the number of slaves tracked in sync_pros_ is at least the
// consensus_level() read from the global Pika configuration.
bool ConsensusCoordinator::MatchConsensusLevel() {
  const auto slave_count = sync_pros_.SlaveSize();
  const int required_level = static_cast<int>(g_pika_conf->consensus_level());
  return slave_count >= required_level;
}
Expand Down
3 changes: 1 addition & 2 deletions src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ message PikaMeta {
message ConsensusMeta {
optional uint32 term = 1;
// Leader -> Follower prev_log_offset
// Follower <- Leader last_log_offset
// Follower -> Leader last_log_offset
optional BinlogOffset log_offset = 2;
optional BinlogOffset commit = 3;
optional bool reject = 4;
Expand Down Expand Up @@ -95,7 +95,6 @@ message InnerRequest {
optional DBSync db_sync = 4;
optional BinlogSync binlog_sync = 5;
repeated RemoveSlaveNode remove_slave_node = 6;
//
optional ConsensusMeta consensus_meta = 7;
}

Expand Down
31 changes: 22 additions & 9 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ extern PikaReplicaManager* g_pika_rm;
extern PikaCmdTableManager* g_pika_cmd_table_manager;

PikaReplBgWorker::PikaReplBgWorker(int queue_size)
: offset_(), bg_thread_(queue_size) {
: bg_thread_(queue_size) {
bg_thread_.set_thread_name("ReplBgWorker");
pink::RedisParserSettings settings;
settings.DealMessage = &(PikaReplBgWorker::HandleWriteBinlog);
Expand Down Expand Up @@ -61,12 +61,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
PikaReplBgWorker* worker = task_arg->worker;
worker->ip_port_ = conn->ip_port();

if (res->has_consensus_meta()) {
ParseBinlogOffset(res->consensus_meta().commit(), &worker->offset_);
} else {
worker->offset_ = LogOffset();
}

std::string table_name;
uint32_t partition_id = 0;
LogOffset ack_start, ack_end;
Expand Down Expand Up @@ -95,6 +89,21 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
return;
}

if (res->has_consensus_meta()) {
const InnerMessage::ConsensusMeta& meta = res->consensus_meta();
if (meta.term() > partition->ConsensusTerm()) {
LOG(INFO) << "Update " << table_name << "_" << partition_id << " term from "
<< partition->ConsensusTerm() << " to " << meta.term();
partition->ConsensusUpdateTerm(meta.term());
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/{
LOG(WARNING) << "Drop outdated binlog sync response " << table_name << "_" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete index;
delete task_arg;
return;
}
}

std::shared_ptr<SyncSlavePartition> slave_partition =
g_pika_rm->GetSyncSlavePartitionByName(
PartitionInfo(table_name, partition_id));
Expand Down Expand Up @@ -159,8 +168,12 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
delete index;
delete task_arg;

// Update follower commit && apply
partition->ConsensusProcessLocalUpdate(worker->offset_);
if (res->has_consensus_meta()) {
LogOffset leader_commit;
ParseBinlogOffset(res->consensus_meta().commit(), &leader_commit);
// Update follower commit && apply
partition->ConsensusProcessLocalUpdate(leader_commit);
}

// Reply Ack to master immediately
std::shared_ptr<Binlog> logger = partition->Logger();
Expand Down
4 changes: 4 additions & 0 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ Status PikaReplClient::SendPartitionTrySync(const std::string& ip,
return Status::Corruption("partition not found");
}
LogOffset last_index = partition->ConsensusLastIndex();
uint32_t term = partition->ConsensusTerm();
term++;
partition->ConsensusUpdateTerm(term);
InnerMessage::ConsensusMeta* consensus_meta = request.mutable_consensus_meta();
consensus_meta->set_term(term);
InnerMessage::BinlogOffset* pb_offset = consensus_meta->mutable_log_offset();
pb_offset->set_filenum(last_index.b_offset.filenum);
pb_offset->set_offset(last_index.b_offset.offset);
Expand Down
12 changes: 12 additions & 0 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
}

if (response->has_consensus_meta()) {
const InnerMessage::ConsensusMeta& meta = response->consensus_meta();
if (meta.term() > partition->ConsensusTerm()) {
LOG(INFO) << "Update " << table_name << ":" << partition_id
<< " term from " << partition->ConsensusTerm() << " to " << meta.term();
partition->ConsensusUpdateTerm(meta.term());
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/{
LOG(WARNING) << "Drop outdated trysync response " << table_name << ":" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete task_arg;
return;
}

bool success = TrySyncConsensusCheck(response->consensus_meta(), partition, slave_partition);
if (!success) {
delete task_arg;
Expand Down
1 change: 1 addition & 0 deletions src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ void PikaReplServer::BuildBinlogSyncResp(const std::vector<WriteTask>& tasks,
committed_index = partition->ConsensusCommittedIndex();
InnerMessage::BinlogOffset* committed = consensus_meta->mutable_commit();
BuildBinlogOffset(committed_index, committed);
consensus_meta->set_term(partition->ConsensusTerm());
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) {
}

if (pre_success && req->has_consensus_meta()) {
const InnerMessage::ConsensusMeta& meta = req->consensus_meta();
if (meta.term() > partition->ConsensusTerm()) {
LOG(INFO) << "Update " << partition_name
<< " term from " << partition->ConsensusTerm() << " to " << meta.term();
partition->ConsensusUpdateTerm(meta.term());
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/{
LOG(WARNING) << "Drop outdated trysync req " << " partition: " << partition_name
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete task_arg;
return;
}
pre_success = TrySyncConsensusOffsetCheck(partition, req->consensus_meta(), &response, try_sync_response);
} else if (pre_success) {
pre_success = TrySyncOffsetCheck(partition, try_sync_request, try_sync_response);
Expand Down Expand Up @@ -395,6 +406,20 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
return;
}

if (req->has_consensus_meta()){
const InnerMessage::ConsensusMeta& meta = req->consensus_meta();
if (meta.term() > master_partition->ConsensusTerm()) {
LOG(INFO) << "Update " << table_name << ":" << partition_id
<< " term from " << master_partition->ConsensusTerm() << " to " << meta.term();
master_partition->ConsensusUpdateTerm(meta.term());
} else if (meta.term() < master_partition->ConsensusTerm()) /*outdated pb*/{
LOG(WARNING) << "Drop outdated binlog sync req " << table_name << ":" << partition_id
<< " recv term: " << meta.term() << " local term: " << master_partition->ConsensusTerm();
delete task_arg;
return;
}
}

if (!master_partition->CheckSessionId(node.ip(), node.port(),
table_name, partition_id, session_id)) {
LOG(WARNING) << "Check Session failed " << node.ip() << ":" << node.port()
Expand Down
8 changes: 8 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ LogOffset SyncMasterPartition::ConsensusLastIndex() {
return coordinator_.MemLogger()->last_offset();
}

// Returns the term currently reported by this partition's coordinator.
uint32_t SyncMasterPartition::ConsensusTerm() {
  const uint32_t current_term = coordinator_.term();
  return current_term;
}

// Forwards `term` to this partition's coordinator via UpdateTerm().
// The call is unconditional: no comparison against the current term is made
// here, so callers decide when an update is appropriate.
void SyncMasterPartition::ConsensusUpdateTerm(uint32_t term) {
coordinator_.UpdateTerm(term);
}

// Looks up the slave node registered under (ip, port) through the
// coordinator's sync progress object and returns whatever that lookup yields.
// NOTE(review): the result for an unknown (ip, port) is decided by
// SyncPros().GetSlaveNode — presumably a null shared_ptr; confirm.
std::shared_ptr<SlaveNode> SyncMasterPartition::GetSlaveNode(const std::string& ip, int port) {
  auto&& sync_progress = coordinator_.SyncPros();
  return sync_progress.GetSlaveNode(ip, port);
}
Expand Down

0 comments on commit f6ac111

Please sign in to comment.