Skip to content

Commit

Permalink
Refactor ReplicaManager (OpenAtomFoundation#813)
Browse files Browse the repository at this point in the history
Remove BinlogReaderManger
Separate class SlaveNode into single file
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent dd677ba commit 4143daf
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 198 deletions.
11 changes: 11 additions & 0 deletions include/pika_consistency.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
#include "include/pika_client_conn.h"
#include "include/pika_define.h"

//class SyncProgress {
// GetSlaveNode();
// GetAllSlaveNodes();
// AddSlaveNode();
// RemoveSlaveNode();
// SlaveSize();
// private:
// pthread_rwlock_t rwlock_;
// std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
//};

class ConsistencyCoordinator {
public:
struct LogItem {
Expand Down
87 changes: 1 addition & 86 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "include/pika_repl_server.h"
#include "include/pika_stable_log.h"
#include "include/pika_consistency.h"
#include "include/pika_slave_node.h"

#define kBinlogSendPacketNum 40
#define kBinlogSendBatchNum 100
Expand All @@ -29,74 +30,6 @@

using slash::Status;

struct SyncWinItem {
BinlogOffset offset_;
bool acked_;
bool operator==(const SyncWinItem& other) const {
if (offset_.filenum == other.offset_.filenum && offset_.offset == other.offset_.offset) {
return true;
}
return false;
}
explicit SyncWinItem(const BinlogOffset& offset) : offset_(offset), acked_(false) {
}
SyncWinItem(uint32_t filenum, uint64_t offset) : offset_(filenum, offset), acked_(false) {
}
std::string ToString() const {
return offset_.ToString() + " acked: " + std::to_string(acked_);
}
};

class SyncWindow {
public:
SyncWindow() {
}
void Push(const SyncWinItem& item);
bool Update(const SyncWinItem& start_item, const SyncWinItem& end_item, BinlogOffset* acked_offset);
int Remainings();
std::string ToStringStatus() const {
if (win_.empty()) {
return " Size: " + std::to_string(win_.size()) + "\r\n";
} else {
std::string res;
res += " Size: " + std::to_string(win_.size()) + "\r\n";
res += (" Begin_item: " + win_.begin()->ToString() + "\r\n");
res += (" End_item: " + win_.rbegin()->ToString() + "\r\n");
return res;
}
}
private:
// TODO(whoiami) ring buffer maybe
std::deque<SyncWinItem> win_;
};

// role master use
class SlaveNode : public RmNode {
public:
SlaveNode(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id, int session_id);
~SlaveNode();
void Lock() {
slave_mu.Lock();
}
void Unlock() {
slave_mu.Unlock();
}
SlaveState slave_state;

BinlogSyncState b_state;
SyncWindow sync_win;
BinlogOffset sent_offset;
BinlogOffset acked_offset;

std::string ToStringStatus();

std::shared_ptr<PikaBinlogReader> binlog_reader;
Status InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset);
void ReleaseBinlogFileReader();

slash::Mutex slave_mu;
};

class SyncPartition {
public:
SyncPartition(const std::string& table_name, uint32_t partition_id);
Expand Down Expand Up @@ -174,11 +107,7 @@ class SyncMasterPartition : public SyncPartition {

private:
bool CheckReadBinlogFromCache();
// inovker need to hold partition_mu_
void CleanMasterNode();
void CleanSlaveNode();
// invoker need to hold slave_mu_
Status ReadCachedBinlogToWq(const std::shared_ptr<SlaveNode>& slave_ptr);
Status ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr);
// inovker need to hold partition_mu_
Status GetSlaveNode(const std::string& ip, int port, std::shared_ptr<SlaveNode>* slave_node);
Expand Down Expand Up @@ -234,18 +163,6 @@ class SyncSlavePartition : public SyncPartition {
std::string local_ip_;
};

class BinlogReaderManager {
public:
~BinlogReaderManager();
Status FetchBinlogReader(const RmNode& rm_node, std::shared_ptr<PikaBinlogReader>* reader);
Status ReleaseBinlogReader(const RmNode& rm_node);
std::string ToStringStatus();
private:
slash::Mutex reader_mu_;
std::unordered_map<RmNode, std::shared_ptr<PikaBinlogReader>, hash_rm_node> occupied_;
std::vector<std::shared_ptr<PikaBinlogReader>> vacant_;
};

class PikaReplicaManager {
public:
PikaReplicaManager();
Expand Down Expand Up @@ -325,8 +242,6 @@ class PikaReplicaManager {
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

BinlogReaderManager binlog_reader_mgr;

private:
void InitPartition();
Status SelectLocalIp(const std::string& remote_ip,
Expand Down
83 changes: 83 additions & 0 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_SLAVE_NODE_H_
#define PIKA_SLAVE_NODE_H_

#include <deque>
#include <memory>

#include "include/pika_define.h"
#include "include/pika_binlog_reader.h"

struct SyncWinItem {
BinlogOffset offset_;
bool acked_;
bool operator==(const SyncWinItem& other) const {
if (offset_.filenum == other.offset_.filenum && offset_.offset == other.offset_.offset) {
return true;
}
return false;
}
explicit SyncWinItem(const BinlogOffset& offset) : offset_(offset), acked_(false) {
}
SyncWinItem(uint32_t filenum, uint64_t offset) : offset_(filenum, offset), acked_(false) {
}
std::string ToString() const {
return offset_.ToString() + " acked: " + std::to_string(acked_);
}
};


class SyncWindow {
public:
SyncWindow() {
}
void Push(const SyncWinItem& item);
bool Update(const SyncWinItem& start_item, const SyncWinItem& end_item, BinlogOffset* acked_offset);
int Remainings();
std::string ToStringStatus() const {
if (win_.empty()) {
return " Size: " + std::to_string(win_.size()) + "\r\n";
} else {
std::string res;
res += " Size: " + std::to_string(win_.size()) + "\r\n";
res += (" Begin_item: " + win_.begin()->ToString() + "\r\n");
res += (" End_item: " + win_.rbegin()->ToString() + "\r\n");
return res;
}
}
private:
// TODO(whoiami) ring buffer maybe
std::deque<SyncWinItem> win_;
};

// role master use
class SlaveNode : public RmNode {
public:
SlaveNode(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id, int session_id);
~SlaveNode();
void Lock() {
slave_mu.Lock();
}
void Unlock() {
slave_mu.Unlock();
}
SlaveState slave_state;

BinlogSyncState b_state;
SyncWindow sync_win;
BinlogOffset sent_offset;
BinlogOffset acked_offset;

std::string ToStringStatus();

std::shared_ptr<PikaBinlogReader> binlog_reader;
Status InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset);

slash::Mutex slave_mu;
};

#endif // PIKA_SLAVE_NODE_H
119 changes: 7 additions & 112 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,87 +19,6 @@ extern PikaConf *g_pika_conf;
extern PikaReplicaManager* g_pika_rm;
extern PikaServer *g_pika_server;

/* BinlogReaderManager */

BinlogReaderManager::~BinlogReaderManager() {
}

Status BinlogReaderManager::FetchBinlogReader(const RmNode& rm_node, std::shared_ptr<PikaBinlogReader>* reader) {
slash::MutexLock l(&reader_mu_);
if (occupied_.find(rm_node) != occupied_.end()) {
return Status::Corruption(rm_node.ToString() + " exist");
}
if (vacant_.empty()) {
*reader = std::make_shared<PikaBinlogReader>();
} else {
*reader = *(vacant_.begin());
vacant_.erase(vacant_.begin());
}
occupied_[rm_node] = *reader;
return Status::OK();
}

Status BinlogReaderManager::ReleaseBinlogReader(const RmNode& rm_node) {
slash::MutexLock l(&reader_mu_);
if (occupied_.find(rm_node) == occupied_.end()) {
return Status::NotFound(rm_node.ToString());
}
std::shared_ptr<PikaBinlogReader> reader = occupied_[rm_node];
occupied_.erase(rm_node);
vacant_.push_back(reader);
return Status::OK();
}

/* SlaveNode */

SlaveNode::SlaveNode(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id, int session_id)
: RmNode(ip, port, table_name, partition_id, session_id),
slave_state(kSlaveNotSync),
b_state(kNotSync), sent_offset(), acked_offset() {
}

SlaveNode::~SlaveNode() {
if (b_state == kReadFromFile && binlog_reader != nullptr) {
RmNode rm_node(Ip(), Port(), TableName(), PartitionId());
ReleaseBinlogFileReader();
}
}

Status SlaveNode::InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog,
const BinlogOffset& offset) {
Status s = g_pika_rm->binlog_reader_mgr.FetchBinlogReader(
RmNode(Ip(), Port(), NodePartitionInfo()), &binlog_reader);
if (!s.ok()) {
return s;
}
int res = binlog_reader->Seek(binlog, offset.filenum, offset.offset);
if (res) {
g_pika_rm->binlog_reader_mgr.ReleaseBinlogReader(
RmNode(Ip(), Port(), NodePartitionInfo()));
return Status::Corruption(ToString() + " binlog reader init failed");
}
return Status::OK();
}

void SlaveNode::ReleaseBinlogFileReader() {
g_pika_rm->binlog_reader_mgr.ReleaseBinlogReader(
RmNode(Ip(), Port(), NodePartitionInfo()));
binlog_reader = nullptr;
}

std::string SlaveNode::ToStringStatus() {
std::stringstream tmp_stream;
tmp_stream << " Slave_state: " << SlaveStateMsg[slave_state] << "\r\n";
tmp_stream << " Binlog_sync_state: " << BinlogSyncStateMsg[b_state] << "\r\n";
tmp_stream << " Sync_window: " << "\r\n" << sync_win.ToStringStatus();
tmp_stream << " Sent_offset: " << sent_offset.ToString() << "\r\n";
tmp_stream << " Acked_offset: " << acked_offset.ToString() << "\r\n";
tmp_stream << " Binlog_reader activated: " << (binlog_reader != nullptr) << "\r\n";
return tmp_stream.str();
}

/* SyncPartition */

SyncPartition::SyncPartition(const std::string& table_name, uint32_t partition_id)
Expand All @@ -125,10 +44,6 @@ SyncMasterPartition::SyncMasterPartition(const std::string& table_name, uint32_t
stable_logger_ = std::make_shared<StableLog>(table_name, partition_id, log_path);
}

bool SyncMasterPartition::CheckReadBinlogFromCache() {
return false;
}

int SyncMasterPartition::GetNumberOfSlaveNode() {
slash::MutexLock l(&partition_mu_);
return slaves_.size();
Expand Down Expand Up @@ -201,26 +116,18 @@ Status SyncMasterPartition::ActivateSlaveBinlogSync(const std::string& ip,
if (!s.ok()) {
return s;
}
bool read_cache = CheckReadBinlogFromCache();

slave_ptr->Lock();
slave_ptr->slave_state = kSlaveBinlogSync;
slave_ptr->sent_offset = offset;
slave_ptr->acked_offset = offset;
if (read_cache) {
// read binlog file from file
s = slave_ptr->InitBinlogFileReader(Logger(), offset);
if (!s.ok()) {
slave_ptr->Unlock();
// RegistToBinlogCacheWindow(ip, port, offset);
slave_ptr->Lock();
slave_ptr->b_state = kReadFromCache;
} else {
// read binlog file from file
s = slave_ptr->InitBinlogFileReader(Logger(), offset);
if (!s.ok()) {
slave_ptr->Unlock();
return Status::Corruption("Init binlog file reader failed" + s.ToString());
}
slave_ptr->b_state = kReadFromFile;
return Status::Corruption("Init binlog file reader failed" + s.ToString());
}
slave_ptr->b_state = kReadFromFile;
slave_ptr->Unlock();
}

Expand All @@ -241,11 +148,7 @@ Status SyncMasterPartition::SyncBinlogToWq(const std::string& ip, int port) {

{
slash::MutexLock l(&slave_ptr->slave_mu);
if (slave_ptr->b_state == kReadFromFile) {
ReadBinlogFileToWq(slave_ptr);
} else if (slave_ptr->b_state == kReadFromCache) {
ReadCachedBinlogToWq(slave_ptr);
}
ReadBinlogFileToWq(slave_ptr);
}
return Status::OK();
}
Expand All @@ -266,10 +169,6 @@ Status SyncMasterPartition::ActivateSlaveDbSync(const std::string& ip, int port)
return Status::OK();
}

Status SyncMasterPartition::ReadCachedBinlogToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
return Status::OK();
}

Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
int cnt = slave_ptr->sync_win.Remainings();
std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader;
Expand Down Expand Up @@ -380,11 +279,7 @@ Status SyncMasterPartition::WakeUpSlaveBinlogSync() {
{
slash::MutexLock l(&slave_ptr->slave_mu);
if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
if (slave_ptr->b_state == kReadFromFile) {
ReadBinlogFileToWq(slave_ptr);
} else if (slave_ptr->b_state == kReadFromCache) {
ReadCachedBinlogToWq(slave_ptr);
}
ReadBinlogFileToWq(slave_ptr);
}
}
}
Expand Down
Loading

0 comments on commit 4143daf

Please sign in to comment.