Skip to content

Commit

Permalink
Restore consensus context from disk (OpenAtomFoundation#847)
Browse files Browse the repository at this point in the history
Stabilize context: applied_index
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent 27245e8 commit 6288e59
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 35 deletions.
4 changes: 3 additions & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class PikaClientConn: public pink::RedisConn {
std::shared_ptr<PikaClientConn> conn_ptr;
std::vector<pink::RedisCmdArgsType> redis_cmds;
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string table_name;
uint32_t partition_id;
};

// Auth related
Expand Down Expand Up @@ -47,7 +50,6 @@ class PikaClientConn: public pink::RedisConn {
}
static void DoBackgroundTask(void* arg);
static void DoExecTask(void* arg);
static void DoStaleTask(void* arg);

bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
Expand Down
72 changes: 70 additions & 2 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,38 @@
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "include/pika_binlog_transverter.h"
#include "slash/include/env.h"

class Context {
public:
Context(const std::string path);
~Context();

Status Init();
// RWLock should be held when access members.
Status StableSave();
void PrepareUpdateAppliedIndex(const LogOffset& offset);
void UpdateAppliedIndex(const LogOffset& offset);

pthread_rwlock_t rwlock_;
LogOffset applied_index_;
SyncWindow applied_win_;

std::string ToString() {
std::stringstream tmp_stream;
slash::RWLock l(&rwlock_, false);
tmp_stream << " Applied_index " << applied_index_.ToString() << "\r\n";
tmp_stream << " Applied window " << applied_win_.ToStringStatus();
return tmp_stream.str();
}

private:
std::string path_;
slash::RWFile *save_;
// No copying allowed;
Context(const Context&);
void operator=(const Context&);
};

class SyncProgress {
public:
Expand Down Expand Up @@ -59,10 +91,15 @@ class MemLog {
}
Status PurdgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
Status GetRangeLogs(int start, int end, std::vector<LogItem>* logs);
LogOffset LastOffset() {

LogOffset last_offset() {
slash::MutexLock l_logs(&logs_mu_);
return last_offset_;
}
void SetLastOffset(const LogOffset& offset) {
slash::MutexLock l_logs(&logs_mu_);
last_offset_ = offset;
}

private:
int InternalFindLogIndex(const LogOffset& offset);
Expand All @@ -75,6 +112,8 @@ class ConsensusCoordinator {
public:
ConsensusCoordinator(const std::string& table_name, uint32_t partition_id);
~ConsensusCoordinator();
// since it is invoked in constructor all locks not hold
void Init();

Status ProposeLog(
std::shared_ptr<Cmd> cmd_ptr,
Expand Down Expand Up @@ -106,6 +145,34 @@ class ConsensusCoordinator {
return committed_index_;
}

std::shared_ptr<Context> context() {
return context_;
}

// redis parser cb
struct CmdPtrArg {
CmdPtrArg(std::shared_ptr<Cmd> ptr) : cmd_ptr(ptr) {
}
std::shared_ptr<Cmd> cmd_ptr;
};
static int InitCmd(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv);

std::string ToStringStatus() {
std::stringstream tmp_stream;
{
slash::MutexLock l(&index_mu_);
tmp_stream << " Committed_index: " << committed_index_.ToString() << "\r\n";
}
tmp_stream << " Contex: " << "\r\n" << context_->ToString();
{
slash::RWLock l(&term_rwlock_, false);
tmp_stream << " Term: " << term_ << "\r\n";
}
tmp_stream << " Mem_logger size: " << mem_logger_->Size() <<
" last offset " << mem_logger_->last_offset().ToString() << "\r\n";
return tmp_stream.str();
}

private:
Status ScheduleApplyLog(const LogOffset& committed_index);
Status ScheduleApplyFollowerLog(const LogOffset& committed_index);
Expand All @@ -125,7 +192,8 @@ class ConsensusCoordinator {

slash::Mutex index_mu_;
LogOffset committed_index_;
// LogOffset applied_index_;

std::shared_ptr<Context> context_;

pthread_rwlock_t term_rwlock_;
uint32_t term_;
Expand Down
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ const size_t kBinlogPrefixLen = 10;

const std::string kPikaMeta = "meta";
const std::string kManifest = "manifest";
const std::string kContext = "context";

/*
* define common character
Expand Down
5 changes: 4 additions & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ struct ReplClientWriteBinlogTaskArg {

struct ReplClientWriteDBTaskArg {
const std::shared_ptr<Cmd> cmd_ptr;
LogOffset offset;
std::string table_name;
uint32_t partition_id;
ReplClientWriteDBTaskArg(const std::shared_ptr<Cmd> _cmd_ptr,
const LogOffset _offset,
const std::string _table_name,
uint32_t _partition_id)
: cmd_ptr(_cmd_ptr),
offset(_offset),
table_name(_table_name), partition_id(_partition_id) {}
~ReplClientWriteDBTaskArg() {
}
Expand All @@ -77,7 +80,7 @@ class PikaReplClient {
const std::shared_ptr<InnerMessage::InnerResponse> res,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd> cmd_ptr,
void ScheduleWriteDBTask(const std::shared_ptr<Cmd> cmd_ptr, const LogOffset& offset,
const std::string& table_name, uint32_t partition_id);

Status SendMetaSync();
Expand Down
3 changes: 2 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class SyncMasterPartition : public SyncPartition {
Status ConsensusProcessLeaderLog(std::shared_ptr<Cmd> cmd_ptr, const BinlogItem& attribute);
Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit);
LogOffset ConsensusCommittedIndex();
Status ConsensusUpdateAppliedIndex(const LogOffset& offset);

std::shared_ptr<StableLog> StableLogger() {
return coordinator_.StableLogger();
Expand Down Expand Up @@ -236,7 +237,7 @@ class PikaReplicaManager {
void ScheduleWriteBinlogTask(const std::string& table_partition,
const std::shared_ptr<InnerMessage::InnerResponse> res,
std::shared_ptr<pink::PbConn> conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd> cmd_ptr,
void ScheduleWriteDBTask(const std::shared_ptr<Cmd> cmd_ptr, const LogOffset& offset,
const std::string& table_name, uint32_t partition_id);

void ReplServerRemoveClientConn(int fd);
Expand Down
1 change: 0 additions & 1 deletion include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct SyncWinItem {
}
};


class SyncWindow {
public:
SyncWindow() :total_size_(0) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ int main(int argc, char *argv[]) {
PikaSignalSetup();

LOG(INFO) << "Server at: " << path;
g_pika_cmd_table_manager = new PikaCmdTableManager();
g_pika_server = new PikaServer();
g_pika_rm = new PikaReplicaManager();
g_pika_cmd_table_manager = new PikaCmdTableManager();

if (g_pika_conf->daemonize()) {
close_std();
Expand Down
2 changes: 2 additions & 0 deletions src/pika_binlog_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ bool PikaBinlogReader::ReadToTheEnd() {
int PikaBinlogReader::Seek(std::shared_ptr<Binlog> logger, uint32_t filenum, uint64_t offset) {
std::string confile = NewFileName(logger->filename(), filenum);
if (!slash::FileExists(confile)) {
LOG(WARNING) << confile << " not exits";
return -1;
}
slash::SequentialFile* readfile;
if (!slash::NewSequentialFile(confile, &readfile).ok()) {
LOG(WARNING) << "New swquential " << confile << " failed";
return -1;
}
if (queue_) {
Expand Down
35 changes: 22 additions & 13 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,25 +205,34 @@ void PikaClientConn::DoBackgroundTask(void* arg) {

void PikaClientConn::DoExecTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::shared_ptr<Cmd> cmd_ptr = bg_arg->cmd_ptr;;
std::shared_ptr<Cmd> cmd_ptr = bg_arg->cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
std::shared_ptr<std::string> resp_ptr = bg_arg->resp_ptr;
LogOffset offset = bg_arg->offset;
std::string table_name = bg_arg->table_name;
uint32_t partition_id = bg_arg->partition_id;

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = slash::NowMicros();
}
cmd_ptr->SetStage(Cmd::kExecuteStage);
cmd_ptr->Execute();
conn_ptr->resp_num--;
*resp_ptr = std::move(cmd_ptr->res().message());
conn_ptr->TryWriteResp();
}
if (g_pika_conf->slowlog_slower_than() >= 0) {
conn_ptr->ProcessSlowlog(cmd_ptr->argv(), start_us);
}

// do the same thing as DoExecTask for now
// maybe not write response
void PikaClientConn::DoStaleTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::shared_ptr<Cmd> cmd_ptr = bg_arg->cmd_ptr;;
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
std::shared_ptr<std::string> resp_ptr = bg_arg->resp_ptr;
std::shared_ptr<SyncMasterPartition> partition =
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (partition == nullptr) {
LOG(WARNING) << "Sync Master Partition not exist " << table_name << partition_id;
return;
}
partition->ConsensusUpdateAppliedIndex(offset);

cmd_ptr->Execute();
if (conn_ptr == nullptr || resp_ptr == nullptr) {
return;
}
conn_ptr->resp_num--;
*resp_ptr = std::move(cmd_ptr->res().message());
conn_ptr->TryWriteResp();
Expand Down
Loading

0 comments on commit 6288e59

Please sign in to comment.