Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
1. While follower processing leader hints, local searching by logic
index instead of binlog offset.

2. Check if enough follower while doing InternalCalCommittedIndex.

3. If slave node already exist, could still process HandleTrySyncRequest.
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent be45f4c commit 46fe81e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
3 changes: 2 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class MemLog {
bool FindLogItem(const LogOffset& offset, LogOffset* found_offset);

private:
int InternalFindLogIndex(const LogOffset& offset);
int InternalFindLogByBinlogOffset(const LogOffset& offset);
int InternalFindLogByLogicIndex(const LogOffset& offset);
slash::Mutex logs_mu_;
std::vector<LogItem> logs_;
LogOffset last_offset_;
Expand Down
27 changes: 21 additions & 6 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ int SyncProgress::SlaveSize() {

LogOffset SyncProgress::InternalCalCommittedIndex(std::unordered_map<std::string, LogOffset> match_index) {
int consensus_level = g_pika_conf->consensus_level();
if (static_cast<int>(match_index.size()) < consensus_level) {
return LogOffset();
}
std::vector<LogOffset> offsets;
for (auto index : match_index) {
offsets.push_back(index.second);
Expand All @@ -201,7 +204,7 @@ int MemLog::Size() {
// purge [begin, offset]
Status MemLog::PurgeLogs(const LogOffset& offset, std::vector<LogItem>* logs) {
slash::MutexLock l_logs(&logs_mu_);
int index = InternalFindLogIndex(offset);
int index = InternalFindLogByBinlogOffset(offset);
if (index < 0) {
return Status::NotFound("Cant find correct index");
}
Expand All @@ -213,7 +216,7 @@ Status MemLog::PurgeLogs(const LogOffset& offset, std::vector<LogItem>* logs) {
// keep mem_log [mem_log.begin, offset]
Status MemLog::TruncateTo(const LogOffset& offset) {
slash::MutexLock l_logs(&logs_mu_);
int index = InternalFindLogIndex(offset);
int index = InternalFindLogByBinlogOffset(offset);
if (index < 0) {
return Status::Corruption("Cant find correct index");
}
Expand All @@ -240,15 +243,27 @@ Status MemLog::GetRangeLogs(int start, int end, std::vector<LogItem>* logs) {

bool MemLog::FindLogItem(const LogOffset& offset, LogOffset* found_offset) {
slash::MutexLock l_logs(&logs_mu_);
int index = InternalFindLogIndex(offset);
int index = InternalFindLogByLogicIndex(offset);
if (index < 0) {
return false;
}
*found_offset = logs_[index].offset;
return true;
}

int MemLog::InternalFindLogIndex(const LogOffset& offset) {
int MemLog::InternalFindLogByLogicIndex(const LogOffset& offset) {
for (size_t i = 0; i < logs_.size(); ++i) {
if (logs_[i].offset.l_offset.index > offset.l_offset.index) {
return -1;
}
if (logs_[i].offset.l_offset.index == offset.l_offset.index) {
return i;
}
}
return -1;
}

int MemLog::InternalFindLogByBinlogOffset(const LogOffset& offset) {
for (size_t i = 0; i < logs_.size(); ++i) {
if (logs_[i].offset > offset) {
return -1;
Expand Down Expand Up @@ -882,7 +897,7 @@ Status ConsensusCoordinator::LeaderNegotiate(
Status s = FindLogicOffset(f_last_offset.b_offset, f_index, &found_offset);
if (!s.ok()) {
if (s.IsNotFound()) {
LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString()
LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString()
<< f_last_offset.ToString() << " not found " << s.ToString();
return s;
} else {
Expand Down Expand Up @@ -950,7 +965,7 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector<LogOffset>& hin
LogOffset found_offset;
bool res = mem_logger_->FindLogItem(hints[i], &found_offset);
if (!res) {
return Status::Corruption("hints not found");
return Status::Corruption("hints not found " + hints[i].ToString());
}
if (found_offset.l_offset.term == hints[i].l_offset.term) {
// trunk to found_offsett
Expand Down
3 changes: 2 additions & 1 deletion src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) {
}

if (pre_success && req->has_consensus_meta()) {
if (partition->GetNumberOfSlaveNode() >= g_pika_conf->replication_num()) {
if (partition->GetNumberOfSlaveNode() >= g_pika_conf->replication_num()
&& !partition->CheckSlaveNodeExist(node.ip(), node.port())) {
LOG(WARNING) << "Current replication num: "
<< partition->GetNumberOfSlaveNode()
<< " hits configuration replication-num "
Expand Down

0 comments on commit 46fe81e

Please sign in to comment.