diff --git a/include/pika_binlog.h b/include/pika_binlog.h index d5e91b4ea2..6d68466594 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -60,12 +60,12 @@ class Binlog { Status Put(const std::string &item); - Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint64_t* logic_id = NULL, uint32_t* term = NULL); + Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = NULL, uint64_t* logic_id = NULL); /* * Set Producer pro_num and pro_offset with lock */ - Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset); - + Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0); + // Need to hold Lock(); Status Truncate(uint32_t pro_num, uint64_t pro_offset); uint64_t file_size() { @@ -87,6 +87,11 @@ class Binlog { version_->StableSave(); } + uint32_t term() { + slash::RWLock(&(version_->rwlock_), true); + return version_->term_; + } + void Close(); private: diff --git a/include/pika_consensus.h b/include/pika_consensus.h index ebc0820e5a..8d3d0ac59d 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -22,6 +22,7 @@ class Context { Status StableSave(); void PrepareUpdateAppliedIndex(const LogOffset& offset); void UpdateAppliedIndex(const LogOffset& offset); + void Reset(const LogOffset& applied_index); pthread_rwlock_t rwlock_; LogOffset applied_index_; @@ -91,6 +92,9 @@ class MemLog { } Status PurdgeLogs(const LogOffset& offset, std::vector* logs); Status GetRangeLogs(int start, int end, std::vector* logs); + Status TruncateTo(const LogOffset& offset); + + void Reset(const LogOffset& offset); LogOffset last_offset() { slash::MutexLock l_logs(&logs_mu_); @@ -115,6 +119,8 @@ class ConsensusCoordinator { ~ConsensusCoordinator(); // since it is invoked in constructor all locks not hold void Init(); + // invoked by dbsync process + Status Reset(const LogOffset& offset); Status ProposeLog( std::shared_ptr cmd_ptr, @@ -153,6 +159,11 @@ class ConsensusCoordinator { return committed_index_; } + LogOffset applied_index() { + slash::RWLock l(&context_->rwlock_, false); + return context_->applied_index_; + } + std::shared_ptr context() { return context_; } @@ -178,6 +189,12 @@ class ConsensusCoordinator { } tmp_stream << " Mem_logger size: " << mem_logger_->Size() << " last offset " << mem_logger_->last_offset().ToString() << "\r\n"; + tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset().ToString() << "\r\n"; + LogOffset log_status; + stable_logger_->Logger()->GetProducerStatus( + &(log_status.b_offset.filenum), &(log_status.b_offset.offset), + &(log_status.l_offset.term), &(log_status.l_offset.index)); + tmp_stream << " Physical Binlog Status: " << log_status.ToString(); return tmp_stream.str(); } @@ -185,6 +202,7 @@ class ConsensusCoordinator { Status ScheduleApplyLog(const LogOffset& committed_index); Status ScheduleApplyFollowerLog(const LogOffset& committed_index); bool MatchConsensusLevel(); + Status TruncateTo(const LogOffset& offset); Status InternalAppendLog(const BinlogItem& item, std::shared_ptr cmd_ptr, @@ -198,7 +216,7 @@ class ConsensusCoordinator { bool InternalUpdateCommittedIndex(const LogOffset& slaves_committed_index, LogOffset* updated_committed_index); - Status TryGetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset); + Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset); Status GetBinlogOffset( const BinlogOffset& start_offset, const BinlogOffset& end_offset, std::vector* log_offset); @@ -212,6 +230,9 @@ class ConsensusCoordinator { const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset); Status GetLogsBefore(const BinlogOffset& start_offset, std::vector* hints); + // keep members in this class works in order + slash::Mutex order_mu_; + slash::Mutex index_mu_; LogOffset committed_index_; diff --git a/include/pika_partition.h b/include/pika_partition.h index b872d74d9d..56dde4934d 100644 --- a/include/pika_partition.h +++ b/include/pika_partition.h @@ -38,14 +38,12 @@ struct BgSaveInfo { time_t start_time; std::string s_start_time; std::string path; - uint32_t filenum; - uint64_t offset; - BgSaveInfo() : bgsaving(false), filenum(0), offset(0) {} + LogOffset offset; + BgSaveInfo() : bgsaving(false), offset() {} void Clear() { bgsaving = false; path.clear(); - filenum = 0; - offset = 0; + offset = LogOffset(); } }; diff --git a/include/pika_repl_client_conn.h b/include/pika_repl_client_conn.h index 735cec9200..df18c87f2a 100644 --- a/include/pika_repl_client_conn.h +++ b/include/pika_repl_client_conn.h @@ -26,7 +26,7 @@ class PikaReplClientConn: public pink::PbConn { static void HandleTrySyncResponse(void* arg); static void HandleRemoveSlaveNodeResponse(void* arg); - static bool TrySyncConsensusCheck( + static Status TrySyncConsensusCheck( const InnerMessage::ConsensusMeta& consensus_meta, const std::shared_ptr& partition, const std::shared_ptr& slave_partition); diff --git a/include/pika_repl_server_conn.h b/include/pika_repl_server_conn.h index 7d7e035247..8c5345907a 100644 --- a/include/pika_repl_server_conn.h +++ b/include/pika_repl_server_conn.h @@ -41,6 +41,7 @@ class PikaReplServerConn: public pink::PbConn { static void BuildConsensusMeta( const bool& reject, const std::vector& hints, + const uint32_t& term, InnerMessage::InnerResponse* response); static void HandleDBSyncRequest(void* arg); diff --git a/include/pika_rm.h b/include/pika_rm.h index 757407fa63..c54afb15b0 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -103,10 +103,12 @@ class SyncMasterPartition : public SyncPartition { uint32_t ConsensusTerm(); void ConsensusUpdateTerm(uint32_t term); Status ConsensusUpdateAppliedIndex(const LogOffset& offset); + LogOffset ConsensusAppliedIndex(); Status ConsensusLeaderNegotiate(const LogOffset& f_last_offset, bool* reject, std::vector* hints); Status ConsensusFollowerNegotiate( const std::vector& hints, LogOffset* reply_offset); + Status ConsensusReset(LogOffset applied_offset); std::shared_ptr StableLogger() { return coordinator_.StableLogger(); diff --git a/include/pika_slave_node.h b/include/pika_slave_node.h index 6a24ed8e0b..55c71554d4 100644 --- a/include/pika_slave_node.h +++ b/include/pika_slave_node.h @@ -50,6 +50,11 @@ class SyncWindow { std::size_t GetTotalBinglogSize() { return total_size_; } + void Reset() { + win_.clear(); + total_size_ = 0; + } + private: // TODO(whoiami) ring buffer maybe std::deque win_; diff --git a/include/pika_stable_log.h b/include/pika_stable_log.h index cbea2236ce..887dd03c3e 100644 --- a/include/pika_stable_log.h +++ b/include/pika_stable_log.h @@ -20,15 +20,22 @@ class StableLog : public std::enable_shared_from_this { return stable_logger_; } void Leave(); + void SetFirstOffset(const LogOffset& offset) { + slash::RWLock l(&offset_rwlock_, true); + first_offset_ = offset; + } LogOffset first_offset() { slash::RWLock l(&offset_rwlock_, false); return first_offset_; } + // Need to hold binlog lock + Status TruncateTo(uint32_t filenum, uint64_t offset); // Purgelogs use bool PurgeStableLogs(uint32_t to = 0, bool manual = false); void ClearPurge(); bool GetBinlogFiles(std::map* binlogs); + Status PurgeFileAfter(uint32_t filenum); private: void Close(); diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 7a643af026..0b3b1d1b0f 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -162,7 +162,7 @@ void Binlog::InitLogFile() { } Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, - uint64_t* logic_id, uint32_t* term) { + uint32_t* term, uint64_t* logic_id) { if (!opened_.load()) { return Status::Busy("Binlog is not open yet"); } @@ -351,7 +351,7 @@ Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) { return s; } -Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) { +Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term, uint64_t index) { if (!opened_.load()) { return Status::Busy("Binlog is not open yet"); } @@ -384,6 +384,8 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) { slash::RWLock(&(version_->rwlock_), true); version_->pro_num_ = pro_num; version_->pro_offset_ = pro_offset; + version_->term_ = term; + version_->logic_id_ = index; version_->StableSave(); } @@ -392,8 +394,6 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) { } Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset) { - slash::MutexLock l(&mutex_); - delete queue_; std::string profile = NewFileName(filename_, pro_num); const int fd = open(profile.c_str(), O_RDWR | O_CLOEXEC, 0644); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index f7bb405215..b04c2602c8 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -78,6 +78,13 @@ void Context::UpdateAppliedIndex(const LogOffset& offset) { } } +void Context::Reset(const LogOffset& offset) { + slash::RWLock l(&rwlock_, true); + applied_index_ = offset; + applied_win_.Reset(); + StableSave(); +} + /* SyncProgress */ SyncProgress::SyncProgress() { @@ -191,17 +198,35 @@ int MemLog::Size() { return static_cast(logs_.size()); } +// purdge [begin, offset] Status MemLog::PurdgeLogs(const LogOffset& offset, std::vector* logs) { slash::MutexLock l_logs(&logs_mu_); int index = InternalFindLogIndex(offset); if (index < 0) { - return Status::Corruption("Cant find correct index"); + return Status::NotFound("Cant find correct index"); } logs->assign(logs_.begin(), logs_.begin() + index + 1); logs_.erase(logs_.begin(), logs_.begin() + index + 1); return Status::OK(); } +// keep mem_log [mem_log.begin, offset] +Status MemLog::TruncateTo(const LogOffset& offset) { + slash::MutexLock l_logs(&logs_mu_); + int index = InternalFindLogIndex(offset); + if (index < 0) { + return Status::Corruption("Cant find correct index"); + } + logs_.erase(logs_.begin() + index + 1, logs_.end()); + return Status::OK(); +} + +void MemLog::Reset(const LogOffset& offset) { + slash::MutexLock l_logs(&logs_mu_); + logs_.erase(logs_.begin(), logs_.end()); + last_offset_ = offset; +} + Status MemLog::GetRangeLogs(int start, int end, std::vector* logs) { slash::MutexLock l_logs(&logs_mu_); int log_size = static_cast(logs_.size()); @@ -259,7 +284,11 @@ void ConsensusCoordinator::Init() { context_->Init(); committed_index_ = context_->applied_index_; - LOG(INFO) << "Restore applied index " << context_->applied_index_.ToString(); + //load term_ + term_ = stable_logger_->Logger()->term(); + + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "Restore applied index " << context_->applied_index_.ToString() << " current term " << term_; if (committed_index_ == LogOffset()) { return; } @@ -273,7 +302,7 @@ void ConsensusCoordinator::Init() { int res = binlog_reader.Seek(stable_logger_->Logger(), committed_index_.b_offset.filenum, committed_index_.b_offset.offset); if (res) { - LOG(FATAL) << "Binlog reader init failed"; + LOG(FATAL) << PartitionInfo(table_name_, partition_id_).ToString() << "Binlog reader init failed"; } while(1) { @@ -283,11 +312,11 @@ void ConsensusCoordinator::Init() { if (s.IsEndFile()) { break; } else if (s.IsCorruption() || s.IsIOError()) { - LOG(FATAL) << "Read Binlog error"; + LOG(FATAL) << PartitionInfo(table_name_, partition_id_).ToString() << "Read Binlog error"; } BinlogItem item; if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog, &item)) { - LOG(FATAL) << "Binlog item decode failed"; + LOG(FATAL) << PartitionInfo(table_name_, partition_id_).ToString() << "Binlog item decode failed"; } offset.l_offset.term = item.term_id(); offset.l_offset.index = item.logic_id(); @@ -299,7 +328,7 @@ void ConsensusCoordinator::Init() { pink::RedisParserStatus ret = redis_parser.ProcessInputBuffer( redis_parser_start, redis_parser_len, &processed_len); if (ret != pink::kRedisParserDone) { - LOG(FATAL) << "Redis parser parse failed"; + LOG(FATAL) << PartitionInfo(table_name_, partition_id_).ToString() << "Redis parser parse failed"; return; } CmdPtrArg* arg = static_cast(redis_parser.data); @@ -311,6 +340,30 @@ void ConsensusCoordinator::Init() { } } +Status ConsensusCoordinator::Reset(const LogOffset& offset) { + context_->Reset(offset); + { + slash::MutexLock l(&index_mu_); + committed_index_ = offset; + } + + UpdateTerm(offset.l_offset.term); + Status s = stable_logger_->Logger()->SetProducerStatus( + offset.b_offset.filenum, offset.b_offset.offset, + offset.l_offset.term, offset.l_offset.index); + if (!s.ok()) { + LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString() << "Consensus reset status failed " << s.ToString(); + return s; + } + + stable_logger_->SetFirstOffset(offset); + + stable_logger_->Logger()->Lock(); + mem_logger_->Reset(offset); + stable_logger_->Logger()->Unlock(); + return Status::OK(); +} + Status ConsensusCoordinator::ProposeLog( std::shared_ptr cmd_ptr, std::shared_ptr conn_ptr, @@ -321,7 +374,7 @@ Status ConsensusCoordinator::ProposeLog( // build BinlogItem uint32_t filenum = 0, term = 0; uint64_t offset = 0, logic_id = 0; - Status s = stable_logger_->Logger()->GetProducerStatus(&filenum, &offset, &logic_id, &term); + Status s = stable_logger_->Logger()->GetProducerStatus(&filenum, &offset, &term, &logic_id); if (!s.ok()) { stable_logger_->Logger()->Unlock(); return s; @@ -329,7 +382,7 @@ Status ConsensusCoordinator::ProposeLog( BinlogItem item; item.set_exec_time(time(nullptr)); item.set_term_id(term); - item.set_logic_id(logic_id); + item.set_logic_id(logic_id + 1); item.set_filenum(filenum); item.set_offset(offset); // make sure stable log and mem log consistent @@ -363,7 +416,8 @@ Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, Status ConsensusCoordinator::ProcessLeaderLog(std::shared_ptr cmd_ptr, const BinlogItem& attribute) { LogOffset last_index = mem_logger_->last_offset(); if (attribute.logic_id() < last_index.l_offset.index) { - LOG(WARNING) << "Drop log from leader logic_id " << attribute.logic_id() << " cur last index " << last_index.l_offset.index; + LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString() + << "Drop log from leader logic_id " << attribute.logic_id() << " cur last index " << last_index.l_offset.index; return Status::OK(); } @@ -416,6 +470,9 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, // do not commit log which is not current term log if (committed_index.l_offset.term != term()) { + LOG_EVERY_N(INFO, 1000) << "Will not commit log term which is not equals to current term" + << " To updated committed_index" << committed_index.ToString() << " current term " << term() + << " from " << ip << " " << port << " start " << start.ToString() << " end " << end.ToString(); return Status::OK(); } @@ -427,7 +484,10 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, } if (need_update) { s = ScheduleApplyLog(updated_committed_index); - if (!s.ok()) { + // updateslave could be invoked by many thread + // not found means a late offset pass in ScheduleApplyLog + // an early offset is not found + if (!s.ok() && !s.IsNotFound()) { return s; } } @@ -465,6 +525,8 @@ Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, } Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) { + // logs from PurdgeLogs goes to InternalApply in order + slash::MutexLock l(&order_mu_); std::vector logs; Status s = mem_logger_->PurdgeLogs(committed_index, &logs); if (!s.ok()) { @@ -478,6 +540,8 @@ Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) } Status ConsensusCoordinator::ScheduleApplyFollowerLog(const LogOffset& committed_index) { + // logs from PurdgeLogs goes to InternalApply in order + slash::MutexLock l(&order_mu_); std::vector logs; Status s = mem_logger_->PurdgeLogs(committed_index, &logs); if (!s.ok()) { @@ -564,7 +628,29 @@ int ConsensusCoordinator::InitCmd(pink::RedisParser* parser, const pink::RedisCm return 0; } -Status ConsensusCoordinator::TryGetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset) { +Status ConsensusCoordinator::TruncateTo(const LogOffset& offset) { + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() << "Truncate to " << offset.ToString(); + LogOffset committed = committed_index(); + stable_logger_->Logger()->Lock(); + if (offset.l_offset.index == committed.l_offset.index) { + mem_logger_->Reset(committed); + } else { + Status s = mem_logger_->TruncateTo(offset); + if (!s.ok()) { + stable_logger_->Logger()->Unlock(); + return s; + } + } + Status s = stable_logger_->TruncateTo(offset.b_offset.filenum, offset.b_offset.offset); + if (!s.ok()) { + stable_logger_->Logger()->Unlock(); + return s; + } + stable_logger_->Logger()->Unlock(); + return Status::OK(); +} + +Status ConsensusCoordinator::GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset) { PikaBinlogReader binlog_reader; int res = binlog_reader.Seek(stable_logger_->Logger(), start_offset.filenum, start_offset.offset); @@ -581,13 +667,16 @@ Status ConsensusCoordinator::TryGetBinlogOffset(const BinlogOffset& start_offset if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog, &item)) { return Status::Corruption("Binlog item decode failed"); } - log_offset->b_offset.filenum = item.filenum(); - log_offset->b_offset.offset = item.offset(); + log_offset->b_offset = offset; log_offset->l_offset.term = item.term_id(); log_offset->l_offset.index = item.logic_id(); return Status::OK(); } +// get binlog offset range [start_offset, end_offset] +// start_offset 0,0 end_offset 1,129, result will include binlog (1,129) +// start_offset 0,0 end_offset 1,0, result will NOT include binlog (1,xxx) +// start_offset 0,0 end_offset 0,0, resulet will NOT include binlog(0,xxx) Status ConsensusCoordinator::GetBinlogOffset( const BinlogOffset& start_offset, const BinlogOffset& end_offset, @@ -612,8 +701,7 @@ Status ConsensusCoordinator::GetBinlogOffset( return Status::Corruption("Binlog item decode failed"); } LogOffset offset; - offset.b_offset.filenum = item.filenum(); - offset.b_offset.offset = item.offset(); + offset.b_offset = b_offset; offset.l_offset.term = item.term_id(); offset.l_offset.index = item.logic_id(); if (offset.b_offset > end_offset) { @@ -636,25 +724,23 @@ Status ConsensusCoordinator::FindBinlogFileNum( uint32_t filenum = start_filenum; while(1) { LogOffset first_offset; - std::vector offsets; - Status s = GetBinlogOffset(BinlogOffset(filenum, 0), BinlogOffset(filenum, 0), &offsets); + Status s = GetBinlogOffset(BinlogOffset(filenum, 0), &first_offset); if (!s.ok()) { return s; } - if (!offsets.empty()) { - first_offset = offsets[0]; - } - if (target_index <= first_offset.l_offset.index) { + if (target_index < first_offset.l_offset.index) { if (first_time_right) { + // last filenum + filenum = filenum -1; break; } // move left first_time_left = true; - if (filenum - 1 < lb_binlogs) { + if (filenum == 0 || filenum - 1 < lb_binlogs) { return Status::NotFound(std::to_string(target_index) + " hit low boundary"); } filenum = filenum - 1; - } else if (target_index >= first_offset.l_offset.index) { + } else if (target_index > first_offset.l_offset.index) { if (first_time_left) { break; } @@ -664,6 +750,8 @@ Status ConsensusCoordinator::FindBinlogFileNum( break; } filenum = filenum + 1; + } else { + break; } } *founded_filenum = filenum; @@ -672,6 +760,9 @@ Status ConsensusCoordinator::FindBinlogFileNum( Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog( const BinlogOffset& hint_offset, uint64_t target_index, LogOffset* found_offset) { + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "FindLogicOffsetBySearchingBinlog hint offset " << hint_offset.ToString() + << " target_index " << target_index; BinlogOffset start_offset; std::map binlogs; if (!stable_logger_->GetBinlogFiles(&binlogs)) { @@ -692,6 +783,7 @@ Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog( return s; } + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() << "FindBinlogFilenum res " << found_filenum; BinlogOffset traversal_start(found_filenum, 0); BinlogOffset traversal_end(found_filenum + 1, 0); std::vector offsets; @@ -701,6 +793,7 @@ Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog( } for (auto& offset : offsets) { if (offset.l_offset.index == target_index) { + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() << "Founded " << target_index << " " << offset.ToString(); *found_offset = offset; return Status::OK(); } @@ -710,8 +803,14 @@ Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog( Status ConsensusCoordinator::FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset) { LogOffset possible_offset; - Status s = TryGetBinlogOffset(start_offset, &possible_offset); + Status s = GetBinlogOffset(start_offset, &possible_offset); if (!s.ok() || possible_offset.l_offset.index != target_index) { + if (!s.ok()) { + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() << "GetBinlogOffset res: " << s.ToString(); + } else { + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() << "GetBInlogOffset res: " << s.ToString() << + " possible_offset " << possible_offset.ToString() << " target_index " << target_index; + } return FindLogicOffsetBySearchingBinlog(start_offset, target_index, found_offset); } *found_offset = possible_offset; @@ -720,7 +819,9 @@ Status ConsensusCoordinator::FindLogicOffset(const BinlogOffset& start_offset, u Status ConsensusCoordinator::GetLogsBefore(const BinlogOffset& start_offset, std::vector* hints) { BinlogOffset traversal_end = start_offset; - BinlogOffset traversal_start(traversal_start.filenum - 1, 0); + BinlogOffset traversal_start(traversal_end.filenum, 0); + traversal_start.filenum = + traversal_start.filenum == 0 ? 0 : traversal_start.filenum - 1; std::map binlogs; if (!stable_logger_->GetBinlogFiles(&binlogs)) { return Status::Corruption("Get binlog files failed"); @@ -729,7 +830,10 @@ Status ConsensusCoordinator::GetLogsBefore(const BinlogOffset& start_offset, std traversal_start.filenum = traversal_end.filenum; } std::vector res; - GetBinlogOffset(traversal_start, traversal_end, &res); + Status s = GetBinlogOffset(traversal_start, traversal_end, &res); + if (!s.ok()) { + return s; + } if (res.size() > 100) { res.assign(res.end() - 100, res.end()); } @@ -740,6 +844,11 @@ Status ConsensusCoordinator::GetLogsBefore(const BinlogOffset& start_offset, std Status ConsensusCoordinator::LeaderNegotiate( const LogOffset& f_last_offset, bool* reject, std::vector* hints) { uint64_t f_index = f_last_offset.l_offset.index; + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "LeaderNeotiate follower last offset " + << f_last_offset.ToString() + << " first_offsert " << stable_logger_->first_offset().ToString() + << " last_offset " << mem_logger_->last_offset().ToString(); *reject = true; if (f_index > mem_logger_->last_offset().l_offset.index) { // hints starts from last_offset() - 100; @@ -748,21 +857,31 @@ Status ConsensusCoordinator::LeaderNegotiate( LOG(WARNING) << f_index << " is larger than last index " << mem_logger_->last_offset().ToString() << " get logs before last index failed " << s.ToString(); return s; } + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "follower index larger then last_offset index, get logs before " << mem_logger_->last_offset().ToString(); return Status::OK(); - } else if (f_index < stable_logger_->first_offset().l_offset.index) { + } + if (f_index < stable_logger_->first_offset().l_offset.index) { // need full sync - LOG(WARNING) << f_index << " not found current first index" << stable_logger_->first_offset().ToString(); + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << f_index << " not found current first index" << stable_logger_->first_offset().ToString(); return Status::NotFound("logic index"); } + if (f_last_offset.l_offset.index == 0) { + *reject = false; + return Status::OK(); + } LogOffset found_offset; Status s = FindLogicOffset(f_last_offset.b_offset, f_index, &found_offset); if (!s.ok()) { if (s.IsNotFound()) { - LOG(WARNING) << f_last_offset.ToString() << " not found " << s.ToString(); + LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString() + << f_last_offset.ToString() << " not found " << s.ToString(); return s; } else { - LOG(WARNING) << "find logic offset failed" << s.ToString(); + LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString() + << "find logic offset failed" << s.ToString(); return s; } } @@ -771,30 +890,57 @@ Status ConsensusCoordinator::LeaderNegotiate( || !(f_last_offset.b_offset == found_offset.b_offset)) { Status s = GetLogsBefore(found_offset.b_offset, hints); if (!s.ok()) { - LOG(WARNING) << "Try to get logs before " << found_offset.ToString() << " failed"; + LOG(WARNING) << PartitionInfo(table_name_, partition_id_).ToString() + << "Try to get logs before " << found_offset.ToString() << " failed"; return s; } return Status::OK(); } + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "Found equal offset " << found_offset.ToString(); *reject = false; return Status::OK(); } +// memlog order: committed_index , [committed_index + 1, memlogger.end()] Status ConsensusCoordinator::FollowerNegotiate(const std::vector& hints, LogOffset* reply_offset) { if (hints.empty()) { return Status::Corruption("hints empty"); } + LOG(INFO) << PartitionInfo(table_name_, partition_id_).ToString() + << "FollowerNegotiate from " << hints[0].ToString() << " to " << hints[hints.size() - 1].ToString(); if (mem_logger_->last_offset().l_offset.index < hints[0].l_offset.index) { *reply_offset = mem_logger_->last_offset(); return Status::OK(); } + if (committed_index().l_offset.index > hints[hints.size() - 1].l_offset.index) { + return Status::Corruption("invalid hints all smaller than committed_index"); + } if (mem_logger_->last_offset().l_offset.index > hints[hints.size() - 1].l_offset.index) { - BinlogOffset truncate_offset = hints[hints.size() -1].b_offset; + LogOffset truncate_offset = hints[hints.size() - 1]; // trunck to hints end - stable_logger_->Logger()->Truncate(truncate_offset.filenum, truncate_offset.offset); + Status s = TruncateTo(truncate_offset); + if (!s.ok()) { + return s; + } } - for (int i = hints.size() - 1; i >= 0; i--) { + + LogOffset committed = committed_index(); + for (int i = hints.size() - 1; i >= 0; i--) { + if (hints[i].l_offset.index < committed.l_offset.index) { + return Status::Corruption("hints not found"); + } + if (hints[i].l_offset.index == committed.l_offset.index) { + if (hints[i].l_offset.term == committed.l_offset.term) { + Status s = TruncateTo(hints[i]); + if (!s.ok()) { + return s; + } + *reply_offset = mem_logger_->last_offset(); + return Status::OK(); + } + } LogOffset found_offset; bool res = mem_logger_->FindLogItem(hints[i], &found_offset); if (!res) { @@ -802,11 +948,19 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector& hin } if (found_offset.l_offset.term == hints[i].l_offset.term) { // trunk to found_offsett - stable_logger_->Logger()->Truncate( - found_offset.b_offset.filenum, found_offset.b_offset.offset); + Status s = TruncateTo(found_offset); + if (!s.ok()) { + return s; + } *reply_offset = mem_logger_->last_offset(); return Status::OK(); } } - return Status::Corruption("hints not found"); + + Status s = TruncateTo(hints[0]); + if (!s.ok()) { + return s; + } + *reply_offset = mem_logger_->last_offset(); + return Status::OK(); } diff --git a/src/pika_partition.cc b/src/pika_partition.cc index 385f9b304f..df08e87c66 100644 --- a/src/pika_partition.cc +++ b/src/pika_partition.cc @@ -205,12 +205,12 @@ bool Partition::TryUpdateMasterOffset() { } std::string line, master_ip; int lineno = 0; - int64_t filenum = 0, offset = 0, tmp = 0, master_port = 0; + int64_t filenum = 0, offset = 0, term = 0, index = 0, tmp = 0, master_port = 0; while (std::getline(is, line)) { lineno++; if (lineno == 2) { master_ip = line; - } else if (lineno > 2 && lineno < 6) { + } else if (lineno > 2 && lineno < 8) { if (!slash::string2l(line.data(), line.size(), &tmp) || tmp < 0) { LOG(WARNING) << "Partition: " << partition_name_ << ", Format of info file after db sync error, line : " << line; @@ -220,9 +220,10 @@ bool Partition::TryUpdateMasterOffset() { } if (lineno == 3) { master_port = tmp; } else if (lineno == 4) { filenum = tmp; } - else { offset = tmp; } - - } else if (lineno > 5) { + else if (lineno == 5) { offset = tmp; } + else if (lineno == 6) { term = tmp; } + else if (lineno == 7) { index = tmp; } + } else if (lineno > 8) { LOG(WARNING) << "Partition: " << partition_name_ << ", Format of info file after db sync error, line : " << line; is.close(); @@ -236,7 +237,9 @@ bool Partition::TryUpdateMasterOffset() { << ", master_ip: " << master_ip << ", master_port: " << master_port << ", filenum: " << filenum - << ", offset: " << offset; + << ", offset: " << offset + << ", term: " << term + << ", index: " << index; // Sanity check if (master_ip != slave_partition->MasterIp() @@ -262,7 +265,12 @@ bool Partition::TryUpdateMasterOffset() { LOG(WARNING) << "Master Partition: " << partition_name_ << " not exist"; return false; } - master_partition->Logger()->SetProducerStatus(filenum, offset); + if (g_pika_conf->consensus_level() != 0) { + master_partition->ConsensusReset( + LogOffset(BinlogOffset(filenum, offset), LogicOffset(term, index))); + } else { + master_partition->Logger()->SetProducerStatus(filenum, offset); + } slave_partition->SetReplState(ReplState::kTryConnect); return true; } @@ -342,8 +350,12 @@ void Partition::DoBgSave(void* arg) { out << (time(NULL) - info.start_time) << "s\n" << g_pika_server->host() << "\n" << g_pika_server->port() << "\n" - << info.filenum << "\n" - << info.offset << "\n"; + << info.offset.b_offset.filenum << "\n" + << info.offset.b_offset.offset << "\n"; + if (g_pika_conf->consensus_level() != 0) { + out << info.offset.l_offset.term << "\n" + << info.offset.l_offset.index << "\n"; + } out.close(); } if (!success) { @@ -365,8 +377,8 @@ bool Partition::RunBgsaveEngine() { BgSaveInfo info = bgsave_info(); LOG(INFO) << partition_name_ << " bgsave_info: path=" << info.path - << ", filenum=" << info.filenum - << ", offset=" << info.offset; + << ", filenum=" << info.offset.b_offset.filenum + << ", offset=" << info.offset.b_offset.offset; // Backup to tmp dir rocksdb::Status s = bgsave_engine_->CreateNewBackup(info.path); @@ -419,9 +431,16 @@ bool Partition::InitBgsaveEngine() { { RWLock l(&db_rwlock_, true); + LogOffset bgsave_offset; + if (g_pika_conf->consensus_level() != 0) { + bgsave_offset = partition->ConsensusAppliedIndex(); + } else { + // term, index are 0 + partition->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset)); + } { slash::MutexLock l(&bgsave_protector_); - partition->Logger()->GetProducerStatus(&bgsave_info_.filenum, &bgsave_info_.offset); + bgsave_info_.offset = bgsave_offset; } s = bgsave_engine_->SetBackupContent(); if (!s.ok()) { diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 87d5df7be4..e1bd733a6e 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -178,7 +178,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { // Reply Ack to master immediately std::shared_ptr logger = partition->Logger(); logger->GetProducerStatus(&ack_end.b_offset.filenum, &ack_end.b_offset.offset, - &ack_end.l_offset.index, &ack_end.l_offset.term); + &ack_end.l_offset.term, &ack_end.l_offset.index); // keepalive case if (ack_start.b_offset == BinlogOffset()) { // set ack_end as 0 diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index cf30c116ac..9e747f66b2 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -206,6 +206,7 @@ Status PikaReplClient::SendPartitionTrySync(const std::string& ip, } LogOffset last_index = partition->ConsensusLastIndex(); uint32_t term = partition->ConsensusTerm(); + LOG(INFO) << PartitionInfo(table_name, partition_id).ToString() << " TrySync Increase self term from " << term << " to " << term + 1; term++; partition->ConsensusUpdateTerm(term); InnerMessage::ConsensusMeta* consensus_meta = request.mutable_consensus_meta(); diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index a422229783..32ce94408c 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -222,8 +222,12 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { return; } - bool success = TrySyncConsensusCheck(response->consensus_meta(), partition, slave_partition); - if (!success) { + if (response->consensus_meta().reject()) { + Status s = TrySyncConsensusCheck(response->consensus_meta(), partition, slave_partition); + if (!s.ok()) { + slave_partition->SetReplState(ReplState::kError); + LOG(WARNING) << "Consensus Check failed " << s.ToString(); + } delete task_arg; return; } @@ -252,13 +256,10 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { delete task_arg; } -bool PikaReplClientConn::TrySyncConsensusCheck( +Status PikaReplClientConn::TrySyncConsensusCheck( const InnerMessage::ConsensusMeta& consensus_meta, const std::shared_ptr& partition, const std::shared_ptr& slave_partition) { - if (consensus_meta.reject() == false) { - return true; - } std::vector hints; for (int i = 0; i < consensus_meta.hint_size(); ++i) { InnerMessage::BinlogOffset pb_offset = consensus_meta.hint(i); @@ -267,14 +268,15 @@ bool PikaReplClientConn::TrySyncConsensusCheck( offset.b_offset.offset = pb_offset.offset(); offset.l_offset.term = pb_offset.term(); offset.l_offset.index = pb_offset.index(); + hints.push_back(offset); } LogOffset reply_offset; Status s = partition->ConsensusFollowerNegotiate(hints, &reply_offset); if (!s.ok()) { - return false; + return s; } slave_partition->SetReplState(ReplState::kTryConnect); - return false; + return s; } diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 89286e16ed..1e1f720711 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -113,15 +113,11 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) { if (pre_success && req->has_consensus_meta()) { const InnerMessage::ConsensusMeta& meta = req->consensus_meta(); + // need to response to outdated pb, new follower count on this response to update term if (meta.term() > partition->ConsensusTerm()) { LOG(INFO) << "Update " << partition_name << " term from " << partition->ConsensusTerm() << " to " << meta.term(); partition->ConsensusUpdateTerm(meta.term()); - } else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/{ - LOG(WARNING) << "Drop outdated trysync req " << " partition: " << partition_name - << " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm(); - delete task_arg; - return; } pre_success = TrySyncConsensusOffsetCheck(partition, req->consensus_meta(), &response, try_sync_response); } else if (pre_success) { @@ -206,12 +202,14 @@ bool PikaReplServerConn::TrySyncConsensusOffsetCheck( try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointBePurged); return false; } else { + LOG(WARNING) << "Partition:" << partition_name << " error " << s.ToString(); try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); return false; } } try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk); - BuildConsensusMeta(reject, hints, response); + uint32_t term = partition->ConsensusTerm(); + BuildConsensusMeta(reject, hints, term, response); if (reject) { return false; } @@ -273,10 +271,12 @@ bool PikaReplServerConn::TrySyncOffsetCheck( void PikaReplServerConn::BuildConsensusMeta( const bool& reject, const std::vector& hints, + const uint32_t& term, InnerMessage::InnerResponse* response) { InnerMessage::ConsensusMeta* consensus_meta = response->mutable_consensus_meta(); + consensus_meta->set_term(term); consensus_meta->set_reject(reject); - if (reject) { + if (!reject) { return; } for (auto hint : hints) { diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 135a076f6d..8ddba79b8a 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -211,6 +211,10 @@ Status SyncMasterPartition::ConsensusUpdateAppliedIndex(const LogOffset& offset) return Status::OK(); } +LogOffset SyncMasterPartition::ConsensusAppliedIndex() { + return coordinator_.applied_index(); +} + Status SyncMasterPartition::GetSlaveSyncBinlogInfo(const std::string& ip, int port, BinlogOffset* sent_offset, @@ -529,6 +533,10 @@ Status SyncMasterPartition::ConsensusFollowerNegotiate( return coordinator_.FollowerNegotiate(hints, reply_offset); } +Status SyncMasterPartition::ConsensusReset(LogOffset applied_offset) { + return coordinator_.Reset(applied_offset); +} + /* SyncSlavePartition */ SyncSlavePartition::SyncSlavePartition(const std::string& table_name, uint32_t partition_id) diff --git a/src/pika_server.cc b/src/pika_server.cc index 722699d292..46ef8e3d1a 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -700,8 +700,6 @@ void PikaServer::DeleteSlave(int fd) { ip = iter->ip; port = iter->port; is_find = true; - g_pika_rm->LostConnection(iter->ip, iter->port); - g_pika_rm->DropItemInWriteQueue(iter->ip, iter->port); LOG(INFO) << "Delete Slave Success, ip_port: " << iter->ip << ":" << iter->port; slaves_.erase(iter); break; @@ -1012,8 +1010,8 @@ void PikaServer::TryDBSync(const std::string& ip, int port, BgSaveInfo bgsave_info = partition->bgsave_info(); std::string logger_filename = sync_partition->Logger()->filename(); if (slash::IsDir(bgsave_info.path) != 0 - || !slash::FileExists(NewFileName(logger_filename, bgsave_info.filenum)) - || top - bgsave_info.filenum > kDBSyncMaxGap) { + || !slash::FileExists(NewFileName(logger_filename, bgsave_info.offset.b_offset.filenum)) + || top - bgsave_info.offset.b_offset.filenum > kDBSyncMaxGap) { // Need Bgsave first partition->BgSavePartition(); } @@ -1032,8 +1030,10 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, BgSaveInfo bgsave_info = partition->bgsave_info(); std::string bg_path = bgsave_info.path; - uint32_t binlog_filenum = bgsave_info.filenum; - uint64_t binlog_offset = bgsave_info.offset; + uint32_t binlog_filenum = bgsave_info.offset.b_offset.filenum; + uint64_t binlog_offset = bgsave_info.offset.b_offset.offset; + uint32_t term = bgsave_info.offset.l_offset.term; + uint64_t index = bgsave_info.offset.l_offset.index; // Get all files need to send std::vector descendant; @@ -1116,6 +1116,9 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, fix.open(fn, std::ios::in | std::ios::trunc); if (fix.is_open()) { fix << "0s\n" << lip << "\n" << port_ << "\n" << binlog_filenum << "\n" << binlog_offset << "\n"; + if (g_pika_conf->consensus_level() != 0) { + fix << term << "\n" << index << "\n"; + } fix.close(); } ret = slash::RsyncSendFile(fn, remote_path + "/" + kBgsaveInfoFile, secret_file_path, remote); diff --git a/src/pika_slave_node.cc b/src/pika_slave_node.cc index 82642d2711..5950df8838 100644 --- a/src/pika_slave_node.cc +++ b/src/pika_slave_node.cc @@ -93,10 +93,17 @@ Status SlaveNode::Update(const LogOffset& start, const LogOffset& end, LogOffset if (slave_state != kSlaveBinlogSync) { return Status::Corruption(ToString() + "state not BinlogSync"); } + *updated_offset = LogOffset(); bool res = sync_win.Update(SyncWinItem(start), SyncWinItem(end), updated_offset); if (!res) { return Status::Corruption("UpdateAckedInfo failed"); } + if (*updated_offset == LogOffset()) { + // nothing to update return current acked_offset + *updated_offset = acked_offset; + return Status::OK(); + } + // update acked_offset acked_offset = *updated_offset; return Status::OK(); } diff --git a/src/pika_stable_log.cc b/src/pika_stable_log.cc index 8a9ec3225f..64f92431e6 100644 --- a/src/pika_stable_log.cc +++ b/src/pika_stable_log.cc @@ -136,12 +136,15 @@ bool StableLog::PurgeFiles(uint32_t to, bool manual) { break; } } - if (!binlogs.empty()) { + if (delete_num) { + std::map binlogs; + if (!GetBinlogFiles(&binlogs)) { + LOG(WARNING) << log_path_ << " Could not get binlog files!"; + return false; + } + auto it = binlogs.begin(); if (it != binlogs.end()) { UpdateFirstOffset(it->first); - } else { - std::map::reverse_iterator it = binlogs.rbegin(); - UpdateFirstOffset(it->first); } } if (delete_num) { @@ -183,30 +186,55 @@ void StableLog::UpdateFirstOffset(uint32_t filenum) { } BinlogItem item; + BinlogOffset offset; while (1) { - BinlogOffset offset; std::string binlog; Status s = binlog_reader.Get(&binlog, &(offset.filenum), &(offset.offset)); if (s.IsEndFile()) { - break; + return; } if (!s.ok()) { LOG(WARNING) << "Binlog reader get failed"; return; } - BinlogItem item; if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog, &item)) { LOG(WARNING) << "Binlog item decode failed"; return; } + // exec_time == 0, could be padding binlog if (item.exec_time() != 0) { break; } } slash::RWLock l(&offset_rwlock_, true); - first_offset_.b_offset.filenum = item.filenum(); - first_offset_.b_offset.offset = item.offset(); + first_offset_.b_offset = offset; first_offset_.l_offset.term = item.term_id(); first_offset_.l_offset.index = item.logic_id(); } + +Status StableLog::PurgeFileAfter(uint32_t filenum) { + std::map binlogs; + bool res = GetBinlogFiles(&binlogs); + if (!res) { + return Status::Corruption("GetBinlogFiles failed"); + } + for (auto& it : binlogs) { + if (it.first > filenum) { + // Do delete + Status s = slash::DeleteFile(log_path_ + it.second); + if (!s.ok()) { + return s; + } + } + } + return Status::OK(); +} + +Status StableLog::TruncateTo(uint32_t filenum, uint64_t offset) { + Status s = PurgeFileAfter(filenum); + if (!s.ok()) { + return s; + } + return stable_logger_->Truncate(filenum, offset); +}