diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index eb3b8b4909..8343ac60da 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -15,6 +15,9 @@ class PikaClientConn: public pink::RedisConn { std::shared_ptr conn_ptr; std::vector redis_cmds; std::shared_ptr resp_ptr; + LogOffset offset; + std::string table_name; + uint32_t partition_id; }; // Auth related @@ -47,7 +50,6 @@ class PikaClientConn: public pink::RedisConn { } static void DoBackgroundTask(void* arg); static void DoExecTask(void* arg); - static void DoStaleTask(void* arg); bool IsPubSub() { return is_pubsub_; } void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; } diff --git a/include/pika_consensus.h b/include/pika_consensus.h index 5a225fda63..8378992979 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -10,6 +10,38 @@ #include "include/pika_slave_node.h" #include "include/pika_stable_log.h" #include "include/pika_binlog_transverter.h" +#include "slash/include/env.h" + +class Context { + public: + Context(const std::string path); + ~Context(); + + Status Init(); + // RWLock should be held when access members. + Status StableSave(); + void PrepareUpdateAppliedIndex(const LogOffset& offset); + void UpdateAppliedIndex(const LogOffset& offset); + + pthread_rwlock_t rwlock_; + LogOffset applied_index_; + SyncWindow applied_win_; + + std::string ToString() { + std::stringstream tmp_stream; + slash::RWLock l(&rwlock_, false); + tmp_stream << " Applied_index " << applied_index_.ToString() << "\r\n"; + tmp_stream << " Applied window " << applied_win_.ToStringStatus(); + return tmp_stream.str(); + } + + private: + std::string path_; + slash::RWFile *save_; + // No copying allowed; + Context(const Context&); + void operator=(const Context&); +}; class SyncProgress { public: @@ -59,10 +91,15 @@ class MemLog { } Status PurdgeLogs(const LogOffset& offset, std::vector* logs); Status GetRangeLogs(int start, int end, std::vector* logs); - LogOffset LastOffset() { + + LogOffset last_offset() { slash::MutexLock l_logs(&logs_mu_); return last_offset_; } + void SetLastOffset(const LogOffset& offset) { + slash::MutexLock l_logs(&logs_mu_); + last_offset_ = offset; + } private: int InternalFindLogIndex(const LogOffset& offset); @@ -75,6 +112,8 @@ class ConsensusCoordinator { public: ConsensusCoordinator(const std::string& table_name, uint32_t partition_id); ~ConsensusCoordinator(); + // since it is invoked in constructor all locks not hold + void Init(); Status ProposeLog( std::shared_ptr cmd_ptr, @@ -106,6 +145,34 @@ class ConsensusCoordinator { return committed_index_; } + std::shared_ptr context() { + return context_; + } + + // redis parser cb + struct CmdPtrArg { + CmdPtrArg(std::shared_ptr ptr) : cmd_ptr(ptr) { + } + std::shared_ptr cmd_ptr; + }; + static int InitCmd(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv); + + std::string ToStringStatus() { + std::stringstream tmp_stream; + { + slash::MutexLock l(&index_mu_); + tmp_stream << " Committed_index: " << committed_index_.ToString() << "\r\n"; + } + tmp_stream << " Contex: " << "\r\n" << context_->ToString(); + { + slash::RWLock l(&term_rwlock_, false); + tmp_stream << " Term: " << term_ << "\r\n"; + } + tmp_stream << " Mem_logger size: " << mem_logger_->Size() << + " last offset " << mem_logger_->last_offset().ToString() << "\r\n"; + return tmp_stream.str(); + } + private: Status ScheduleApplyLog(const LogOffset& committed_index); Status ScheduleApplyFollowerLog(const LogOffset& committed_index); @@ -125,7 +192,8 @@ class ConsensusCoordinator { slash::Mutex index_mu_; LogOffset committed_index_; - // LogOffset applied_index_; + + std::shared_ptr context_; pthread_rwlock_t term_rwlock_; uint32_t term_; diff --git a/include/pika_define.h b/include/pika_define.h index 5195e6f142..4673fd2b64 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -463,6 +463,7 @@ const size_t kBinlogPrefixLen = 10; const std::string kPikaMeta = "meta"; const std::string kManifest = "manifest"; +const std::string kContext = "context"; /* * define common character diff --git a/include/pika_repl_client.h b/include/pika_repl_client.h index bcdd03c5ea..817dfdd390 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -49,12 +49,15 @@ struct ReplClientWriteBinlogTaskArg { struct ReplClientWriteDBTaskArg { const std::shared_ptr cmd_ptr; + LogOffset offset; std::string table_name; uint32_t partition_id; ReplClientWriteDBTaskArg(const std::shared_ptr _cmd_ptr, + const LogOffset _offset, const std::string _table_name, uint32_t _partition_id) : cmd_ptr(_cmd_ptr), + offset(_offset), table_name(_table_name), partition_id(_partition_id) {} ~ReplClientWriteDBTaskArg() { } @@ -77,7 +80,7 @@ class PikaReplClient { const std::shared_ptr res, std::shared_ptr conn, void* req_private_data); - void ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, + void ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, const LogOffset& offset, const std::string& table_name, uint32_t partition_id); Status SendMetaSync(); diff --git a/include/pika_rm.h b/include/pika_rm.h index 7258166e24..9041e5115c 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -99,6 +99,7 @@ class SyncMasterPartition : public SyncPartition { Status ConsensusProcessLeaderLog(std::shared_ptr cmd_ptr, const BinlogItem& attribute); Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit); LogOffset ConsensusCommittedIndex(); + Status ConsensusUpdateAppliedIndex(const LogOffset& offset); std::shared_ptr StableLogger() { return coordinator_.StableLogger(); @@ -236,7 +237,7 @@ class PikaReplicaManager { void ScheduleWriteBinlogTask(const std::string& table_partition, const std::shared_ptr res, std::shared_ptr conn, void* res_private_data); - void ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, + void ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, const LogOffset& offset, const std::string& table_name, uint32_t partition_id); void ReplServerRemoveClientConn(int fd); diff --git a/include/pika_slave_node.h b/include/pika_slave_node.h index 960f1f2891..6a24ed8e0b 100644 --- a/include/pika_slave_node.h +++ b/include/pika_slave_node.h @@ -29,7 +29,6 @@ struct SyncWinItem { } }; - class SyncWindow { public: SyncWindow() :total_size_(0) { diff --git a/src/pika.cc b/src/pika.cc index 0408752dd9..69404765c3 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -185,9 +185,9 @@ int main(int argc, char *argv[]) { PikaSignalSetup(); LOG(INFO) << "Server at: " << path; + g_pika_cmd_table_manager = new PikaCmdTableManager(); g_pika_server = new PikaServer(); g_pika_rm = new PikaReplicaManager(); - g_pika_cmd_table_manager = new PikaCmdTableManager(); if (g_pika_conf->daemonize()) { close_std(); diff --git a/src/pika_binlog_reader.cc b/src/pika_binlog_reader.cc index 9ce5c56934..ea73464793 100644 --- a/src/pika_binlog_reader.cc +++ b/src/pika_binlog_reader.cc @@ -54,10 +54,12 @@ bool PikaBinlogReader::ReadToTheEnd() { int PikaBinlogReader::Seek(std::shared_ptr logger, uint32_t filenum, uint64_t offset) { std::string confile = NewFileName(logger->filename(), filenum); if (!slash::FileExists(confile)) { + LOG(WARNING) << confile << " not exits"; return -1; } slash::SequentialFile* readfile; if (!slash::NewSequentialFile(confile, &readfile).ok()) { + LOG(WARNING) << "New swquential " << confile << " failed"; return -1; } if (queue_) { diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index e5fd8e4cd6..250c72237a 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -205,25 +205,34 @@ void PikaClientConn::DoBackgroundTask(void* arg) { void PikaClientConn::DoExecTask(void* arg) { BgTaskArg* bg_arg = reinterpret_cast(arg); - std::shared_ptr cmd_ptr = bg_arg->cmd_ptr;; + std::shared_ptr cmd_ptr = bg_arg->cmd_ptr; std::shared_ptr conn_ptr = bg_arg->conn_ptr; std::shared_ptr resp_ptr = bg_arg->resp_ptr; + LogOffset offset = bg_arg->offset; + std::string table_name = bg_arg->table_name; + uint32_t partition_id = bg_arg->partition_id; + uint64_t start_us = 0; + if (g_pika_conf->slowlog_slower_than() >= 0) { + start_us = slash::NowMicros(); + } + cmd_ptr->SetStage(Cmd::kExecuteStage); cmd_ptr->Execute(); - conn_ptr->resp_num--; - *resp_ptr = std::move(cmd_ptr->res().message()); - conn_ptr->TryWriteResp(); -} + if (g_pika_conf->slowlog_slower_than() >= 0) { + conn_ptr->ProcessSlowlog(cmd_ptr->argv(), start_us); + } -// do the same thing as DoExecTask for now -// maybe not write response -void PikaClientConn::DoStaleTask(void* arg) { - BgTaskArg* bg_arg = reinterpret_cast(arg); - std::shared_ptr cmd_ptr = bg_arg->cmd_ptr;; - std::shared_ptr conn_ptr = bg_arg->conn_ptr; - std::shared_ptr resp_ptr = bg_arg->resp_ptr; + std::shared_ptr partition = + g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); + if (partition == nullptr) { + LOG(WARNING) << "Sync Master Partition not exist " << table_name << partition_id; + return; + } + partition->ConsensusUpdateAppliedIndex(offset); - cmd_ptr->Execute(); + if (conn_ptr == nullptr || resp_ptr == nullptr) { + return; + } conn_ptr->resp_num--; *resp_ptr = std::move(cmd_ptr->res().message()); conn_ptr->TryWriteResp(); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 9abb340ddb..dee92b8fde 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -9,10 +9,74 @@ #include "include/pika_server.h" #include "include/pika_client_conn.h" #include "include/pika_rm.h" +#include "include/pika_cmd_table_manager.h" extern PikaServer* g_pika_server; extern PikaConf* g_pika_conf; extern PikaReplicaManager* g_pika_rm; +extern PikaCmdTableManager* g_pika_cmd_table_manager; + +/* Context */ + +Context::Context(const std::string path) : applied_index_(), path_(path), save_(NULL) { + pthread_rwlock_init(&rwlock_, NULL); +} + +Context::~Context() { + pthread_rwlock_destroy(&rwlock_); + delete save_; +} + +Status Context::StableSave() { + char *p = save_->GetData(); + memcpy(p, &(applied_index_.b_offset.filenum), sizeof(uint32_t)); + p += 4; + memcpy(p, &(applied_index_.b_offset.offset), sizeof(uint64_t)); + p += 8; + memcpy(p, &(applied_index_.l_offset.term), sizeof(uint32_t)); + p += 4; + memcpy(p, &(applied_index_.l_offset.index), sizeof(uint64_t)); + return Status::OK(); +} + +Status Context::Init() { + if (!slash::FileExists(path_)) { + Status s = slash::NewRWFile(path_, &save_); + if (!s.ok()) { + LOG(FATAL) << "Context new file failed " << s.ToString(); + } + StableSave(); + } else { + Status s = slash::NewRWFile(path_, &save_); + if (!s.ok()) { + LOG(FATAL) << "Context new file failed " << s.ToString(); + } + } + if (save_->GetData() != NULL) { + memcpy((char*)(&(applied_index_.b_offset.filenum)), save_->GetData(), sizeof(uint32_t)); + memcpy((char*)(&(applied_index_.b_offset.offset)), save_->GetData() + 4, sizeof(uint64_t)); + memcpy((char*)(&(applied_index_.l_offset.term)), save_->GetData() + 12, sizeof(uint32_t)); + memcpy((char*)(&(applied_index_.l_offset.index)), save_->GetData() + 16, sizeof(uint64_t)); + return Status::OK(); + } else { + return Status::Corruption("Context init error"); + } +} + +void Context::PrepareUpdateAppliedIndex(const LogOffset& offset) { + slash::RWLock l(&rwlock_, true); + applied_win_.Push(SyncWinItem(offset)); +} + +void Context::UpdateAppliedIndex(const LogOffset& offset) { + slash::RWLock l(&rwlock_, true); + LogOffset cur_offset; + applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset); + if (cur_offset > applied_index_) { + applied_index_ = cur_offset; + StableSave(); + } +} /* SyncProgress */ @@ -166,15 +230,77 @@ ConsensusCoordinator::ConsensusCoordinator(const std::string& table_name, uint32 std::string table_log_path = g_pika_conf->log_path() + "log_" + table_name + "/"; std::string log_path = g_pika_conf->classic_mode() ? table_log_path : table_log_path + std::to_string(partition_id) + "/"; + context_ = std::make_shared(table_log_path + kContext); stable_logger_ = std::make_shared(table_name, partition_id, log_path); mem_logger_ = std::make_shared(); pthread_rwlock_init(&term_rwlock_, NULL); + if (g_pika_conf->consensus_level() != 0) { + Init(); + } } ConsensusCoordinator::~ConsensusCoordinator() { pthread_rwlock_destroy(&term_rwlock_); } +// since it is invoked in constructor all locks not hold +void ConsensusCoordinator::Init() { + // load committed_index_ & applied_index + context_->Init(); + committed_index_ = context_->applied_index_; + + LOG(INFO) << "Restore applied index " << context_->applied_index_.ToString(); + if (committed_index_ == LogOffset()) { + return; + } + // load mem_logger_ + mem_logger_->SetLastOffset(committed_index_); + pink::RedisParserSettings settings; + settings.DealMessage = &(ConsensusCoordinator::InitCmd); + pink::RedisParser redis_parser; + redis_parser.RedisParserInit(REDIS_PARSER_REQUEST, settings); + std::shared_ptr binlog_reader = std::make_shared(); + int res = binlog_reader->Seek(stable_logger_->Logger(), + committed_index_.b_offset.filenum, committed_index_.b_offset.offset); + if (res) { + LOG(FATAL) << "Binlog reader init failed"; + } + + while(1) { + LogOffset offset; + std::string binlog; + Status s = binlog_reader->Get(&binlog, &(offset.b_offset.filenum), &(offset.b_offset.offset)); + if (s.IsEndFile()) { + break; + } else if (s.IsCorruption() || s.IsIOError()) { + LOG(FATAL) << "Read Binlog error"; + } + BinlogItem item; + if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog, &item)) { + LOG(FATAL) << "Binlog item decode failed"; + } + offset.l_offset.term = item.term_id(); + offset.l_offset.index = item.logic_id(); + + redis_parser.data = static_cast(&table_name_); + const char* redis_parser_start = binlog.data() + BINLOG_ENCODE_LEN; + int redis_parser_len = static_cast(binlog.size()) - BINLOG_ENCODE_LEN; + int processed_len = 0; + pink::RedisParserStatus ret = redis_parser.ProcessInputBuffer( + redis_parser_start, redis_parser_len, &processed_len); + if (ret != pink::kRedisParserDone) { + LOG(FATAL) << "Redis parser parse failed"; + return; + } + CmdPtrArg* arg = static_cast(redis_parser.data); + std::shared_ptr cmd_ptr = arg->cmd_ptr; + delete arg; + redis_parser.data = NULL; + + mem_logger_->AppendLog(MemLog::LogItem(offset, cmd_ptr, nullptr, nullptr)); + } +} + Status ConsensusCoordinator::ProposeLog( std::shared_ptr cmd_ptr, std::shared_ptr conn_ptr, @@ -223,7 +349,14 @@ Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, return Status::OK(); } +// precheck if prev_offset match && drop this log if this log exist Status ConsensusCoordinator::ProcessLeaderLog(std::shared_ptr cmd_ptr, const BinlogItem& attribute) { + LogOffset last_index = mem_logger_->last_offset(); + if (attribute.logic_id() < last_index.l_offset.index) { + LOG(WARNING) << "Drop log from leader logic_id " << attribute.logic_id() << " cur last index " << last_index.l_offset.index; + return Status::OK(); + } + stable_logger_->Logger()->Lock(); Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr); stable_logger_->Logger()->Unlock(); @@ -237,10 +370,12 @@ Status ConsensusCoordinator::ProcessLeaderLog(std::shared_ptr cmd_ptr, cons } Status ConsensusCoordinator::ProcessLocalUpdate(const LogOffset& leader_commit) { - LogOffset last_index = mem_logger_->LastOffset(); - LogOffset committed_index = last_index < leader_commit ? last_index : leader_commit; + if (g_pika_conf->consensus_level() == 0) { + return Status::OK(); + } - LOG(WARNING) << "last_index " << last_index.ToString() << " leader_commit " << leader_commit.ToString() << " committed_index " << committed_index.ToString(); + LogOffset last_index = mem_logger_->last_offset(); + LogOffset committed_index = last_index < leader_commit ? last_index : leader_commit; LogOffset updated_committed_index; bool need_update = false; @@ -248,7 +383,6 @@ Status ConsensusCoordinator::ProcessLocalUpdate(const LogOffset& leader_commit) slash::MutexLock l(&index_mu_); need_update = InternalUpdateCommittedIndex(committed_index, &updated_committed_index); } - LOG(WARNING) << "need update " << need_update << " updated_committed_index " << updated_committed_index.ToString(); if (need_update) { Status s = ScheduleApplyFollowerLog(updated_committed_index); if (!s.ok()) { @@ -319,9 +453,10 @@ Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) std::vector logs; Status s = mem_logger_->PurdgeLogs(committed_index, &logs); if (!s.ok()) { - return Status::NotFound("committed index not found in log"); + return Status::NotFound("committed index not found " + committed_index.ToString()); } for (auto log : logs) { + context_->PrepareUpdateAppliedIndex(log.offset); InternalApply(log); } return Status::OK(); @@ -331,9 +466,10 @@ Status ConsensusCoordinator::ScheduleApplyFollowerLog(const LogOffset& committed std::vector logs; Status s = mem_logger_->PurdgeLogs(committed_index, &logs); if (!s.ok()) { - return Status::NotFound("committed index not found in log"); + return Status::NotFound("committed index not found " + committed_index.ToString()); } for (auto log : logs) { + context_->PrepareUpdateAppliedIndex(log.offset); InternalApplyFollower(log); } return Status::OK(); @@ -379,10 +515,31 @@ void ConsensusCoordinator::InternalApply(const MemLog::LogItem& log) { arg->cmd_ptr = log.cmd_ptr; arg->conn_ptr = log.conn_ptr; arg->resp_ptr = log.resp_ptr; + arg->offset = log.offset; + arg->table_name = table_name_; + arg->partition_id = partition_id_; g_pika_server->ScheduleClientBgThreads( PikaClientConn::DoExecTask, arg, log.cmd_ptr->current_key().front()); } void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) { - g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, table_name_, partition_id_); + g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, table_name_, partition_id_); +} + +int ConsensusCoordinator::InitCmd(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) { + std::string* table_name = static_cast(parser->data); + std::string opt = argv[0]; + std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); + if (!c_ptr) { + LOG(WARNING) << "Command " << opt << " not in the command table"; + return -1; + } + // Initial + c_ptr->Initial(argv, *table_name); + if (!c_ptr->res().ok()) { + LOG(WARNING) << "Fail to initial command from binlog: " << opt; + return -1; + } + parser->data = static_cast(new CmdPtrArg(c_ptr)); + return 0; } diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 13a116e9e2..bb1bcbfc95 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -218,6 +218,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { ReplClientWriteDBTaskArg* task_arg = static_cast(arg); const std::shared_ptr c_ptr = task_arg->cmd_ptr; const PikaCmdArgsType& argv = c_ptr->argv(); + LogOffset offset = task_arg->offset; std::string table_name = task_arg->table_name; uint32_t partition_id = task_arg->partition_id; @@ -247,6 +248,16 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { } } } + delete task_arg; -} + if (g_pika_conf->consensus_level() != 0) { + std::shared_ptr partition = + g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); + if (partition == nullptr) { + LOG(WARNING) << "Sync Master Partition not exist " << table_name << partition_id; + return; + } + partition->ConsensusUpdateAppliedIndex(offset); + } +} diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index e96827eb49..7d22000539 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -76,12 +76,12 @@ void PikaReplClient::ScheduleWriteBinlogTask(std::string table_partition, } void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, - const std::string& table_name, uint32_t partition_id) { + const LogOffset& offset, const std::string& table_name, uint32_t partition_id) { const PikaCmdArgsType& argv = cmd_ptr->argv(); std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0]; size_t index = GetHashIndex(dispatch_key, false); ReplClientWriteDBTaskArg* task_arg = - new ReplClientWriteDBTaskArg(cmd_ptr, table_name, partition_id); + new ReplClientWriteDBTaskArg(cmd_ptr, offset, table_name, partition_id); bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast(task_arg)); } diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 82baca8c6f..3bad040ac7 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -147,6 +147,9 @@ Status SyncMasterPartition::ActivateSlaveDbSync(const std::string& ip, int port) Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr& slave_ptr) { int cnt = slave_ptr->sync_win.Remainings(); std::shared_ptr reader = slave_ptr->binlog_reader; + if (reader == nullptr) { + return Status::OK(); + } std::vector tasks; for (int i = 0; i < cnt; ++i) { std::string msg; @@ -198,6 +201,16 @@ Status SyncMasterPartition::ConsensusUpdateSlave(const std::string& ip, int port return Status::OK(); } +Status SyncMasterPartition::ConsensusUpdateAppliedIndex(const LogOffset& offset) { + std::shared_ptr context = coordinator_.context(); + if (context == nullptr) { + LOG(WARNING) << "Coordinator context empty."; + return Status::NotFound("context"); + } + context->UpdateAppliedIndex(offset); + return Status::OK(); +} + Status SyncMasterPartition::GetSlaveSyncBinlogInfo(const std::string& ip, int port, BinlogOffset* sent_offset, @@ -380,9 +393,7 @@ Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) { std::string SyncMasterPartition::ToStringStatus() { std::stringstream tmp_stream; tmp_stream << " Current Master Session: " << session_id_ << "\r\n"; - tmp_stream << " ConsensusLogs size: " << - (coordinator_.MemLogger() == nullptr ? 0 : coordinator_.MemLogger()->Size()) << "\r\n"; - + tmp_stream << " Consensus: " << "\r\n" << coordinator_.ToStringStatus(); std::unordered_map> slaves = GetAllSlaveNodes(); int i = 0; for (auto slave_iter : slaves) { @@ -745,8 +756,8 @@ void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& table_partit } void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr cmd_ptr, - const std::string& table_name, uint32_t partition_id) { - pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, table_name, partition_id); + const LogOffset& offset, const std::string& table_name, uint32_t partition_id) { + pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, table_name, partition_id); } void PikaReplicaManager::ReplServerRemoveClientConn(int fd) {