Skip to content

Commit

Permalink
Disptch binlog sync operation to multi thread
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Apr 28, 2016
1 parent 59d87c6 commit a379de3
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 63 deletions.
45 changes: 45 additions & 0 deletions include/pika_binlog_bgworker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef PIKA_BINLOG_BGWORKER_H_
#define PIKA_BINLOG_BGWORKER_H_
#include "pika_command.h"
#include "bg_thread.h"

class BinlogBGWorker{
public:
BinlogBGWorker() {
cmds_.reserve(300);
InitCmdTable(&cmds_);
}
~BinlogBGWorker() {
binlogbg_thread_.Stop();
DestoryCmdTable(cmds_);
}
Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial) {
BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, this);
binlogbg_thread_.StartIfNeed();
binlogbg_thread_.Schedule(&DoBinlogBG, static_cast<void*>(arg));
}
static void DoBinlogBG(void* arg);

private:
std::unordered_map<std::string, Cmd*> cmds_;
pink::BGThread binlogbg_thread_;

struct BinlogBGArg {
PikaCmdArgsType *argv;
std::string raw_args;
uint64_t serial;
BinlogBGWorker *myself;
BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s, BinlogBGWorker* _my) :
argv(_argv), raw_args(_raw), serial(_s), myself(_my) {}
~BinlogBGArg() {
delete argv;
}
};

};


#endif
5 changes: 5 additions & 0 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
return last_sec_thread_querynum_;
}

uint64_t GetnPlusSerial() {
return serial_++;
}

void PlusThreadQuerynum() {
slash::RWLock(&rwlock_, true);
thread_querynum_++;
Expand Down Expand Up @@ -63,5 +67,6 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
uint64_t last_thread_querynum_;
uint64_t last_time_us_;
uint64_t last_sec_thread_querynum_;
uint64_t serial_;
};
#endif
1 change: 0 additions & 1 deletion include/pika_master_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class PikaMasterConn: public pink::RedisConn {
virtual int DealMessage();
private:
PikaBinlogReceiverThread* self_thread_;
std::string DoCmd(const std::string& opt);
std::string RestoreArgs();
};

Expand Down
26 changes: 26 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PIKA_SERVER_H_

#include <vector>
#include <functional>
#include <map>
#include <unordered_set>
#include <nemo.h>
Expand All @@ -16,8 +17,10 @@
#include "pika_worker_thread.h"
#include "pika_monitor_thread.h"
#include "pika_define.h"
#include "pika_binlog_bgworker.h"

#include "slash_status.h"
#include "slash_mutex.h"
#include "bg_thread.h"
#include "nemo_backupable.h"

Expand Down Expand Up @@ -252,6 +255,14 @@ class PikaServer
return monitor_thread_;
}

/*
* Binlog Receiver use
*/
void DispatchBinlogBG(const std::string &key,
PikaCmdArgsType* argv, const std::string& raw_args, uint64_t cur_serial);
bool WaitTillBinlogBGSerial(uint64_t my_serial);
void SignalNextBinlogBGSerial();

/*
*for statistic
*/
Expand Down Expand Up @@ -345,6 +356,17 @@ class PikaServer
*/
PikaMonitorThread* monitor_thread_;

/*
* Binlog Receiver use
*/
bool binlogbg_exit_;
slash::Mutex binlogbg_mutex_;
slash::CondVar binlogbg_cond_;
uint64_t binlogbg_serial_;
std::vector<BinlogBGWorker*> binlogbg_workers_;
std::hash<std::string> str_hash;


/*
* for statistic
*/
Expand All @@ -356,4 +378,8 @@ class PikaServer
PikaServer(PikaServer &ps);
void operator =(const PikaServer &ps);
};




#endif
67 changes: 67 additions & 0 deletions src/pika_binlog_bgworker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include "pika_binlog_bgworker.h"
#include "pika_server.h"
#include "pika_conf.h"
#include "slash_string.h"

extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;

void BinlogBGWorker::DoBinlogBG(void* arg) {
BinlogBGArg *bgarg = static_cast<BinlogBGArg*>(arg);
PikaCmdArgsType argv = *(bgarg->argv);
uint64_t my_serial = bgarg->serial;
BinlogBGWorker *self = bgarg->myself;
std::string opt = argv[0];
slash::StringToLower(opt);

// Get command info
const CmdInfo* const cinfo_ptr = GetCmdInfo(opt);
Cmd* c_ptr = self->GetCmd(opt);
if (!cinfo_ptr || !c_ptr) {
LOG(ERROR) << "Error operation from binlog: " << opt;
}
c_ptr->res().clear();

// Initial
c_ptr->Initial(argv, cinfo_ptr);
if (!c_ptr->res().ok()) {
LOG(ERROR) << "Fail to initial command from binlog: " << opt;
}

uint64_t start_us;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = slash::NowMicros();
}

g_pika_server->mutex_record_.Lock(argv[1]);

// Add read lock for no suspend command
if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_rdlock(g_pika_server->rwlock());
}

// Force the binlog write option to serialize
if (g_pika_server->WaitTillBinlogBGSerial(my_serial)) {
g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(bgarg->raw_args);
g_pika_server->logger_->Unlock();
g_pika_server->SignalNextBinlogBGSerial();

c_ptr->Do();
}

if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_unlock(g_pika_server->rwlock());
}

g_pika_server->mutex_record_.Unlock(argv[1]);

if (g_pika_conf->slowlog_slower_than() >= 0) {
int64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
LOG(ERROR) << "command:" << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration;
}
}

delete bgarg;
}
3 changes: 2 additions & 1 deletion src/pika_binlog_receiver_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ PikaBinlogReceiverThread::PikaBinlogReceiverThread(int port, int cron_interval)
thread_querynum_(0),
last_thread_querynum_(0),
last_time_us_(slash::NowMicros()),
last_sec_thread_querynum_(0) {
last_sec_thread_querynum_(0),
serial_(0) {
cmds_.reserve(300);
InitCmdTable(&cmds_);
}
Expand Down
16 changes: 10 additions & 6 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
return c_ptr->res().message();
}

// Add read lock for no suspend command
if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_rdlock(g_pika_server->rwlock());
}

std::string raw_args;
if (cinfo_ptr->is_write()) {
if (g_pika_conf->readonly()) {
Expand All @@ -96,6 +91,11 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
g_pika_server->mutex_record_.Lock(argv_[1]);
}

// Add read lock for no suspend command
if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_rdlock(g_pika_server->rwlock());
}

c_ptr->Do();

if (cinfo_ptr->is_write()) {
Expand All @@ -104,12 +104,16 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
g_pika_server->logger_->Put(raw_args);
g_pika_server->logger_->Unlock();
}
g_pika_server->mutex_record_.Unlock(argv_[1]);
}

if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_unlock(g_pika_server->rwlock());
}

if (cinfo_ptr->is_write()) {
g_pika_server->mutex_record_.Unlock(argv_[1]);
}

if (g_pika_conf->slowlog_slower_than() >= 0) {
int64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePing, pingptr));
CmdInfo* selectptr = new CmdInfo(kCmdNameSelect, 2, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSelect, selectptr));
CmdInfo* flushallptr = new CmdInfo(kCmdNameFlushall, 1, kCmdFlagsRead | kCmdFlagsSuspend | kCmdFlagsAdmin);
CmdInfo* flushallptr = new CmdInfo(kCmdNameFlushall, 1, kCmdFlagsWrite | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameFlushall, flushallptr));
CmdInfo* readonlyptr = new CmdInfo(kCmdNameReadonly, 2, kCmdFlagsRead | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameReadonly, readonlyptr));
Expand Down
63 changes: 10 additions & 53 deletions src/pika_master_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;
static const int RAW_ARGS_LEN = 1024 * 1024;

PikaMasterConn::PikaMasterConn(int fd, std::string ip_port, pink::Thread* thread) :
RedisConn(fd, ip_port) {
Expand All @@ -17,6 +16,7 @@ PikaMasterConn::PikaMasterConn(int fd, std::string ip_port, pink::Thread* thread
PikaMasterConn::~PikaMasterConn() {
}

static const int RAW_ARGS_LEN = 1024 * 1024;
std::string PikaMasterConn::RestoreArgs() {
std::string res;
res.reserve(RAW_ARGS_LEN);
Expand All @@ -29,20 +29,15 @@ std::string PikaMasterConn::RestoreArgs() {
return res;
}

std::string PikaMasterConn::DoCmd(const std::string& opt) {
// Get command info
const CmdInfo* const cinfo_ptr = GetCmdInfo(opt);
Cmd* c_ptr = self_thread_->GetCmd(opt);
if (!cinfo_ptr || !c_ptr) {
return "-Err unknown or unsupported command \'" + opt + "\r\n";
int PikaMasterConn::DealMessage() {
//no reply
//eq set_is_reply(false);
self_thread_ -> PlusThreadQuerynum();
if (argv_.empty()) {
return -2;
}
c_ptr->res().clear();

uint64_t start_us;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = slash::NowMicros();
}

// Monitor related
std::string monitor_message;
bool is_monitoring = g_pika_server->monitor_thread()->HasMonitorClients();
if (is_monitoring) {
Expand All @@ -53,46 +48,8 @@ std::string PikaMasterConn::DoCmd(const std::string& opt) {
g_pika_server->monitor_thread()->AddMonitorMessage(monitor_message);
}

// Initial
c_ptr->Initial(argv_, cinfo_ptr);
if (!c_ptr->res().ok()) {
return c_ptr->res().message();
}

// TODO Check authed
// Add read lock for no suspend command

g_pika_server->mutex_record_.Lock(argv_[1]);
c_ptr->Do();

if (c_ptr->res().ok()) {
g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(RestoreArgs());
g_pika_server->logger_->Unlock();
}
g_pika_server->mutex_record_.Unlock(argv_[1]);

if (g_pika_conf->slowlog_slower_than() >= 0) {
int64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
LOG(ERROR) << "command:" << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration;
}
}


return c_ptr->res().message();
}

int PikaMasterConn::DealMessage() {
//no reply
//eq set_is_reply(false);
self_thread_ -> PlusThreadQuerynum();
if (argv_.empty()) {
return -2;
}
std::string opt = argv_[0];
slash::StringToLower(opt);
DoCmd(opt);
PikaCmdArgsType *argv = new PikaCmdArgsType(argv_);
g_pika_server->DispatchBinlogBG(argv_[1], argv, RestoreArgs(), self_thread_->GetnPlusSerial());
// memcpy(wbuf_ + wbuf_len_, res.data(), res.size());
// wbuf_len_ += res.size();
return 0;
Expand Down
Loading

0 comments on commit a379de3

Please sign in to comment.