Skip to content

Commit

Permalink
Consensus test(OpenAtomFoundation#857)
Browse files Browse the repository at this point in the history
Basic Functional Test Done
Basic Unit Test Done
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent f6ac111 commit a0fecc2
Show file tree
Hide file tree
Showing 19 changed files with 355 additions and 94 deletions.
11 changes: 8 additions & 3 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ class Binlog {

Status Put(const std::string &item);

Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint64_t* logic_id = NULL, uint32_t* term = NULL);
Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = NULL, uint64_t* logic_id = NULL);
/*
* Set Producer pro_num and pro_offset with lock
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);

Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
// Need to hold Lock();
Status Truncate(uint32_t pro_num, uint64_t pro_offset);

uint64_t file_size() {
Expand All @@ -87,6 +87,11 @@ class Binlog {
version_->StableSave();
}

uint32_t term() {
slash::RWLock(&(version_->rwlock_), true);
return version_->term_;
}

void Close();

private:
Expand Down
23 changes: 22 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Context {
Status StableSave();
void PrepareUpdateAppliedIndex(const LogOffset& offset);
void UpdateAppliedIndex(const LogOffset& offset);
void Reset(const LogOffset& applied_index);

pthread_rwlock_t rwlock_;
LogOffset applied_index_;
Expand Down Expand Up @@ -91,6 +92,9 @@ class MemLog {
}
Status PurdgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
Status GetRangeLogs(int start, int end, std::vector<LogItem>* logs);
Status TruncateTo(const LogOffset& offset);

void Reset(const LogOffset& offset);

LogOffset last_offset() {
slash::MutexLock l_logs(&logs_mu_);
Expand All @@ -115,6 +119,8 @@ class ConsensusCoordinator {
~ConsensusCoordinator();
// since it is invoked in constructor all locks not hold
void Init();
// invoked by dbsync process
Status Reset(const LogOffset& offset);

Status ProposeLog(
std::shared_ptr<Cmd> cmd_ptr,
Expand Down Expand Up @@ -153,6 +159,11 @@ class ConsensusCoordinator {
return committed_index_;
}

LogOffset applied_index() {
slash::RWLock l(&context_->rwlock_, false);
return context_->applied_index_;
}

std::shared_ptr<Context> context() {
return context_;
}
Expand All @@ -178,13 +189,20 @@ class ConsensusCoordinator {
}
tmp_stream << " Mem_logger size: " << mem_logger_->Size() <<
" last offset " << mem_logger_->last_offset().ToString() << "\r\n";
tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset().ToString() << "\r\n";
LogOffset log_status;
stable_logger_->Logger()->GetProducerStatus(
&(log_status.b_offset.filenum), &(log_status.b_offset.offset),
&(log_status.l_offset.term), &(log_status.l_offset.index));
tmp_stream << " Physical Binlog Status: " << log_status.ToString();
return tmp_stream.str();
}

private:
Status ScheduleApplyLog(const LogOffset& committed_index);
Status ScheduleApplyFollowerLog(const LogOffset& committed_index);
bool MatchConsensusLevel();
Status TruncateTo(const LogOffset& offset);

Status InternalAppendLog(const BinlogItem& item,
std::shared_ptr<Cmd> cmd_ptr,
Expand All @@ -198,7 +216,7 @@ class ConsensusCoordinator {
bool InternalUpdateCommittedIndex(const LogOffset& slaves_committed_index,
LogOffset* updated_committed_index);

Status TryGetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
Status GetBinlogOffset(
const BinlogOffset& start_offset,
const BinlogOffset& end_offset, std::vector<LogOffset>* log_offset);
Expand All @@ -212,6 +230,9 @@ class ConsensusCoordinator {
const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

// keep members in this class works in order
slash::Mutex order_mu_;

slash::Mutex index_mu_;
LogOffset committed_index_;

Expand Down
8 changes: 3 additions & 5 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ struct BgSaveInfo {
time_t start_time;
std::string s_start_time;
std::string path;
uint32_t filenum;
uint64_t offset;
BgSaveInfo() : bgsaving(false), filenum(0), offset(0) {}
LogOffset offset;
BgSaveInfo() : bgsaving(false), offset() {}
void Clear() {
bgsaving = false;
path.clear();
filenum = 0;
offset = 0;
offset = LogOffset();
}
};

Expand Down
2 changes: 1 addition & 1 deletion include/pika_repl_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class PikaReplClientConn: public pink::PbConn {
static void HandleTrySyncResponse(void* arg);
static void HandleRemoveSlaveNodeResponse(void* arg);

static bool TrySyncConsensusCheck(
static Status TrySyncConsensusCheck(
const InnerMessage::ConsensusMeta& consensus_meta,
const std::shared_ptr<SyncMasterPartition>& partition,
const std::shared_ptr<SyncSlavePartition>& slave_partition);
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class PikaReplServerConn: public pink::PbConn {
static void BuildConsensusMeta(
const bool& reject,
const std::vector<LogOffset>& hints,
const uint32_t& term,
InnerMessage::InnerResponse* response);

static void HandleDBSyncRequest(void* arg);
Expand Down
2 changes: 2 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ class SyncMasterPartition : public SyncPartition {
uint32_t ConsensusTerm();
void ConsensusUpdateTerm(uint32_t term);
Status ConsensusUpdateAppliedIndex(const LogOffset& offset);
LogOffset ConsensusAppliedIndex();
Status ConsensusLeaderNegotiate(const LogOffset& f_last_offset,
bool* reject, std::vector<LogOffset>* hints);
Status ConsensusFollowerNegotiate(
const std::vector<LogOffset>& hints, LogOffset* reply_offset);
Status ConsensusReset(LogOffset applied_offset);

std::shared_ptr<StableLog> StableLogger() {
return coordinator_.StableLogger();
Expand Down
5 changes: 5 additions & 0 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class SyncWindow {
std::size_t GetTotalBinglogSize() {
return total_size_;
}
void Reset() {
win_.clear();
total_size_ = 0;
}

private:
// TODO(whoiami) ring buffer maybe
std::deque<SyncWinItem> win_;
Expand Down
7 changes: 7 additions & 0 deletions include/pika_stable_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@ class StableLog : public std::enable_shared_from_this<StableLog> {
return stable_logger_;
}
void Leave();
void SetFirstOffset(const LogOffset& offset) {
slash::RWLock l(&offset_rwlock_, true);
first_offset_ = offset;
}
LogOffset first_offset() {
slash::RWLock l(&offset_rwlock_, false);
return first_offset_;
}
// Need to hold binlog lock
Status TruncateTo(uint32_t filenum, uint64_t offset);

// Purgelogs use
bool PurgeStableLogs(uint32_t to = 0, bool manual = false);
void ClearPurge();
bool GetBinlogFiles(std::map<uint32_t, std::string>* binlogs);
Status PurgeFileAfter(uint32_t filenum);

private:
void Close();
Expand Down
8 changes: 4 additions & 4 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void Binlog::InitLogFile() {
}

Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset,
uint64_t* logic_id, uint32_t* term) {
uint32_t* term, uint64_t* logic_id) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Expand Down Expand Up @@ -351,7 +351,7 @@ Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) {
return s;
}

Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) {
Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term, uint64_t index) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Expand Down Expand Up @@ -384,6 +384,8 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) {
slash::RWLock(&(version_->rwlock_), true);
version_->pro_num_ = pro_num;
version_->pro_offset_ = pro_offset;
version_->term_ = term;
version_->logic_id_ = index;
version_->StableSave();
}

Expand All @@ -392,8 +394,6 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) {
}

Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset) {
slash::MutexLock l(&mutex_);

delete queue_;
std::string profile = NewFileName(filename_, pro_num);
const int fd = open(profile.c_str(), O_RDWR | O_CLOEXEC, 0644);
Expand Down
Loading

0 comments on commit a0fecc2

Please sign in to comment.