Skip to content

Commit

Permalink
Append dummyCmd when switch master (OpenAtomFoundation#863)
Browse files Browse the repository at this point in the history
* Append dummyCmd when switch master
Co-authored-by: 刘军 <[email protected]>
  • Loading branch information
pokeriface authored and whoiami committed Mar 20, 2020
1 parent de21a23 commit 0f3d24c
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ dump/
gdb.txt
tags

# IDE
.vscode

make_config.mk
src/*.d
src/build_version.cc
4 changes: 3 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,9 @@ class PKPatternMatchDelCmd : public Cmd {

class DummyCmd : public Cmd {
public:
DummyCmd() : Cmd("DummyCmd", 0, 0) {}
DummyCmd() : Cmd("", 0, 0) {}
DummyCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual Cmd* Clone() override {
return new DummyCmd(*this);
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const std::string kCmdNamePadding = "padding";
const std::string kCmdNameTcmalloc = "tcmalloc";
#endif
const std::string kCmdNamePKPatternMatchDel = "pkpatternmatchdel";
const std::string kCmdDummy = "dummy";

//Kv
const std::string kCmdNameSet = "set";
Expand Down
4 changes: 2 additions & 2 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class MemLog {
logs_.push_back(item);
last_offset_ = item.offset;
}
Status PurdgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
Status PurgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
Status GetRangeLogs(int start, int end, std::vector<LogItem>* logs);
Status TruncateTo(const LogOffset& offset);

Expand Down Expand Up @@ -182,7 +182,7 @@ class ConsensusCoordinator {
slash::MutexLock l(&index_mu_);
tmp_stream << " Committed_index: " << committed_index_.ToString() << "\r\n";
}
tmp_stream << " Contex: " << "\r\n" << context_->ToString();
tmp_stream << " Context: " << "\r\n" << context_->ToString();
{
slash::RWLock l(&term_rwlock_, false);
tmp_stream << " Term: " << term_ << "\r\n";
Expand Down
20 changes: 20 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ struct BinlogOffset {
}
return false;
}
bool operator<=(const BinlogOffset& other) const {
if (filenum < other.filenum
|| (filenum == other.filenum && offset <= other.offset)) {
return true;
}
return false;
}
bool operator>=(const BinlogOffset& other) const {
if (filenum > other.filenum
|| (filenum == other.filenum && offset >= other.offset)) {
return true;
}
return false;
}
};

struct LogOffset {
Expand All @@ -179,6 +193,12 @@ struct LogOffset {
bool operator==(const LogOffset& other) const {
return b_offset == other.b_offset;
}
bool operator<=(const LogOffset& other) const {
return b_offset <= other.b_offset;
}
bool operator>=(const LogOffset& other) const {
return b_offset >= other.b_offset;
}
bool operator>(const LogOffset& other) const {
return b_offset > other.b_offset;
}
Expand Down
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class SyncMasterPartition : public SyncPartition {
Status ConsensusFollowerNegotiate(
const std::vector<LogOffset>& hints, LogOffset* reply_offset);
Status ConsensusReset(LogOffset applied_offset);
void CommitPreviousLogs(const uint32_t& term);

std::shared_ptr<StableLog> StableLogger() {
return coordinator_.StableLogger();
Expand Down
4 changes: 2 additions & 2 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SyncWindow {
}
void Push(const SyncWinItem& item);
bool Update(const SyncWinItem& start_item, const SyncWinItem& end_item, LogOffset* acked_offset);
int Remainings();
int Remaining();
std::string ToStringStatus() const {
if (win_.empty()) {
return " Size: " + std::to_string(win_.size()) + "\r\n";
Expand All @@ -47,7 +47,7 @@ class SyncWindow {
return res;
}
}
std::size_t GetTotalBinglogSize() {
std::size_t GetTotalBinlogSize() {
return total_size_;
}
void Reset() {
Expand Down
7 changes: 5 additions & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePadding, paddingptr));
Cmd* pkpatternmatchdelptr = new PKPatternMatchDelCmd(kCmdNamePKPatternMatchDel, 3, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePKPatternMatchDel, pkpatternmatchdelptr));
Cmd* dummyptr = new DummyCmd(kCmdDummy, 0, kCmdFlagsWrite | kCmdFlagsSinglePartition);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdDummy, dummyptr));

// Slots related
Cmd* slotsinfoptr = new SlotsInfoCmd(kCmdNameSlotsInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down Expand Up @@ -695,7 +697,8 @@ void Cmd::DoBinlog(std::shared_ptr<SyncMasterPartition> partition) {
&& g_pika_conf->write_binlog()) {
std::shared_ptr<pink::PinkConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
if (!conn_ptr || !resp_ptr) {
// Consider that dummy cmd appended by system, both conn and resp are null.
/* if (!conn_ptr || !resp_ptr) {
if (!conn_ptr) {
LOG(WARNING) << partition->SyncPartitionInfo().ToString() << " conn empty.";
}
Expand All @@ -704,7 +707,7 @@ void Cmd::DoBinlog(std::shared_ptr<SyncMasterPartition> partition) {
}
res().SetRes(CmdRes::kErrOther);
return;
}
} */

Status s = partition->ConsensusProposeLog(shared_from_this(),
std::dynamic_pointer_cast<PikaClientConn>(conn_ptr), resp_ptr);
Expand Down
15 changes: 7 additions & 8 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ int MemLog::Size() {
return static_cast<int>(logs_.size());
}

// purdge [begin, offset]
Status MemLog::PurdgeLogs(const LogOffset& offset, std::vector<LogItem>* logs) {
// purge [begin, offset]
Status MemLog::PurgeLogs(const LogOffset& offset, std::vector<LogItem>* logs) {
slash::MutexLock l_logs(&logs_mu_);
int index = InternalFindLogIndex(offset);
if (index < 0) {
Expand Down Expand Up @@ -497,8 +497,7 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port,
}

bool ConsensusCoordinator::InternalUpdateCommittedIndex(const LogOffset& slave_committed_index, LogOffset* updated_committed_index) {
if (slave_committed_index < committed_index_ ||
slave_committed_index == committed_index_) {
if (slave_committed_index <= committed_index_) {
return false;
}
committed_index_ = slave_committed_index;
Expand Down Expand Up @@ -526,10 +525,10 @@ Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item,
}

Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) {
// logs from PurdgeLogs goes to InternalApply in order
// logs from PurgeLogs goes to InternalApply in order
slash::MutexLock l(&order_mu_);
std::vector<MemLog::LogItem> logs;
Status s = mem_logger_->PurdgeLogs(committed_index, &logs);
Status s = mem_logger_->PurgeLogs(committed_index, &logs);
if (!s.ok()) {
return Status::NotFound("committed index not found " + committed_index.ToString());
}
Expand All @@ -541,10 +540,10 @@ Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index)
}

Status ConsensusCoordinator::ScheduleApplyFollowerLog(const LogOffset& committed_index) {
// logs from PurdgeLogs goes to InternalApply in order
// logs from PurgeLogs goes to InternalApply in order
slash::MutexLock l(&order_mu_);
std::vector<MemLog::LogItem> logs;
Status s = mem_logger_->PurdgeLogs(committed_index, &logs);
Status s = mem_logger_->PurgeLogs(committed_index, &logs);
if (!s.ok()) {
return Status::NotFound("committed index not found " + committed_index.ToString());
}
Expand Down
4 changes: 2 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
}

int PikaReplBgWorker::HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) {
std::string opt = argv[0];
PikaReplBgWorker* worker = static_cast<PikaReplBgWorker*>(parser->data);
g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]);
g_pika_server->UpdateQueryNumAndExecCountTable(opt);

// Monitor related
std::string monitor_message;
Expand All @@ -248,7 +249,6 @@ int PikaReplBgWorker::HandleWriteBinlog(pink::RedisParser* parser, const pink::R
g_pika_server->AddMonitorMessage(monitor_message);
}

std::string opt = argv[0];
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt));
if (!c_ptr) {
LOG(WARNING) << "Command " << opt << " not in the command table";
Expand Down
4 changes: 2 additions & 2 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
return;
}

if (req->has_consensus_meta()){
if (req->has_consensus_meta()) {
const InnerMessage::ConsensusMeta& meta = req->consensus_meta();
if (meta.term() > master_partition->ConsensusTerm()) {
LOG(INFO) << "Update " << table_name << ":" << partition_id
Expand Down Expand Up @@ -442,7 +442,7 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
}

if (is_first_send) {
if (!(range_start.b_offset == range_end.b_offset)) {
if (range_start.b_offset != range_end.b_offset) {
LOG(WARNING) << "first binlogsync request pb argument invalid";
conn->NotifyClose();
delete task_arg;
Expand Down
30 changes: 23 additions & 7 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include "include/pika_conf.h"
#include "include/pika_server.h"

#include "include/pika_command.h"
#include "include/pika_admin.h"

extern PikaConf *g_pika_conf;
extern PikaReplicaManager* g_pika_rm;
extern PikaServer *g_pika_server;
Expand Down Expand Up @@ -145,7 +148,7 @@ Status SyncMasterPartition::ActivateSlaveDbSync(const std::string& ip, int port)
}

Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
int cnt = slave_ptr->sync_win.Remainings();
int cnt = slave_ptr->sync_win.Remaining();
std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader;
if (reader == nullptr) {
return Status::OK();
Expand All @@ -155,9 +158,9 @@ Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>&
std::string msg;
uint32_t filenum;
uint64_t offset;
if (slave_ptr->sync_win.GetTotalBinglogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
if (slave_ptr->sync_win.GetTotalBinlogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
LOG(INFO) << slave_ptr->ToString() << " total binlog size in sync window is :"
<< slave_ptr->sync_win.GetTotalBinglogSize();
<< slave_ptr->sync_win.GetTotalBinlogSize();
break;
}
Status s = reader->Get(&msg, &filenum, &offset);
Expand Down Expand Up @@ -512,6 +515,19 @@ uint32_t SyncMasterPartition::ConsensusTerm() {

void SyncMasterPartition::ConsensusUpdateTerm(uint32_t term) {
coordinator_.UpdateTerm(term);
if (g_pika_server->role() & PIKA_ROLE_MASTER) {
CommitPreviousLogs(term);
}
}

void SyncMasterPartition::CommitPreviousLogs(const uint32_t& term) {
// Append dummy cmd
std::shared_ptr<Cmd> dummy = std::make_shared<DummyCmd>(kCmdDummy, 0, kCmdFlagsWrite);
PikaCmdArgsType args;
dummy->Initial(args, SyncPartitionInfo().table_name_);
dummy->SetStage(Cmd::kBinlogStage);
dummy->Execute();
dummy->SetStage(Cmd::kExecuteStage);
}

std::shared_ptr<SlaveNode> SyncMasterPartition::GetSlaveNode(const std::string& ip, int port) {
Expand Down Expand Up @@ -700,13 +716,12 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, uint
}

int PikaReplicaManager::ConsumeWriteQueue() {
std::vector<std::string> to_delete;
std::unordered_map<std::string, std::vector<std::vector<WriteTask>>> to_send_map;
int counter = 0;
{
slash::MutexLock l(&write_queue_mu_);
std::vector<std::string> to_delete;
for (auto& iter : write_queues_) {
const std::string& ip_port = iter.first;
std::unordered_map<uint32_t, std::queue<WriteTask>>& p_map = iter.second;
for (auto& partition_queue : p_map) {
std::queue<WriteTask>& queue = partition_queue.second;
Expand All @@ -724,18 +739,19 @@ int PikaReplicaManager::ConsumeWriteQueue() {
if (batch_size > PIKA_MAX_CONN_RBUF_HB) {
break;
}
to_send.push_back(queue.front());
to_send.push_back(task);
queue.pop();
counter++;
}
if (!to_send.empty()) {
to_send_map[iter.first].push_back(std::move(to_send));
to_send_map[ip_port].push_back(std::move(to_send));
}
}
}
}
}

std::vector<std::string> to_delete;
for (auto& iter : to_send_map) {
std::string ip;
int port = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/pika_slave_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ bool SyncWindow::Update(const SyncWinItem& start_item,
return true;
}

int SyncWindow::Remainings() {
int SyncWindow::Remaining() {
std::size_t remaining_size = g_pika_conf->sync_window_size() - win_.size();
return remaining_size > 0? remaining_size:0 ;
}
Expand Down

0 comments on commit 0f3d24c

Please sign in to comment.