From 3a00b8db0d99c5218520bad20307de9c75ef61e5 Mon Sep 17 00:00:00 2001 From: CatKang Date: Wed, 4 May 2016 11:13:41 +0800 Subject: [PATCH] Slave write binlog in binlog receiver if readonly and in binlog bgworker if not --- conf/pika.conf | 2 +- include/pika_binlog_bgworker.h | 9 +++++---- include/pika_server.h | 3 ++- src/pika_admin.cc | 5 ++--- src/pika_binlog_bgworker.cc | 31 +++++++++++++++++++++++-------- src/pika_client_conn.cc | 3 --- src/pika_conf.cc | 6 +++--- src/pika_master_conn.cc | 23 ++++++++++++++++++++--- src/pika_server.cc | 17 +++++++++-------- 9 files changed, 65 insertions(+), 34 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index b7d9779f99..04a7dd2e5e 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -5,7 +5,7 @@ thread_num : 1 # Sync Thread Number sync_thread_num : 6 # Item count of sync thread queue -sync_buffer_size : 1000 +sync_buffer_size : 10 # Pika log path log_path : ./log/ # Pika glog level diff --git a/include/pika_binlog_bgworker.h b/include/pika_binlog_bgworker.h index 1ad5fc626e..72466fe50e 100644 --- a/include/pika_binlog_bgworker.h +++ b/include/pika_binlog_bgworker.h @@ -16,8 +16,8 @@ class BinlogBGWorker{ Cmd* GetCmd(const std::string& opt) { return GetCmdFromTable(opt, cmds_); } - void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial) { - BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, this); + void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial, bool readonly) { + BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, readonly, this); binlogbg_thread_.StartIfNeed(); binlogbg_thread_.Schedule(&DoBinlogBG, static_cast(arg)); } @@ -31,9 +31,10 @@ class BinlogBGWorker{ PikaCmdArgsType *argv; std::string raw_args; uint64_t serial; + bool readonly; // Server readonly status at the view of binlog dispatch thread BinlogBGWorker *myself; - BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s, BinlogBGWorker* _my) : - argv(_argv), raw_args(_raw), serial(_s), myself(_my) {} + BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s, bool _readonly, BinlogBGWorker* _my) : + argv(_argv), raw_args(_raw), serial(_s), readonly(_readonly), myself(_my) {} }; }; diff --git a/include/pika_server.h b/include/pika_server.h index 50a18ef705..4a7471204e 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -259,7 +259,8 @@ class PikaServer * Binlog Receiver use */ void DispatchBinlogBG(const std::string &key, - PikaCmdArgsType* argv, const std::string& raw_args, uint64_t cur_serial); + PikaCmdArgsType* argv, const std::string& raw_args, + uint64_t cur_serial, bool readonly); bool WaitTillBinlogBGSerial(uint64_t my_serial); void SignalNextBinlogBGSerial(); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index e593176d31..b423fd6bcb 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -18,8 +18,8 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) master_ip_ = slash::StringToLower(*it++); is_noone_ = false; - if (master_ip_ == "no" && slash::StringToLower(*it++) == "one") { - if (argv.end() - it == 0) { + if (master_ip_ == "no" && slash::StringToLower(*it) == "one") { + if (argv.end() - it == 1) { is_noone_ = true; } else { res_.SetRes(CmdRes::kWrongNum, kCmdNameSlaveof); @@ -871,7 +871,6 @@ void ConfigCmd::ConfigSet(std::string& ret) { return; } g_pika_conf->SetReadonly(is_readonly); - pthread_rwlock_rdlock(g_pika_server->rwlock()); ret = "+OK\r\n"; } else if (set_item == "db_sync_speed") { if (!slash::string2l(value.data(), value.size(), &ival)) { diff --git a/src/pika_binlog_bgworker.cc b/src/pika_binlog_bgworker.cc index e8a2a75b12..5a209404cc 100644 --- a/src/pika_binlog_bgworker.cc +++ b/src/pika_binlog_bgworker.cc @@ -10,6 +10,7 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { BinlogBGArg *bgarg = static_cast(arg); PikaCmdArgsType argv = *(bgarg->argv); uint64_t my_serial = bgarg->serial; + bool is_readonly = bgarg->readonly; BinlogBGWorker *self = bgarg->myself; std::string opt = argv[0]; slash::StringToLower(opt); @@ -33,7 +34,12 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { start_us = slash::NowMicros(); } - g_pika_server->mutex_record_.Lock(argv[1]); + // No need lock on readonly mode + // Since the binlog task is dispatched by hash code of key + // That is to say binlog with same key will be dispatched to same thread and execute sequencly + if (!is_readonly) { + g_pika_server->mutex_record_.Lock(argv[1]); + } // Add read lock for no suspend command if (!cinfo_ptr->is_suspend()) { @@ -41,12 +47,20 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { } // Force the binlog write option to serialize - if (g_pika_server->WaitTillBinlogBGSerial(my_serial)) { - g_pika_server->logger_->Lock(); - g_pika_server->logger_->Put(bgarg->raw_args); - g_pika_server->logger_->Unlock(); - g_pika_server->SignalNextBinlogBGSerial(); + // Unlock, clean env, and exit when error happend + bool error_happend = false; + if (!is_readonly) { + error_happend = !g_pika_server->WaitTillBinlogBGSerial(my_serial); + if (!error_happend) { + DLOG(INFO) << "Write binlog in binlog bgthread thread"; + g_pika_server->logger_->Lock(); + g_pika_server->logger_->Put(bgarg->raw_args); + g_pika_server->logger_->Unlock(); + g_pika_server->SignalNextBinlogBGSerial(); + } + } + if (!error_happend) { c_ptr->Do(); } @@ -54,8 +68,9 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { pthread_rwlock_unlock(g_pika_server->rwlock()); } - g_pika_server->mutex_record_.Unlock(argv[1]); - + if (!is_readonly) { + g_pika_server->mutex_record_.Unlock(argv[1]); + } if (g_pika_conf->slowlog_slower_than() >= 0) { int64_t duration = slash::NowMicros() - start_us; if (duration > g_pika_conf->slowlog_slower_than()) { diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 25dc875d90..9183b3392d 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -82,9 +82,6 @@ std::string PikaClientConn::DoCmd(const std::string& opt) { std::string raw_args; if (cinfo_ptr->is_write()) { if (g_pika_conf->readonly()) { - if (!cinfo_ptr->is_suspend()) { - pthread_rwlock_unlock(g_pika_server->rwlock()); - } return "-ERR Server in read-only\r\n"; } raw_args = RestoreArgs(); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index eb449e0a8b..cd885a4d09 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -81,9 +81,9 @@ int PikaConf::Load() } GetConfInt("sync_buffer_size", &sync_buffer_size_); if (sync_buffer_size_ <= 0) { - sync_buffer_size_ = 1000; - } else if (sync_buffer_size_ > 10000) { - sync_buffer_size_ = 10000; + sync_buffer_size_ = 5; + } else if (sync_buffer_size_ > 100) { + sync_buffer_size_ = 100; } // write_buffer_size diff --git a/src/pika_master_conn.cc b/src/pika_master_conn.cc index e6f8738968..c67832618f 100644 --- a/src/pika_master_conn.cc +++ b/src/pika_master_conn.cc @@ -48,9 +48,26 @@ int PikaMasterConn::DealMessage() { } RestoreArgs(); + bool is_readonly = g_pika_conf->readonly(); + + // Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog + // Only when the server is readonly + uint64_t serial = self_thread_->GetnPlusSerial(); + if (is_readonly) { + DLOG(INFO) << "Write binlog in binlog dispatch thread"; + if (!g_pika_server->WaitTillBinlogBGSerial(serial)) { + return -2; + } + g_pika_server->logger_->Lock(); + g_pika_server->logger_->Put(raw_args_); + g_pika_server->logger_->Unlock(); + g_pika_server->SignalNextBinlogBGSerial(); + } + PikaCmdArgsType *argv = new PikaCmdArgsType(argv_); - g_pika_server->DispatchBinlogBG(argv_[1], argv, raw_args_, self_thread_->GetnPlusSerial()); -// memcpy(wbuf_ + wbuf_len_, res.data(), res.size()); -// wbuf_len_ += res.size(); + g_pika_server->DispatchBinlogBG(argv_[1], argv, raw_args_, + serial, is_readonly); + // memcpy(wbuf_ + wbuf_len_, res.data(), res.size()); + // wbuf_len_ += res.size(); return 0; } diff --git a/src/pika_server.cc b/src/pika_server.cc index 69ae64ec3f..be707c8110 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -96,17 +96,17 @@ PikaServer::~PikaServer() { } delete ping_thread_; delete pika_binlog_receiver_thread_; - delete pika_trysync_thread_; - delete pika_heartbeat_thread_; - delete monitor_thread_; + binlogbg_exit_ = true; std::vector::iterator binlogbg_iter = binlogbg_workers_.begin(); while (binlogbg_iter != binlogbg_workers_.end()) { - binlogbg_exit_ = true; binlogbg_cond_.SignalAll(); delete (*binlogbg_iter); binlogbg_iter++; } + delete pika_trysync_thread_; + delete pika_heartbeat_thread_; + delete monitor_thread_; DestoryCmdInfoTable(); delete logger_; @@ -886,14 +886,15 @@ void PikaServer::DoPurgeDir(void* arg) { } void PikaServer::DispatchBinlogBG(const std::string &key, - PikaCmdArgsType* argv, const std::string& raw_args, uint64_t cur_serial) { + PikaCmdArgsType* argv, const std::string& raw_args, + uint64_t cur_serial, bool readonly) { size_t index = str_hash(key) % binlogbg_workers_.size(); - binlogbg_workers_[index]->Schedule(argv, raw_args, cur_serial); + binlogbg_workers_[index]->Schedule(argv, raw_args, cur_serial, readonly); } bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) { binlogbg_mutex_.Lock(); - //DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_; + DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_; while (binlogbg_serial_ != my_serial && !binlogbg_exit_) { binlogbg_cond_.Wait(); } @@ -903,7 +904,7 @@ bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) { void PikaServer::SignalNextBinlogBGSerial() { binlogbg_mutex_.Lock(); - //DLOG(INFO) << "Binlog serial signal: " << binlogbg_serial_; + DLOG(INFO) << "Binlog serial signal: " << binlogbg_serial_; ++binlogbg_serial_; binlogbg_cond_.SignalAll(); binlogbg_mutex_.Unlock();