Skip to content

Commit

Permalink
Add term_id into binlog (server_id is obsolete) (OpenAtomFoundation#842)
Browse files Browse the repository at this point in the history
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent 51a5826 commit 4c1c07e
Show file tree
Hide file tree
Showing 25 changed files with 302 additions and 240 deletions.
4 changes: 2 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class FlushallCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down Expand Up @@ -419,7 +419,7 @@ class PaddingCmd : public Cmd {
virtual void DoInitial();
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down
10 changes: 9 additions & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Version {
uint32_t pro_num_;
uint64_t pro_offset_;
uint64_t logic_id_;
uint32_t term_;

pthread_rwlock_t rwlock_;

Expand Down Expand Up @@ -59,7 +60,7 @@ class Binlog {

Status Put(const std::string &item);

Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint64_t* logic_id = NULL);
Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint64_t* logic_id = NULL, uint32_t* term = NULL);
/*
* Set Producer pro_num and pro_offset with lock
*/
Expand All @@ -77,6 +78,13 @@ class Binlog {
return binlog_io_error_;
}

void SetTerm(uint32_t term) {
slash::MutexLock l(&mutex_);
slash::RWLock(&(version_->rwlock_), true);
version_->term_ = term;
version_->StableSave();
}

void Close();

private:
Expand Down
14 changes: 7 additions & 7 deletions include/pika_binlog_transverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

/*
* ***********************************************Type First Binlog Item Format***********************************************
* | <Type> | <Create Time> | <Server Id> | <Binlog Logic Id> | <File Num> | <Offset> | <Content Length> | <Content> |
* 2 Bytes 4 Bytes 4 Bytes 8 Bytes 4 Bytes 8 Bytes 4 Bytes content length Bytes
* | <Type> | <Create Time> | <Term Id> | <Binlog Logic Id> | <File Num> | <Offset> | <Content Length> | <Content> |
* 2 Bytes 4 Bytes 4 Bytes 8 Bytes 4 Bytes 8 Bytes 4 Bytes content length Bytes
*
*/
#define BINLOG_ENCODE_LEN 34
Expand All @@ -33,7 +33,7 @@ class BinlogItem {
public:
BinlogItem() :
exec_time_(0),
server_id_(0),
term_id_(0),
logic_id_(0),
filenum_(0),
offset_(0),
Expand All @@ -42,22 +42,22 @@ class BinlogItem {
friend class PikaBinlogTransverter;

uint32_t exec_time() const;
uint32_t server_id() const;
uint32_t term_id() const;
uint64_t logic_id() const;
uint32_t filenum() const;
uint64_t offset() const;
std::string content() const;
std::string ToString() const;

void set_exec_time(uint32_t exec_time);
void set_server_id(uint32_t server_id);
void set_term_id(uint32_t term_id);
void set_logic_id(uint64_t logic_id);
void set_filenum(uint32_t filenum);
void set_offset(uint64_t offset);

private:
uint32_t exec_time_;
uint32_t server_id_;
uint32_t term_id_;
uint64_t logic_id_;
uint32_t filenum_;
uint64_t offset_;
Expand All @@ -70,7 +70,7 @@ class PikaBinlogTransverter{
PikaBinlogTransverter() {};
static std::string BinlogEncode(BinlogType type,
uint32_t exec_time,
uint32_t server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset,
Expand Down
2 changes: 1 addition & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
BinlogOffset binlog_offset() const;

virtual std::string ToBinlog(uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset);
Expand Down
42 changes: 23 additions & 19 deletions include/pika_consistency.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,34 @@ class SyncProgress {
~SyncProgress();
std::shared_ptr<SlaveNode> GetSlaveNode(const std::string& ip, int port);
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> GetAllSlaveNodes();
std::unordered_map<std::string, LogOffset> GetAllMatchIndex();
Status AddSlaveNode(const std::string& ip, int port,
const std::string& table_name, uint32_t partition_id, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);
Status Update(const std::string& ip, int port, const BinlogOffset& start,
const BinlogOffset& end, BinlogOffset* committed_index);
Status Update(const std::string& ip, int port, const LogOffset& start,
const LogOffset& end, LogOffset* committed_index);
int SlaveSize();

private:
BinlogOffset InternalCalCommittedIndex();
LogOffset InternalCalCommittedIndex(
std::unordered_map<std::string, LogOffset> match_index);

pthread_rwlock_t rwlock_;
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
slash::Mutex match_mu_;
std::unordered_map<std::string, BinlogOffset> match_index_;
std::unordered_map<std::string, LogOffset> match_index_;
};

class MemLog {
public:
struct LogItem {
LogItem(
BinlogOffset _offset,
LogOffset _offset,
std::shared_ptr<Cmd> _cmd_ptr,
std::shared_ptr<PikaClientConn> _conn_ptr,
std::shared_ptr<std::string> _resp_ptr)
: offset(_offset), cmd_ptr(_cmd_ptr), conn_ptr(_conn_ptr), resp_ptr(_resp_ptr) {
}
BinlogOffset offset;
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
std::shared_ptr<std::string> resp_ptr;
Expand All @@ -54,27 +55,29 @@ class MemLog {
slash::MutexLock l_logs(&logs_mu_);
logs_.push_back(item);
}
Status PurdgeLogs(BinlogOffset offset, std::vector<LogItem>* logs);
Status PurdgeLogs(const LogOffset& offset, std::vector<LogItem>* logs);
Status GetRangeLogs(int start, int end, std::vector<LogItem>* logs);

private:
int InternalFindLogIndex(BinlogOffset offset);
int InternalFindLogIndex(const LogOffset& offset);
slash::Mutex logs_mu_;
std::vector<LogItem> logs_;
};

class ConsistencyCoordinator {
public:
ConsistencyCoordinator(const std::string& table_name, uint32_t partition_id);
~ConsistencyCoordinator();

Status ProposeLog(
std::shared_ptr<Cmd> cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
Status UpdateSlave(const std::string& ip, int port,
const BinlogOffset& start, const BinlogOffset& end);
const LogOffset& start, const LogOffset& end);
Status AddSlaveNode(const std::string& ip, int port, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);
void UpdateTerm(uint32_t term);

Status CheckEnoughFollower();
SyncProgress& SyncPros() {
Expand All @@ -88,21 +91,22 @@ class ConsistencyCoordinator {
}

private:
// Could del if impl raft
Status AddFollower(const std::string& ip, int port);
// not implement
Status RemoveFollower(const std::string& ip, int port);
Status ScheduleApplyLog();
Status ScheduleApplyLog(const LogOffset& committed_index);
bool MatchConsistencyLevel();

Status InternalPutBinlog(std::shared_ptr<Cmd> cmd_ptr,
BinlogOffset* binlog_offset);
Status InternalAppendLog(std::shared_ptr<Cmd> cmd_ptr,
LogOffset* log_offset);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyStale(const MemLog::LogItem& log);
bool InternalUpdateCommittedIndex(const BinlogOffset& slaves_committed_index);
bool InternalUpdateCommittedIndex(const LogOffset& slaves_committed_index,
LogOffset* updated_committed_index);

slash::Mutex index_mu_;
BinlogOffset committed_index_;
LogOffset committed_index_;
// LogOffset applied_index_;

pthread_rwlock_t term_rwlock_;
uint32_t term_;

std::string table_name_;
uint32_t partition_id_;
Expand Down
46 changes: 44 additions & 2 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ enum SlotState {
INBUSY = 1,
};

struct LogicOffset {
uint32_t term;
uint64_t index;
LogicOffset()
: term(0), index(0) {}
LogicOffset(uint32_t _term, uint64_t _index)
: term(_term), index(_index) {}
LogicOffset(const LogicOffset& other) {
term = other.term;
index = other.index;
}
std::string ToString() const {
return "term: " + std::to_string(term) + " index: " + std::to_string(index);
}
};

struct BinlogOffset {
uint32_t filenum;
uint64_t offset;
Expand Down Expand Up @@ -132,6 +148,32 @@ struct BinlogOffset {
}
};

struct LogOffset {
LogOffset(const LogOffset& _log_offset) {
b_offset = _log_offset.b_offset;
l_offset = _log_offset.l_offset;
}
LogOffset() : b_offset(), l_offset() {
}
LogOffset(BinlogOffset _b_offset, LogicOffset _l_offset)
: b_offset(_b_offset), l_offset(_l_offset) {
}
bool operator<(const LogOffset& other) const {
return b_offset < other.b_offset;
}
bool operator==(const LogOffset& other) const {
return b_offset == other.b_offset;
}
bool operator>(const LogOffset& other) const {
return b_offset > other.b_offset;
}
std::string ToString() const {
return b_offset.ToString() + " " + l_offset.ToString();
}
BinlogOffset b_offset;
LogicOffset l_offset;
};

//dbsync arg
struct DBSyncArg {
PikaServer* p;
Expand Down Expand Up @@ -176,9 +218,9 @@ const std::string BinlogSyncStateMsg[] = {
};

struct BinlogChip {
BinlogOffset offset_;
LogOffset offset_;
std::string binlog_;
BinlogChip(BinlogOffset offset, std::string binlog) : offset_(offset), binlog_(binlog) {
BinlogChip(LogOffset offset, std::string binlog) : offset_(offset), binlog_(binlog) {
}
BinlogChip(const BinlogChip& binlog_chip) {
offset_ = binlog_chip.offset_;
Expand Down
14 changes: 7 additions & 7 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SetCmd : public Cmd {
}
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down Expand Up @@ -273,7 +273,7 @@ class SetnxCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand All @@ -299,7 +299,7 @@ class SetexCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand All @@ -325,7 +325,7 @@ class PsetexCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down Expand Up @@ -479,7 +479,7 @@ class ExpireCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand All @@ -504,7 +504,7 @@ class PexpireCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down Expand Up @@ -548,7 +548,7 @@ class PexpireatCmd : public Cmd {
virtual void DoInitial() override;
virtual std::string ToBinlog(
uint32_t exec_time,
const std::string& server_id,
uint32_t term_id,
uint64_t logic_id,
uint32_t filenum,
uint64_t offset) override;
Expand Down
4 changes: 2 additions & 2 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class PikaReplClient {
uint32_t port,
const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& ack_start,
const BinlogOffset& ack_end,
const LogOffset& ack_start,
const LogOffset& ack_end,
const std::string& local_ip,
bool is_frist_send);
Status SendRemoveSlaveNode(const std::string& ip,
Expand Down
8 changes: 4 additions & 4 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class SyncMasterPartition : public SyncPartition {
// consistency use
Status ConsistencyUpdateSlave(
const std::string& ip, int port,
const BinlogOffset& start,
const BinlogOffset& end);
const LogOffset& start,
const LogOffset& end);
Status ConsistencyProposeLog(
std::shared_ptr<Cmd> cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
Expand Down Expand Up @@ -184,7 +184,7 @@ class PikaReplicaManager {
Status SendPartitionTrySyncRequest(const std::string& table_name, size_t partition_id);
Status SendPartitionDBSyncRequest(const std::string& table_name, size_t partition_id);
Status SendPartitionBinlogSyncAckRequest(const std::string& table, uint32_t partition_id,
const BinlogOffset& ack_start, const BinlogOffset& ack_end,
const LogOffset& ack_start, const LogOffset& ack_end,
bool is_first_send = false);
Status CloseReplClientConn(const std::string& ip, int32_t port);

Expand Down Expand Up @@ -218,7 +218,7 @@ class PikaReplicaManager {
Status LostConnection(const std::string& ip, int port);

// Update binlog win and try to send next binlog
Status UpdateSyncBinlogStatus(const RmNode& slave, const BinlogOffset& offset_start, const BinlogOffset& offset_end);
Status UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start, const LogOffset& offset_end);

Status WakeUpBinlogSync();

Expand Down
Loading

0 comments on commit 4c1c07e

Please sign in to comment.