Skip to content

Commit

Permalink
Multi parts (OpenAtomFoundation#526)
Browse files Browse the repository at this point in the history
* Update pink support DestConnectFailedHandle func to handle connect failed situation

* Update Pink

* Add Async Repl Server Binlog Process

1. Add Thread array for writing binlog and writing DB
2. Writing same partition binlog by a preassigned thread in the thread array
  which assigned by hashing its table+partition
3. Writing same key to DB by a preassigned thread in the thread array
  which assigned by hashing by its key
4. Add a write queue in the repl client; all binlog chips are forwarded into that queue first
  and scheduled to be sent from the queue later.

* Add Repl Client Thread Pool

1. Build a thread pool to process send response.
  A Pb response triggers reading and handling binlog, which will block a single thread.
  Using thread pool instead of single thread will ease that situation.
2. Add rw lock for binlog ctl array in repl client
3. Add mutex lock for every binlog ctl

* Change TrySync MetaSync into async

* Debug Binlog Syncing Process

1. Periodically consume write queue to sync binlog to slave using auxiliary thread
2. Trigger sending binlog process using auxiliary thread

* Fix Rebase
  • Loading branch information
whoiami authored and Axlgrep committed May 15, 2019
1 parent 08636e4 commit 263ef95
Show file tree
Hide file tree
Showing 17 changed files with 1,000 additions and 305 deletions.
10 changes: 8 additions & 2 deletions include/pika_auxiliary_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@

#include "pink/include/pink_thread.h"

#include "slash/include/slash_mutex.h"

class PikaAuxiliaryThread : public pink::Thread {
public:
PikaAuxiliaryThread() {}
// Default constructor. cv_ is constructed from the address of mu_, so it
// relies on mu_ being declared (and therefore constructed) before cv_ in
// this class; keep that declaration order when editing the members.
PikaAuxiliaryThread() :
mu_(),
cv_(&mu_) {
}
virtual ~PikaAuxiliaryThread();

slash::Mutex mu_;
slash::CondVar cv_;
private:
virtual void* ThreadMain();
void RunEveryPartitionStateMachine();
Expand Down
72 changes: 72 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2019-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_REPL_BGWROKER_H_
#define PIKA_REPL_BGWROKER_H_

#include <memory>
#include <string>

#include "pink/include/bg_thread.h"
#include "pink/include/pb_conn.h"

#include "src/pika_inner_message.pb.h"

#include "include/pika_command.h"
#include "include/pika_binlog_transverter.h"

// Replication background worker built around one pink::BGThread.
//
// The Schedule* methods and the static Handle* callbacks are only declared
// here; their definitions live outside this header.
// NOTE(review): per the commit description, binlog of one table+partition and
// DB writes of one key are pinned to one preassigned worker chosen by hashing;
// that selection is presumably done by the caller, not by this class.
class PikaReplBgWorker {
 public:
  // queue_size: presumably the capacity of the underlying bg thread queue —
  // TODO confirm against the constructor definition.
  explicit PikaReplBgWorker(int queue_size);

  // Hands a replication request (plus its connection and opaque private
  // data) to this worker.
  void ScheduleRequest(const std::shared_ptr<InnerMessage::InnerRequest> req,
                       std::shared_ptr<pink::PbConn> conn, void* req_private_data);

  // Hands a DB write to this worker.
  // NOTE(review): argv and binlog_item look like owning pointers that end up
  // in a WriteDbBgArg (which deletes them) — confirm in the definition.
  void ScheduleWriteDb(PikaCmdArgsType* argv, BinlogItem* binlog_item,
                       const std::string table_name, uint32_t partition_id);

  int StartThread();

  // Forwards to pink::BGThread::QueueSize, filling both out-parameters.
  void QueueSize(int* size_a, int* size_b) {
    bg_thread_.QueueSize(size_a, size_b);
  }

  // Task entry points with a void* argument.
  // NOTE(review): arg is presumably a ReplBgWorkerArg* (request handlers) or
  // a WriteDbBgArg* (HandleWriteDb) — verify against the definitions.
  static void HandleMetaSyncRequest(void* arg);
  static void HandleBinlogSyncRequest(void* arg);
  static void HandleTrySyncRequest(void* arg);
  static void HandleWriteDb(void* arg);

  BinlogItem binlog_item_;
  pink::RedisParser redis_parser_;
  std::string ip_port_;
  std::string table_name_;
  uint32_t partition_id_;

 private:
  pink::BGThread bg_thread_;

  static int HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv);

  // Bundle of everything a request handler needs: the request, the
  // connection it arrived on, opaque per-request data and the worker itself.
  struct ReplBgWorkerArg {
    const std::shared_ptr<InnerMessage::InnerRequest> req;
    std::shared_ptr<pink::PbConn> conn;
    void* req_private_data;
    PikaReplBgWorker* worker;
    ReplBgWorkerArg(const std::shared_ptr<InnerMessage::InnerRequest> _req,
                    std::shared_ptr<pink::PbConn> _conn,
                    void* _req_private_data,
                    PikaReplBgWorker* _worker)
        : req(_req), conn(_conn), req_private_data(_req_private_data), worker(_worker) {
    }
  };

  // Argument for a DB write task. Takes ownership of argv and binlog_item:
  // both are deleted when this object is destroyed.
  struct WriteDbBgArg {
    PikaCmdArgsType* argv;
    BinlogItem* binlog_item;
    std::string table_name;
    uint32_t partition_id;
    WriteDbBgArg(PikaCmdArgsType* _argv, BinlogItem* _binlog_item,
                 const std::string& _table_name, uint32_t _partition_id)
        : argv(_argv), binlog_item(_binlog_item),
          table_name(_table_name), partition_id(_partition_id) {
    }
    // Non-copyable: a copy would share argv/binlog_item with the original and
    // both destructors would delete the same pointers.
    WriteDbBgArg(const WriteDbBgArg&) = delete;
    WriteDbBgArg& operator=(const WriteDbBgArg&) = delete;
    ~WriteDbBgArg() {
      delete argv;
      delete binlog_item;
    }
  };
};

#endif // PIKA_REPL_BGWROKER_H_
92 changes: 86 additions & 6 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "include/pika_binlog_reader.h"
#include "include/pika_repl_client_thread.h"

#include "pink/include/thread_pool.h"
#include "src/pika_inner_message.pb.h"

#define kBinlogSyncBatchNum 10

using slash::Status;
Expand All @@ -29,6 +32,47 @@ struct RmNode {
int port_;
RmNode(const std::string& table, int partition, const std::string& ip, int port) : table_(table), partition_(partition), ip_(ip), port_(port) {
}
RmNode(const RmNode& node) {
table_ = node.table_;
partition_ = node.partition_;
ip_ = node.ip_;
port_ = node.port_;
}
// Orders nodes lexicographically by (table_, partition_, ip_, port_).
//
// A later field is consulted only when every earlier field is equal. This
// makes the relation a strict weak ordering, which is required for RmNode to
// be usable as a std::map / std::set key: for any two nodes at most one of
// a < b and b < a holds.
bool operator <(const RmNode& other) const {
  if (table_ != other.table_) {
    return table_ < other.table_;
  }
  if (partition_ != other.partition_) {
    return partition_ < other.partition_;
  }
  if (ip_ != other.ip_) {
    return ip_ < other.ip_;
  }
  return port_ < other.port_;
}
// Renders the node as "<table>_<partition>_<ip>:<port>".
std::string ToString() const {
  std::string text(table_);
  text += "_";
  text += std::to_string(partition_);
  text += "_";
  text += ip_;
  text += ":";
  text += std::to_string(port_);
  return text;
}
};

// A piece of binlog payload together with a (file number, offset) pair.
// NOTE(review): file_num_/offset_ presumably identify the binlog position
// associated with binlog_ — confirm against the producer of these chips.
//
// Copy and move operations are intentionally left implicit (Rule of Zero):
// every member manages itself, and not declaring a copy constructor lets the
// compiler generate move construction/assignment, so chips can be moved
// instead of deep-copying binlog_.
struct BinlogChip {
  uint32_t file_num_;
  uint64_t offset_;
  std::string binlog_;

  // Taking the payload by const reference costs exactly one string copy
  // (into binlog_) regardless of how the caller passes it.
  BinlogChip(uint32_t file_num, uint64_t offset, const std::string& binlog)
      : file_num_(file_num), offset_(offset), binlog_(binlog) {
  }
};

struct WriteTask {
struct RmNode rm_node_;
struct BinlogChip binlog_chip_;
WriteTask(RmNode rm_node, BinlogChip binlog_chip) : rm_node_(rm_node), binlog_chip_(binlog_chip) {
}
};

class PikaReplClient {
Expand All @@ -40,25 +84,61 @@ class PikaReplClient {
int Start();
Status AddBinlogReader(const RmNode& slave, std::shared_ptr<Binlog> logger, uint32_t filenum, uint64_t offset);
Status RemoveBinlogReader(const RmNode& slave);
void RunStateMachine(const RmNode& slave);

bool NeedToSendBinlog(const RmNode& slave);

Status SendMetaSync();
Status SendPartitionTrySync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& boffset);
Status SendBinlogSync(const RmNode& slave);

Status TriggerSendBinlogSync();

int ConsumeWriteQueue();

// Forwards a task to the client thread pool (client_tp_).
//   func: task function to run.
//   arg:  opaque argument passed through unchanged to the pool.
// NOTE(review): ownership of arg is not visible here; presumably the task
// function releases it — verify at the call sites.
void Schedule(pink::TaskFunc func, void* arg){
client_tp_->Schedule(func, arg);
}

bool SetAckInfo(const RmNode& slave, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time);
bool GetAckInfo(const RmNode& slave, uint32_t* act_file_num, uint64_t* ack_offset, uint64_t* active_time);

private:
PikaBinlogReader* NewPikaBinlogReader(std::shared_ptr<Binlog> logger, uint32_t filenum, uint64_t offset);

Status TrySendSyncBinlog(const RmNode& slave);
void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request);
void ProduceWriteQueue(WriteTask& task);

Status BuildBinlogMsgFromFile(const RmNode& slave, std::string* scratch, uint32_t* filenum, uint64_t* offset);
void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request);

PikaReplClientThread* client_thread_;
// keys of this map: table_partition_slaveip:port
std::map<std::string, PikaBinlogReader*> slave_binlog_readers_;


// Binlog sync bookkeeping for one entry of binlog_ctl_: a binlog reader,
// the acknowledged (file number, offset) pair and an activity timestamp,
// together with a mutex belonging to this entry.
// Owns reader_: it is deleted when the control block is destroyed.
struct BinlogSyncCtl {
  slash::Mutex ctl_mu_;
  PikaBinlogReader* reader_;
  uint32_t ack_file_num_;
  uint64_t ack_offset_;
  uint64_t active_time_;

  BinlogSyncCtl(PikaBinlogReader* reader, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time)
      : reader_(reader),
        ack_file_num_(ack_file_num),
        ack_offset_(ack_offset),
        active_time_(active_time) {
  }

  ~BinlogSyncCtl() {
    // Deleting a null pointer is a no-op, so no explicit null test is needed.
    delete reader_;
  }
};

pthread_rwlock_t binlog_ctl_rw_;
std::map<RmNode, BinlogSyncCtl*> binlog_ctl_;

slash::Mutex write_queue_mu_;
// every host owns a queue
std::unordered_map<std::string, std::queue<WriteTask>> write_queues_; // ip+port, queue<WriteTask>

pink::ThreadPool* client_tp_;
};

#endif
19 changes: 13 additions & 6 deletions include/pika_repl_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@

#include "pink/include/pb_conn.h"

#include <memory>

#include "include/pika_conf.h"
#include "src/pika_inner_message.pb.h"

class PikaReplClientConn: public pink::PbConn {
public:
PikaReplClientConn(int fd, const std::string& ip_port, pink::Thread *thread, void* worker_specific_data, pink::PinkEpoll* epoll);
virtual ~PikaReplClientConn() = default;

static void DoReplClientTask(void* arg);
static void HandleBinlogSyncResponse(void* arg);
static void HandleMetaSyncResponse(void* arg);
static void HandleTrySyncResponse(void* arg);
static bool IsTableStructConsistent(const std::vector<TableStruct>& current_tables,
const std::vector<TableStruct>& expect_tables);
int DealMessage() override;
private:
bool IsTableStructConsistent(const std::vector<TableStruct>& current_tables,
const std::vector<TableStruct>& expect_tables);
int HandleMetaSyncResponse(const InnerMessage::InnerResponse& response);
int HandleTrySyncResponse(const InnerMessage::InnerResponse& response);
// Pairs a parsed response with the connection it belongs to, holding both
// through shared_ptr.
// NOTE(review): presumably this is the argument handed to the static
// Handle*Response(void* arg) callbacks — confirm in the implementation.
struct ReplRespArg {
  std::shared_ptr<InnerMessage::InnerResponse> resp;
  std::shared_ptr<pink::PbConn> conn;

  ReplRespArg(std::shared_ptr<InnerMessage::InnerResponse> _resp,
              std::shared_ptr<pink::PbConn> _conn)
      : resp(_resp),
        conn(_conn) {
  }
};
};

#endif
4 changes: 4 additions & 0 deletions include/pika_repl_client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class ReplClientHandle : public pink::ClientHandle {
// int DeleteWorkerSpecificData(void* data) const override {
// return 0;
// }
// void DestConnectFailedHandle(std::string ip_port, std::string reason) const override {
// std::cout << "ip " << ip_port << " " << reason << std::endl;
// }

};

class PikaReplClientThread : public pink::ClientThread {
Expand Down
43 changes: 37 additions & 6 deletions include/pika_repl_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,46 @@

#include "include/pika_repl_server_thread.h"

#include <vector>

#include "include/pika_repl_bgworker.h"
#include "include/pika_command.h"
#include "pika_binlog_transverter.h"

class PikaReplServer {
public:
PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval);
~PikaReplServer();
public:
PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval);
~PikaReplServer();
int Start();

void ScheduleBinlogSyncTask(std::string table_partition,
const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);

void ScheduleMetaSyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);

void ScheduleTrySyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);

int Start();
void ScheduleDbTask(const std::string& key,
PikaCmdArgsType* argv,
BinlogItem* binlog_item,
const std::string& table_name,
uint32_t partition_id);
private:
size_t GetHashIndex(std::string key, bool upper_half);
// Advances next_avail_ to the following index of bg_workers_, wrapping back
// to 0 after the last one.
// Does nothing when bg_workers_ is empty, since taking a value modulo zero
// is undefined behaviour.
void UpdateNextAvail() {
  const size_t worker_count = bg_workers_.size();
  if (worker_count == 0) {
    return;
  }
  // Do the arithmetic in size_t (as the implicit conversion did before) and
  // narrow explicitly; the result is always < worker_count.
  next_avail_ = static_cast<int>((static_cast<size_t>(next_avail_) + 1) % worker_count);
}

private:
PikaReplServerThread* pika_repl_server_thread_;
PikaReplServerThread* pika_repl_server_thread_;
std::vector<PikaReplBgWorker*> bg_workers_;
int next_avail_;
std::hash<std::string> str_hash;
};

#endif
19 changes: 2 additions & 17 deletions include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,19 @@
#include <string>

#include "pink/include/pb_conn.h"
#include "pink/include/redis_parser.h"
#include "pink/include/pink_thread.h"

#include "src/pika_inner_message.pb.h"

#include "include/pika_binlog_transverter.h"

class PikaReplServerThread;

class PikaReplServerConn: public pink::PbConn {
public:
PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, pink::PinkEpoll* epoll);
virtual ~PikaReplServerConn();

int DealMessage();
bool ProcessAuth(const pink::RedisCmdArgsType& argv);
bool ProcessBinlogData(const pink::RedisCmdArgsType& argv, const BinlogItem& binlog_item);

BinlogItem binlog_item_;
private:
static int ParserDealMessage(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv);
int HandleMetaSyncRequest(const InnerMessage::InnerRequest& req);
int HandleTrySync(const InnerMessage::InnerRequest& req);
int HandleBinlogSync(const InnerMessage::InnerRequest& req);

pink::RedisParser redis_parser_;

PikaReplServerThread* binlog_receiver_;
// dispatch binlog by its table_name + partition
void DispatchBinlogReq(const std::shared_ptr<InnerMessage::InnerRequest> req);
};

#endif // INCLUDE_PIKA_REPL_SERVER_CONN_H_
Loading

0 comments on commit 263ef95

Please sign in to comment.