Skip to content

Commit

Permalink
Refactor ConsistencyCoordinator & SyncMasterPartition (OpenAtomFounda…
Browse files Browse the repository at this point in the history
…tion#829)

Impl class SyncProgress to Manage slave node sync progress
Impl class MemLog to Manage memory log
Impl class StableLog to Manage Binlog files
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent 41a17b0 commit 0bcb64b
Show file tree
Hide file tree
Showing 11 changed files with 544 additions and 397 deletions.
5 changes: 2 additions & 3 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ class PikaClientConn: public pink::RedisConn {
std::string current_table_;
bool is_pubsub_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt);
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
std::shared_ptr<std::string> resp_ptr);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string> resp_ptr);
void ConsistencyProposeLog(
std::shared_ptr<Cmd> cmd_ptr, std::shared_ptr<std::string> resp_ptr);
void TryWriteResp();

AuthStat auth_stat_;
Expand Down
7 changes: 5 additions & 2 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ class CmdRes {
CmdRet ret_;
};

class Cmd {
class Cmd: public std::enable_shared_from_this<Cmd> {
public:
enum CmdStage {
kNone,
Expand Down Expand Up @@ -445,6 +445,9 @@ class Cmd {
void SetConn(const std::shared_ptr<pink::PinkConn> conn);
std::shared_ptr<pink::PinkConn> GetConn();

void SetResp(const std::shared_ptr<std::string> resp);
std::shared_ptr<std::string> GetResp();

void SetStage(CmdStage stage);
protected:
// enable copy, used default copy
Expand All @@ -467,7 +470,7 @@ class Cmd {
std::string table_name_;

std::weak_ptr<pink::PinkConn> conn_;
BinlogOffset binlog_offset_;
std::weak_ptr<std::string> resp_;
CmdStage stage_;

private:
Expand Down
100 changes: 74 additions & 26 deletions include/pika_consistency.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,32 @@

#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"

//class SyncProgress {
// GetSlaveNode();
// GetAllSlaveNodes();
// AddSlaveNode();
// RemoveSlaveNode();
// SlaveSize();
// private:
// pthread_rwlock_t rwlock_;
// std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
//};
class SyncProgress {
public:
SyncProgress();
~SyncProgress();
std::shared_ptr<SlaveNode> GetSlaveNode(const std::string& ip, int port);
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> GetAllSlaveNodes();
Status AddSlaveNode(const std::string& ip, int port,
const std::string& table_name, uint32_t partition_id, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);
Status Update(const std::string& ip, int port, const BinlogOffset& start,
const BinlogOffset& end, BinlogOffset* committed_index);
int SlaveSize();

class ConsistencyCoordinator {
private:
BinlogOffset InternalCalCommittedIndex();

pthread_rwlock_t rwlock_;
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
slash::Mutex match_mu_;
std::unordered_map<std::string, BinlogOffset> match_index_;
};

class MemLog {
public:
struct LogItem {
LogItem(
Expand All @@ -35,32 +48,67 @@ class ConsistencyCoordinator {
std::shared_ptr<std::string> resp_ptr;
};

ConsistencyCoordinator();
MemLog();
int Size();
void PushLog(const LogItem& item) {
slash::MutexLock l_logs(&logs_mu_);
logs_.push_back(item);
}
Status PurdgeLogs(BinlogOffset offset, std::vector<LogItem>* logs);
Status GetRangeLogs(int start, int end, std::vector<LogItem>* logs);

private:
int InternalFindLogIndex(BinlogOffset offset);
slash::Mutex logs_mu_;
std::vector<LogItem> logs_;
};

class ConsistencyCoordinator {
public:
ConsistencyCoordinator(const std::string& table_name, uint32_t partition_id);

Status ProposeLog(
const BinlogOffset& offset,
std::shared_ptr<Cmd> cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
Status ScheduleApplyLog();
Status UpdateSlave(const std::string& ip, int port,
const BinlogOffset& start, const BinlogOffset& end);
Status AddSlaveNode(const std::string& ip, int port, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);

Status CheckEnoughFollower();
Status UpdateMatchIndex(const std::string& ip, int port, const BinlogOffset& offset);
SyncProgress& SyncPros() {
return sync_pros_;
}
std::shared_ptr<StableLog> StableLogger() {
return stable_logger_;
}
std::shared_ptr<MemLog> MemLogger() {
return mem_logger_;
}

private:
// Could del if impl raft
Status AddFollower(const std::string& ip, int port);
// not implement
Status RemoveFollower(const std::string& ip, int port);
Status ScheduleApplyLog();
bool MatchConsistencyLevel();

size_t LogsSize();
Status InternalPutBinlog(std::shared_ptr<Cmd> cmd_ptr,
BinlogOffset* binlog_offset);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyStale(const MemLog::LogItem& log);
bool InternalUpdateCommittedIndex(const BinlogOffset& slaves_committed_index);

private:
slash::Mutex logs_mu_;
std::vector<LogItem> logs_;
slash::Mutex index_mu_;
std::unordered_map<std::string, BinlogOffset> match_index_;
BinlogOffset committed_index_;

int InternalFindLogIndex();
void InternalUpdateCommittedIndex();
bool InternalMatchConsistencyLevel();
int InternalPurdgeLog(std::vector<LogItem>* logs);
void InternalApply(const LogItem& log);
void InternalApplyStale(const LogItem& log);
std::string table_name_;
uint32_t partition_id_;

SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;
};
#endif // INCLUDE_PIKA_CONSISTENCY_H_
28 changes: 13 additions & 15 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SyncMasterPartition : public SyncPartition {
Status ActivateSlaveDbSync(const std::string& ip, int port);

Status SyncBinlogToWq(const std::string& ip, int port);
Status UpdateSlaveBinlogAckInfo(const std::string& ip, int port, const BinlogOffset& start, const BinlogOffset& end);

Status GetSlaveSyncBinlogInfo(const std::string& ip, int port, BinlogOffset* sent_offset, BinlogOffset* acked_offset);
Status GetSlaveState(const std::string& ip, int port, SlaveState* const slave_state);

Expand Down Expand Up @@ -87,41 +87,39 @@ class SyncMasterPartition : public SyncPartition {
uint64_t partition_id, int session_id);

// consistency use
Status ConsistencyUpdateSlave(
const std::string& ip, int port,
const BinlogOffset& start,
const BinlogOffset& end);
Status ConsistencyProposeLog(
const BinlogOffset& offset,
std::shared_ptr<Cmd> cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
Status ConsistencySanityCheck();
Status ConsistencyScheduleApplyLog();

std::shared_ptr<StableLog> StableLogger() {
return coordinator_.StableLogger();
}

std::shared_ptr<Binlog> Logger() {
if (!stable_logger_) {
if (!coordinator_.StableLogger()) {
return nullptr;
}
return stable_logger_->Logger();
}
std::shared_ptr<StableLog> StableLogger() {
return stable_logger_;
return coordinator_.StableLogger()->Logger();
}

private:
bool CheckReadBinlogFromCache();
// invoker need to hold slave_mu_
Status ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr);
// inovker need to hold partition_mu_
Status GetSlaveNode(const std::string& ip, int port, std::shared_ptr<SlaveNode>* slave_node);

slash::Mutex partition_mu_;
std::vector<std::shared_ptr<SlaveNode>> slaves_;
std::shared_ptr<SlaveNode> GetSlaveNode(const std::string& ip, int port);
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> GetAllSlaveNodes();

slash::Mutex session_mu_;
int32_t session_id_;

ConsistencyCoordinator coordinator_;

std::shared_ptr<StableLog> stable_logger_;
// BinlogCacheWindow win_;
};

class SyncSlavePartition : public SyncPartition {
Expand Down
1 change: 1 addition & 0 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class SlaveNode : public RmNode {

std::shared_ptr<PikaBinlogReader> binlog_reader;
Status InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset);
Status Update(const BinlogOffset& start, const BinlogOffset& end);

slash::Mutex slave_mu;
};
Expand Down
44 changes: 6 additions & 38 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ PikaClientConn::PikaClientConn(int fd, std::string ip_port,
auth_stat_.Init();
}

std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv,
const std::string& opt) {
std::shared_ptr<Cmd> PikaClientConn::DoCmd(
const PikaCmdArgsType& argv,
const std::string& opt,
std::shared_ptr<std::string> resp_ptr) {
// Get command info
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
Expand All @@ -45,6 +47,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv,
}

c_ptr->SetConn(std::dynamic_pointer_cast<PikaClientConn>(shared_from_this()));
c_ptr->SetResp(resp_ptr);

// Check authed
// AuthCmd will set stat_
Expand Down Expand Up @@ -258,46 +261,11 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
}
slash::StringToLower(opt);

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt);
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr);
// level == 0 or (cmd error) or (is_read)
if (g_pika_conf->consistency_level() == 0 || !cmd_ptr->res().ok() || !cmd_ptr->is_write()) {
resp_num--;
*resp_ptr = std::move(cmd_ptr->res().message());
} else {
ConsistencyProposeLog(cmd_ptr, resp_ptr);
if (!cmd_ptr->res().ok()) {
resp_num--;
*resp_ptr = std::move(cmd_ptr->res().message());
}
g_pika_server->SignalAuxiliary();
}
}

void PikaClientConn::ConsistencyProposeLog(std::shared_ptr<Cmd> cmd_ptr, std::shared_ptr<std::string> resp_ptr) {
BinlogOffset binlog_offset = cmd_ptr->binlog_offset();
std::string table_name = cmd_ptr->table_name();
std::shared_ptr<Table> table = g_pika_server->GetTable(table_name);
if (table == nullptr) {
cmd_ptr->res().SetRes(CmdRes::kErrOther, "-ERR Internal Error");
return;
}
uint32_t index = g_pika_cmd_table_manager->DistributeKey(
cmd_ptr->current_key().front(), table->PartitionNum());
std::shared_ptr<SyncMasterPartition> master_partition =
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, index));
if (!master_partition) {
LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << index
<< ", NotFound";
cmd_ptr->res().SetRes(CmdRes::kErrOther, "-ERR Internal Error");
return;
}
Status s = master_partition->ConsistencyProposeLog(
binlog_offset,
cmd_ptr,
std::dynamic_pointer_cast<PikaClientConn>(shared_from_this()),
resp_ptr);
if (!s.ok()) {
cmd_ptr->res().SetRes(CmdRes::kErrOther, "-ERR consistency level not match");
}
}

Expand Down
45 changes: 23 additions & 22 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,29 +689,26 @@ void Cmd::DoBinlog(std::shared_ptr<SyncMasterPartition> partition) {
if (res().ok()
&& is_write()
&& g_pika_conf->write_binlog()) {
std::shared_ptr<pink::PinkConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
if (!conn_ptr || !resp_ptr) {
if (!conn_ptr) {
LOG(WARNING) << partition->SyncPartitionInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << partition->SyncPartitionInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

uint32_t filenum = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;

partition->Logger()->Lock();
partition->Logger()->GetProducerStatus(&filenum, &offset, &logic_id);
uint32_t exec_time = time(nullptr);
std::string binlog = ToBinlog(exec_time,
g_pika_conf->server_id(),
logic_id,
filenum,
offset);

Status s = partition->Logger()->Put(binlog);
partition->Logger()->GetProducerStatus(&filenum, &offset);
binlog_offset_ = BinlogOffset(filenum, offset);
partition->Logger()->Unlock();

Status s = partition->ConsistencyProposeLog(shared_from_this(),
std::dynamic_pointer_cast<PikaClientConn>(conn_ptr), resp_ptr);
if (!s.ok()) {
LOG(WARNING) << partition->SyncPartitionInfo().ToString()
<< " Writing binlog failed, maybe no space left on device " << s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}
Expand Down Expand Up @@ -761,10 +758,6 @@ std::string Cmd::table_name() const {
return table_name_;
}

BinlogOffset Cmd::binlog_offset() const {
return binlog_offset_;
}

std::string Cmd::ToBinlog(uint32_t exec_time,
const std::string& server_id,
uint64_t logic_id,
Expand Down Expand Up @@ -814,6 +807,14 @@ std::shared_ptr<pink::PinkConn> Cmd::GetConn() {
return conn_.lock();
}

void Cmd::SetResp(const std::shared_ptr<std::string> resp) {
resp_ = resp;
}

std::shared_ptr<std::string> Cmd::GetResp() {
return resp_.lock();
}

void Cmd::SetStage(CmdStage stage) {
stage_ = stage;
}
Loading

0 comments on commit 0bcb64b

Please sign in to comment.