Skip to content

Commit

Permalink
Slave write binlog in binlog receiver if readonly and in binlog bgwor…
Browse files Browse the repository at this point in the history
…ker if not
  • Loading branch information
CatKang committed May 4, 2016
1 parent f7a3275 commit 3a00b8d
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ thread_num : 1
# Sync Thread Number
sync_thread_num : 6
# Item count of sync thread queue
sync_buffer_size : 1000
sync_buffer_size : 10
# Pika log path
log_path : ./log/
# Pika glog level
Expand Down
9 changes: 5 additions & 4 deletions include/pika_binlog_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class BinlogBGWorker{
Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial) {
BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, this);
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial, bool readonly) {
BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, readonly, this);
binlogbg_thread_.StartIfNeed();
binlogbg_thread_.Schedule(&DoBinlogBG, static_cast<void*>(arg));
}
Expand All @@ -31,9 +31,10 @@ class BinlogBGWorker{
PikaCmdArgsType *argv;
std::string raw_args;
uint64_t serial;
bool readonly; // Server readonly status at the view of binlog dispatch thread
BinlogBGWorker *myself;
BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s, BinlogBGWorker* _my) :
argv(_argv), raw_args(_raw), serial(_s), myself(_my) {}
BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s, bool _readonly, BinlogBGWorker* _my) :
argv(_argv), raw_args(_raw), serial(_s), readonly(_readonly), myself(_my) {}
};

};
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ class PikaServer
* Binlog Receiver use
*/
void DispatchBinlogBG(const std::string &key,
PikaCmdArgsType* argv, const std::string& raw_args, uint64_t cur_serial);
PikaCmdArgsType* argv, const std::string& raw_args,
uint64_t cur_serial, bool readonly);
bool WaitTillBinlogBGSerial(uint64_t my_serial);
void SignalNextBinlogBGSerial();

Expand Down
5 changes: 2 additions & 3 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
master_ip_ = slash::StringToLower(*it++);

is_noone_ = false;
if (master_ip_ == "no" && slash::StringToLower(*it++) == "one") {
if (argv.end() - it == 0) {
if (master_ip_ == "no" && slash::StringToLower(*it) == "one") {
if (argv.end() - it == 1) {
is_noone_ = true;
} else {
res_.SetRes(CmdRes::kWrongNum, kCmdNameSlaveof);
Expand Down Expand Up @@ -871,7 +871,6 @@ void ConfigCmd::ConfigSet(std::string& ret) {
return;
}
g_pika_conf->SetReadonly(is_readonly);
pthread_rwlock_rdlock(g_pika_server->rwlock());
ret = "+OK\r\n";
} else if (set_item == "db_sync_speed") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
Expand Down
31 changes: 23 additions & 8 deletions src/pika_binlog_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ void BinlogBGWorker::DoBinlogBG(void* arg) {
BinlogBGArg *bgarg = static_cast<BinlogBGArg*>(arg);
PikaCmdArgsType argv = *(bgarg->argv);
uint64_t my_serial = bgarg->serial;
bool is_readonly = bgarg->readonly;
BinlogBGWorker *self = bgarg->myself;
std::string opt = argv[0];
slash::StringToLower(opt);
Expand All @@ -33,29 +34,43 @@ void BinlogBGWorker::DoBinlogBG(void* arg) {
start_us = slash::NowMicros();
}

g_pika_server->mutex_record_.Lock(argv[1]);
// No need lock on readonly mode
// Since the binlog task is dispatched by hash code of key
// That is to say binlog with same key will be dispatched to same thread and execute sequencly
if (!is_readonly) {
g_pika_server->mutex_record_.Lock(argv[1]);
}

// Add read lock for no suspend command
if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_rdlock(g_pika_server->rwlock());
}

// Force the binlog write option to serialize
if (g_pika_server->WaitTillBinlogBGSerial(my_serial)) {
g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(bgarg->raw_args);
g_pika_server->logger_->Unlock();
g_pika_server->SignalNextBinlogBGSerial();
// Unlock, clean env, and exit when error happend
bool error_happend = false;
if (!is_readonly) {
error_happend = !g_pika_server->WaitTillBinlogBGSerial(my_serial);
if (!error_happend) {
DLOG(INFO) << "Write binlog in binlog bgthread thread";
g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(bgarg->raw_args);
g_pika_server->logger_->Unlock();
g_pika_server->SignalNextBinlogBGSerial();
}
}

if (!error_happend) {
c_ptr->Do();
}

if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_unlock(g_pika_server->rwlock());
}

g_pika_server->mutex_record_.Unlock(argv[1]);

if (!is_readonly) {
g_pika_server->mutex_record_.Unlock(argv[1]);
}
if (g_pika_conf->slowlog_slower_than() >= 0) {
int64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
Expand Down
3 changes: 0 additions & 3 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
std::string raw_args;
if (cinfo_ptr->is_write()) {
if (g_pika_conf->readonly()) {
if (!cinfo_ptr->is_suspend()) {
pthread_rwlock_unlock(g_pika_server->rwlock());
}
return "-ERR Server in read-only\r\n";
}
raw_args = RestoreArgs();
Expand Down
6 changes: 3 additions & 3 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ int PikaConf::Load()
}
GetConfInt("sync_buffer_size", &sync_buffer_size_);
if (sync_buffer_size_ <= 0) {
sync_buffer_size_ = 1000;
} else if (sync_buffer_size_ > 10000) {
sync_buffer_size_ = 10000;
sync_buffer_size_ = 5;
} else if (sync_buffer_size_ > 100) {
sync_buffer_size_ = 100;
}

// write_buffer_size
Expand Down
23 changes: 20 additions & 3 deletions src/pika_master_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,26 @@ int PikaMasterConn::DealMessage() {
}
RestoreArgs();

bool is_readonly = g_pika_conf->readonly();

// Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog
// Only when the server is readonly
uint64_t serial = self_thread_->GetnPlusSerial();
if (is_readonly) {
DLOG(INFO) << "Write binlog in binlog dispatch thread";
if (!g_pika_server->WaitTillBinlogBGSerial(serial)) {
return -2;
}
g_pika_server->logger_->Lock();
g_pika_server->logger_->Put(raw_args_);
g_pika_server->logger_->Unlock();
g_pika_server->SignalNextBinlogBGSerial();
}

PikaCmdArgsType *argv = new PikaCmdArgsType(argv_);
g_pika_server->DispatchBinlogBG(argv_[1], argv, raw_args_, self_thread_->GetnPlusSerial());
// memcpy(wbuf_ + wbuf_len_, res.data(), res.size());
// wbuf_len_ += res.size();
g_pika_server->DispatchBinlogBG(argv_[1], argv, raw_args_,
serial, is_readonly);
// memcpy(wbuf_ + wbuf_len_, res.data(), res.size());
// wbuf_len_ += res.size();
return 0;
}
17 changes: 9 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ PikaServer::~PikaServer() {
}
delete ping_thread_;
delete pika_binlog_receiver_thread_;
delete pika_trysync_thread_;
delete pika_heartbeat_thread_;
delete monitor_thread_;

binlogbg_exit_ = true;
std::vector<BinlogBGWorker*>::iterator binlogbg_iter = binlogbg_workers_.begin();
while (binlogbg_iter != binlogbg_workers_.end()) {
binlogbg_exit_ = true;
binlogbg_cond_.SignalAll();
delete (*binlogbg_iter);
binlogbg_iter++;
}
delete pika_trysync_thread_;
delete pika_heartbeat_thread_;
delete monitor_thread_;

DestoryCmdInfoTable();
delete logger_;
Expand Down Expand Up @@ -886,14 +886,15 @@ void PikaServer::DoPurgeDir(void* arg) {
}

void PikaServer::DispatchBinlogBG(const std::string &key,
PikaCmdArgsType* argv, const std::string& raw_args, uint64_t cur_serial) {
PikaCmdArgsType* argv, const std::string& raw_args,
uint64_t cur_serial, bool readonly) {
size_t index = str_hash(key) % binlogbg_workers_.size();
binlogbg_workers_[index]->Schedule(argv, raw_args, cur_serial);
binlogbg_workers_[index]->Schedule(argv, raw_args, cur_serial, readonly);
}

bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) {
binlogbg_mutex_.Lock();
//DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_;
DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_;
while (binlogbg_serial_ != my_serial && !binlogbg_exit_) {
binlogbg_cond_.Wait();
}
Expand All @@ -903,7 +904,7 @@ bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) {

void PikaServer::SignalNextBinlogBGSerial() {
binlogbg_mutex_.Lock();
//DLOG(INFO) << "Binlog serial signal: " << binlogbg_serial_;
DLOG(INFO) << "Binlog serial signal: " << binlogbg_serial_;
++binlogbg_serial_;
binlogbg_cond_.SignalAll();
binlogbg_mutex_.Unlock();
Expand Down

0 comments on commit 3a00b8d

Please sign in to comment.