Skip to content

Commit

Permalink
binlog sync need to check session id (OpenAtomFoundation#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent b616fdb commit 47ee479
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 99 deletions.
49 changes: 38 additions & 11 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,42 @@ class Node {
class RmNode : public Node {
public:
RmNode(const std::string& ip, int port,
const PartitionInfo& partition_info)
: Node(ip, port), partition_info_(partition_info), session_id_(0), last_send_time_(0), last_recv_time_(0) {
}
RmNode(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id) : Node(ip, port), partition_info_(table_name, partition_id), session_id_(0), last_send_time_(0), last_recv_time_(0) {
}
RmNode() : Node(), partition_info_(), session_id_(0), last_send_time_(0), last_recv_time_(0) {
}
RmNode(const std::string& table_name, uint32_t partition_id) : Node(), partition_info_(table_name, partition_id), session_id_(0), last_send_time_(0), last_recv_time_(0) {
}
const PartitionInfo& partition_info)
: Node(ip, port),
partition_info_(partition_info),
session_id_(0),
last_send_time_(0),
last_recv_time_(0) {}

RmNode(const std::string& ip,
int port,
const std::string& table_name,
uint32_t partition_id)
: Node(ip, port),
partition_info_(table_name, partition_id),
session_id_(0),
last_send_time_(0),
last_recv_time_(0) {}

RmNode(const std::string& ip,
int port,
const std::string& table_name,
uint32_t partition_id,
int32_t session_id)
: Node(ip, port),
partition_info_(table_name, partition_id),
session_id_(session_id),
last_send_time_(0),
last_recv_time_(0) {}

RmNode(const std::string& table_name,
uint32_t partition_id)
: Node(),
partition_info_(table_name, partition_id),
session_id_(0),
last_send_time_(0),
last_recv_time_(0) {}

virtual ~RmNode() = default;
bool operator==(const RmNode& other) const {
if (partition_info_.table_name_ == other.TableName()
Expand All @@ -207,7 +234,7 @@ class RmNode : public Node {
void SetSessionId(uint32_t session_id) {
session_id_ = session_id;
}
uint32_t SessionId() const {
int32_t SessionId() const {
return session_id_;
}
std::string ToString() const {
Expand All @@ -227,7 +254,7 @@ class RmNode : public Node {
}
private:
PartitionInfo partition_info_;
uint32_t session_id_;
int32_t session_id_;
uint64_t last_send_time_;
uint64_t last_recv_time_;
};
Expand Down
1 change: 0 additions & 1 deletion include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class Partition : public std::enable_shared_from_this<Partition> {
ReplState repl_state_;
bool full_sync_;


/*
* BgSave use
*/
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class PikaReplBgWorker {
int StartThread();
int StopThread();
void Schedule(pink::TaskFunc func, void* arg);
void QueueClear();
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);

Expand Down
9 changes: 5 additions & 4 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class PikaReplClient {
void ScheduleWriteDBTask(const std::string& dispatch_key,
PikaCmdArgsType* argv, BinlogItem* binlog_item,
const std::string& table_name, uint32_t partition_id);
void DropWriteBinlogTask();

Status SendMetaSync();
Status SendPartitionDBSync(const std::string& table_name,
Expand All @@ -93,10 +94,10 @@ class PikaReplClient {
uint32_t partition_id,
const BinlogOffset& boffset);
Status SendPartitionBinlogSync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& ack_start,
const BinlogOffset& ack_end,
bool is_frist_send);
uint32_t partition_id,
const BinlogOffset& ack_start,
const BinlogOffset& ack_end,
bool is_frist_send);
private:
size_t GetHashIndex(std::string key, bool upper_half);
void UpdateNextAvail() {
Expand Down
28 changes: 24 additions & 4 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class SyncWindow {
// role master use
class SlaveNode : public RmNode {
public:
SlaveNode(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id);
SlaveNode(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id, int session_id);
~SlaveNode();
void Lock() {
slave_mu.Lock();
Expand Down Expand Up @@ -113,7 +113,7 @@ class SyncPartition {
class SyncMasterPartition : public SyncPartition {
public:
SyncMasterPartition(const std::string& table_name, uint32_t partition_id);
Status AddSlaveNode(const std::string& ip, int port);
Status AddSlaveNode(const std::string& ip, int port, int session_id);
Status RemoveSlaveNode(const std::string& ip, int port);

Status ActivateSlaveBinlogSync(const std::string& ip, int port, const std::shared_ptr<Binlog> binlog, const BinlogOffset& offset);
Expand All @@ -134,6 +134,11 @@ class SyncMasterPartition : public SyncPartition {

std::string ToStringStatus();

int32_t GenSessionId();
bool CheckSessionId(const std::string& ip, int port,
const std::string& table_name,
uint64_t partition_id, int session_id);

private:
bool CheckReadBinlogFromCache();
// inovker need to hold partition_mu_
Expand All @@ -146,8 +151,11 @@ class SyncMasterPartition : public SyncPartition {
Status GetSlaveNode(const std::string& ip, int port, std::shared_ptr<SlaveNode>* slave_node);

slash::Mutex partition_mu_;

std::vector<std::shared_ptr<SlaveNode>> slaves_;

slash::Mutex session_mu_;
int32_t session_id_;

// BinlogCacheWindow win_;
};

Expand All @@ -165,10 +173,13 @@ class SyncSlavePartition : public SyncPartition {
const std::string& MasterIp() {
return m_info_.Ip();
}

int MasterPort() {
return m_info_.Port();
}
int32_t MasterSessionId() {
return m_info_.SessionId();
}

private:
slash::Mutex partition_mu_;

Expand Down Expand Up @@ -229,6 +240,15 @@ class PikaReplicaManager {

Status WakeUpBinlogSync();

// Session Id
int32_t GenPartitionSessionId(const std::string& table_name, uint32_t partition_id);
int32_t GetSlavePartitionSessionId(const std::string& table_name, uint32_t partition_id);
bool CheckSlavePartitionSessionId(const std::string& table_name, uint32_t partition_id,
int session_id);
bool CheckMasterPartitionSessionId(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id, int session_id);

// write_queue related
void ProduceWriteQueue(const std::string& ip, int port, const std::vector<WriteTask>& tasks);
int ConsumeWriteQueue();
Expand Down
1 change: 0 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ class PikaServer {
int64_t GenSid();
void BecomeMaster();
void DeleteSlave(int fd); //conn fd
void DeleteSlave(const std::string& ip, int64_t port);
int32_t CountSyncSlaves();
int32_t GetSlaveListString(std::string& slave_list_str);
int64_t TryAddSlave(const std::string& ip, int64_t port, int fd,
Expand Down
24 changes: 6 additions & 18 deletions src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,15 @@ message InnerRequest {
required uint32 partition_id = 3;
required BinlogOffset ack_range_start = 4;
required BinlogOffset ack_range_end = 5;
required bool first_send = 6;
}

message HeatBeat {
required Node node = 1;
required string ping = 2;
optional int64 sid = 3;
required int32 session_id = 6;
required bool first_send = 7;
}

required Type type = 1;
optional MetaSync meta_sync = 2;
optional TrySync try_sync = 3;
optional DBSync db_sync = 4;
optional BinlogSync binlog_sync = 5;
optional HeatBeat heat_beat = 6;
}

message PartitionInfo {
Expand Down Expand Up @@ -101,10 +95,9 @@ message InnerResponse {
kError = 4;
}
required ReplyCode reply_code = 1;
required Node node = 2;
required Partition partition = 3;
optional BinlogOffset binlog_offset = 4;
optional int32 sid = 5;
required Partition partition = 2;
optional BinlogOffset binlog_offset = 3;
optional int32 session_id = 4;
}

message DBSync {
Expand All @@ -120,11 +113,7 @@ message InnerResponse {
required Partition partition = 1;
required BinlogOffset binlog_offset = 2;
required bytes binlog = 3;
}

// slave to master
message HeatBeat {
required string pong = 1;
required int32 session_id = 4;
}

required Type type = 1;
Expand All @@ -134,5 +123,4 @@ message InnerResponse {
optional DBSync db_sync = 5;
optional TrySync try_sync = 6;
repeated BinlogSync binlog_sync = 7;
optional HeatBeat heat_beat = 8;
}
17 changes: 17 additions & 0 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ void PikaReplBgWorker::Schedule(pink::TaskFunc func, void* arg) {
bg_thread_.Schedule(func, arg);
}

void PikaReplBgWorker::QueueClear() {
bg_thread_.QueueClear();
}

void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
ReplClientWriteBinlogTaskArg* task_arg = static_cast<ReplClientWriteBinlogTaskArg*>(arg);
const std::shared_ptr<InnerMessage::InnerResponse> res = task_arg->res;
Expand Down Expand Up @@ -69,6 +73,19 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {

for (size_t i = 0; i < index->size(); ++i) {
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync((*index)[i]);
if (!g_pika_rm->CheckSlavePartitionSessionId(
binlog_res.partition().table_name(),
binlog_res.partition().partition_id(),
binlog_res.session_id())) {
LOG(WARNING) << "Check Session failed "
<< binlog_res.partition().table_name()
<< "_" << binlog_res.partition().partition_id();
conn->NotifyClose();
delete index;
delete task_arg;
return;
}

// empty binlog treated as keepalive packet
if (binlog_res.binlog().empty()) {
continue;
Expand Down
11 changes: 11 additions & 0 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#include "slash/include/env.h"
#include "slash/include/slash_string.h"

#include "include/pika_rm.h"
#include "include/pika_server.h"

extern PikaServer* g_pika_server;
extern PikaReplicaManager* g_pika_rm;

PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) : next_avail_(0) {
client_thread_ = new PikaReplClientThread(cron_interval, keepalive_timeout);
Expand Down Expand Up @@ -82,6 +84,12 @@ void PikaReplClient::ScheduleWriteDBTask(const std::string& dispatch_key,
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}

void PikaReplClient::DropWriteBinlogTask() {
for (size_t idx = 0; idx < bg_workers_.size() / 2; ++idx) {
bg_workers_[idx]->QueueClear();
}
}

size_t PikaReplClient::GetHashIndex(std::string key, bool upper_half) {
size_t hash_base = bg_workers_.size() / 2;
return (str_hash(key) % hash_base) + (upper_half ? 0 : hash_base);
Expand Down Expand Up @@ -221,6 +229,9 @@ Status PikaReplClient::SendPartitionBinlogSync(const std::string& table_name,
ack_range_end->set_filenum(ack_end.filenum);
ack_range_end->set_offset(ack_end.offset);

int32_t session_id = g_pika_rm->GetSlavePartitionSessionId(table_name, partition_id);
binlog_sync->set_session_id(session_id);

std::string to_send;
std::string master_ip = g_pika_server->master_ip();
int master_port = g_pika_server->master_port();
Expand Down
10 changes: 5 additions & 5 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {

const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
const InnerMessage::Partition& partition_response = try_sync_response.partition();
const InnerMessage::Node& node = try_sync_response.node();
std::string table_name = partition_response.table_name();
uint32_t partition_id = partition_response.partition_id();
std::shared_ptr<Partition> partition = g_pika_server->GetTablePartitionById(table_name, partition_id);
Expand All @@ -203,9 +202,10 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
std::string partition_name = partition->GetPartitionName();
if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
BinlogOffset boffset;
uint32_t session_id = try_sync_response.session_id();
partition->SetReplState(ReplState::kConnected);
partition->logger()->GetProducerStatus(&boffset.filenum, &boffset.offset);
g_pika_rm->AddSyncSlavePartition(RmNode(node.ip(), node.port(), table_name, partition_id));
g_pika_rm->AddSyncSlavePartition(RmNode(g_pika_server->master_ip(), g_pika_server->master_port(), table_name, partition_id, session_id));
g_pika_server->SendPartitionBinlogSyncAckRequest(table_name, partition_id, boffset, boffset, true);
LOG(INFO) << "Partition: " << partition_name << " TrySync Ok";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointBePurged) {
Expand All @@ -227,10 +227,10 @@ void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr<InnerMessage::I
// partition to a bunch of binlog chips
std::unordered_map<PartitionInfo, std::vector<int>*, hash_partition_info> par_binlog;
for (int i = 0; i < res->binlog_sync_size(); ++i) {
const InnerMessage::InnerResponse::BinlogSync& binlog_response = res->binlog_sync(i);
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i);
// hash key: table + partition_id
const InnerMessage::Partition& partition = binlog_response.partition();
PartitionInfo p_info(partition.table_name(), partition.partition_id());
PartitionInfo p_info(binlog_res.partition().table_name(),
binlog_res.partition().partition_id());
if (par_binlog.find(p_info) == par_binlog.end()) {
par_binlog[p_info] = new std::vector<int>();
}
Expand Down
Loading

0 comments on commit 47ee479

Please sign in to comment.