diff --git a/include/pika_admin.h b/include/pika_admin.h index 38e98a7e41..93b9ed29f1 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -12,12 +12,16 @@ * Admin */ class SlaveofCmd : public Cmd { -public: - SlaveofCmd() : is_noone_(false), have_offset_(false), - filenum_(0), pro_offset_(0) { + public: + SlaveofCmd() + : is_noone_(false), + have_offset_(false), + filenum_(0), + pro_offset_(0) { } virtual void Do(); -private: + + private: std::string master_ip_; int64_t master_port_; bool is_noone_; @@ -32,11 +36,11 @@ class SlaveofCmd : public Cmd { }; class TrysyncCmd : public Cmd { -public: - TrysyncCmd() { - } + public: + TrysyncCmd() {} virtual void Do(); -private: + + private: std::string slave_ip_; int64_t slave_port_; int64_t filenum_; @@ -45,116 +49,117 @@ class TrysyncCmd : public Cmd { }; class InternalTrysyncCmd : public Cmd { -public: - InternalTrysyncCmd() { - } + public: + InternalTrysyncCmd() {} virtual void Do(); -private: + + private: std::string hub_ip_; int64_t hub_port_; int64_t filenum_; int64_t pro_offset_; + bool send_most_recently_; virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class AuthCmd : public Cmd { -public: - AuthCmd() { - } + public: + AuthCmd() {} virtual void Do(); -private: + + private: std::string pwd_; virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class BgsaveCmd : public Cmd { -public: - BgsaveCmd() { - } + public: + BgsaveCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class BgsaveoffCmd : public Cmd { -public: - BgsaveoffCmd() { - } + public: + BgsaveoffCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class CompactCmd : public Cmd { -public: - CompactCmd() { - } + public: + CompactCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class PurgelogstoCmd : public Cmd { -public: - PurgelogstoCmd() : num_(0){ - } + public: + PurgelogstoCmd() : num_(0) {} virtual void Do(); -private: + + private: uint32_t num_; virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class PingCmd : public Cmd { -public: - PingCmd() { - } + public: + PingCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class SelectCmd : public Cmd { -public: - SelectCmd() { - } + public: + SelectCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class FlushallCmd : public Cmd { -public: - FlushallCmd() { - } + public: + FlushallCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class ReadonlyCmd : public Cmd { -public: - ReadonlyCmd() : is_open_(false) { - } + public: + ReadonlyCmd() : is_open_(false) {} virtual void Do(); -private: + + private: bool is_open_; virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class ClientCmd : public Cmd { -public: - ClientCmd() { - } + public: + ClientCmd() {} virtual void Do(); const static std::string CLIENT_LIST_S; const static std::string CLIENT_KILL_S; -private: + + private: std::string operation_, ip_port_; virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class InfoCmd : public Cmd { -public: + public: enum InfoSection { kInfoErr = 0x0, kInfoServer, @@ -170,10 +175,10 @@ class InfoCmd : public Cmd { kInfoDoubleMaster }; - InfoCmd() : rescan_(false), off_(false) { - } + InfoCmd() : rescan_(false), off_(false) {} virtual void Do(); -private: + + private: InfoSection info_section_; bool rescan_; //whether to rescan the keyspace bool off_; @@ -207,20 +212,20 @@ class InfoCmd : public Cmd { }; class ShutdownCmd : public Cmd { -public: - ShutdownCmd() { - } + public: + ShutdownCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); }; class ConfigCmd : public Cmd { -public: - ConfigCmd() { - } + public: + ConfigCmd() {} virtual void Do(); -private: + + private: std::vector config_args_v_; virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); void ConfigGet(std::string &ret); @@ -230,51 +235,51 @@ class ConfigCmd : public Cmd { }; class MonitorCmd : public Cmd { -public: - MonitorCmd() { - } + public: + MonitorCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; class DbsizeCmd : public Cmd { -public: - DbsizeCmd() { - } + public: + DbsizeCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; class TimeCmd : public Cmd { -public: - TimeCmd() { - } + public: + TimeCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; class DelbackupCmd : public Cmd { -public: - DelbackupCmd() { - } + public: + DelbackupCmd() {} virtual void Do(); -private: + + private: virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; #ifdef TCMALLOC_EXTENSION class TcmallocCmd : public Cmd { -public: - TcmallocCmd() { - } + public: + TcmallocCmd() {} virtual void Do(); -private: + + private: int64_t type_; int64_t rate_; virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; #endif -#endif +#endif // PIKA_ADMIN_H_ diff --git a/include/pika_hub_manager.h b/include/pika_hub_manager.h index ecab21bca6..8d2dfe1809 100644 --- a/include/pika_hub_manager.h +++ b/include/pika_hub_manager.h @@ -154,10 +154,14 @@ class PikaHubManager { } Status AddHub(const std::string hub_ip, int hub_port, - uint32_t filenum, uint64_t con_offset); + uint32_t filenum, uint64_t con_offset, + bool send_most_recently); - std::string hub_ip() { - return hub_ip_; + std::string hub_ip() { return hub_ip_; } + + bool CouldPurge(int file_tobe_purged) { + slash::MutexLock l(&sending_window_protector_); + return file_tobe_purged < sending_window_.left; } std::string StatusToString(); @@ -165,11 +169,9 @@ class PikaHubManager { void HubConnected() { hub_stage_ = STARTED; } void StopHub(int connnection_num); - slash::Mutex sending_window_protector_; - private: friend class PikaHubSenderThread; - Status ResetSenders(); + Status ResetSenders(bool send_most_recently); bool GetNextFilenum(PikaHubSenderThread* thread, uint32_t* filenum, uint64_t* con_offset); @@ -182,6 +184,7 @@ class PikaHubManager { uint32_t hub_filenum_; uint64_t hub_con_offset_; + slash::Mutex sending_window_protector_; struct { int64_t left; int64_t right; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 8f202978fa..fd3b09b516 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -215,6 +215,8 @@ void InternalTrysyncCmd::DoInitial(PikaCmdArgsType &argv, res_.SetRes(CmdRes::kInvalidInt); return; } + + send_most_recently_ = (*it == "1"); } void InternalTrysyncCmd::Do() { @@ -222,7 +224,9 @@ void InternalTrysyncCmd::Do() { << " filenum: " << filenum_ << " pro_offset: " << pro_offset_; Status status = g_pika_server->pika_hub_manager_->AddHub( - hub_ip_, hub_port_ + 1000, filenum_, pro_offset_); + hub_ip_, hub_port_ + 1000, + filenum_, pro_offset_, + send_most_recently_); if (!status.ok()) { LOG(WARNING) << "hub offset is larger than mine, slave ip: " << hub_ip_ diff --git a/src/pika_command.cc b/src/pika_command.cc index e117f880e2..68da2ad2eb 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -27,7 +27,7 @@ void InitCmdInfoTable() { CmdInfo* trysyncptr = new CmdInfo(kCmdNameTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire); cmd_infos.insert(std::pair(kCmdNameTrysync, trysyncptr)); ////InternalTrysync for Pika HUB - CmdInfo* internal_trysyncptr = new CmdInfo(kCmdNameInternalTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire); + CmdInfo* internal_trysyncptr = new CmdInfo(kCmdNameInternalTrysync, 6, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire); cmd_infos.insert(std::pair(kCmdNameInternalTrysync, internal_trysyncptr )); CmdInfo* authptr = new CmdInfo(kCmdNameAuth, 2, kCmdFlagsRead | kCmdFlagsAdmin); cmd_infos.insert(std::pair(kCmdNameAuth, authptr)); diff --git a/src/pika_hub_manager.cc b/src/pika_hub_manager.cc index e95adb8982..9710250f8d 100644 --- a/src/pika_hub_manager.cc +++ b/src/pika_hub_manager.cc @@ -28,8 +28,10 @@ PikaHubManager::PikaHubManager(const std::set &ips, int port, } } -Status PikaHubManager::AddHub(const std::string hub_ip, int hub_port, - uint32_t filenum, uint64_t con_offset) { +Status PikaHubManager::AddHub( + const std::string hub_ip, int hub_port, + uint32_t filenum, uint64_t con_offset, + bool send_most_recently) { std::string ip_port = slash::IpPortString(hub_ip, hub_port); LOG(INFO) << "Try add hub, " << ip_port; @@ -65,27 +67,46 @@ Status PikaHubManager::AddHub(const std::string hub_ip, int hub_port, hub_ip_ = hub_ip; hub_port_ = hub_port; - Status s = ResetSenders(); + Status s = ResetSenders(send_most_recently); hub_stage_ = s.ok() ? DEGRADE : STOPED; return s; } -Status PikaHubManager::ResetSenders() { +Status PikaHubManager::ResetSenders(bool send_most_recently) { assert(hub_stage_ < STARTED); // Sanitize uint32_t cur_filenum = 0; uint64_t cur_offset = 0; g_pika_server->logger_->GetProducerStatus(&cur_filenum, &cur_offset); - if (hub_con_offset_ > g_pika_server->logger_->file_size() || - cur_filenum < hub_filenum_ || - (cur_filenum == hub_filenum_ && cur_offset < hub_con_offset_)) { - return Status::InvalidArgument("AddHubBinlogSender invalid binlog offset"); - } - std::string confile = NewFileName(g_pika_server->logger_->filename, hub_filenum_); - if (!slash::FileExists(confile)) { - // Not found binlog specified by filenum - return Status::InvalidArgument("AddHubBinlogSender file does not exist"); + // We just care about filenum + hub_con_offset_ = 0; + if (send_most_recently) { + // Find a valid file num close to hub_filenum_ + if (hub_filenum_ >= cur_filenum) { + hub_filenum_ = cur_filenum; + } + + while (hub_filenum_ <= cur_filenum) { + std::string confile = + NewFileName(g_pika_server->logger_->filename, hub_filenum_); + if (slash::FileExists(confile)) { + break; + } + hub_filenum_++; + } + if (hub_filenum_ > cur_filenum) { + return Status::InvalidArgument("AddHubBinlogSender invalid binlog offset"); + } + } else { + std::string confile = + NewFileName(g_pika_server->logger_->filename, hub_filenum_); + if (hub_filenum_ > cur_filenum || + !slash::FileExists(confile)) { + return Status::InvalidArgument("AddHubBinlogSender invalid binlog offset"); + } } + LOG(INFO) << "Send most recently file: " << (send_most_recently ? "yes" : "no") + << ", ready to send binlog file: " << hub_filenum_; for (int i = 0; i < kMaxHubSender; i++) { if (sender_threads_[i]->TryStartThread(hub_ip_, hub_port_) != 0) { diff --git a/src/pika_master_conn.cc b/src/pika_master_conn.cc index 8e573f95df..1a75cf48c7 100644 --- a/src/pika_master_conn.cc +++ b/src/pika_master_conn.cc @@ -30,11 +30,18 @@ int PikaMasterConn::DealMessage() { } // TODO(shq) maybe monitor do not need these infomation + std::string server_id; + std::string binlog_info; if (!g_pika_server->DoubleMasterMode()) { if (argv_.size() > 4 && *(argv_.end() - 4) == kPikaBinlogMagic) { // Record new binlog format - argv_.erase(argv_.end() - 4, argv_.end()); + argv_.pop_back(); // send_to_hub flag + binlog_info = argv_.back(); // binlog_info + argv_.pop_back(); + server_id = argv_.back(); // server_id + argv_.pop_back(); + argv_.pop_back(); // kPikaBinlogMagic } } @@ -54,7 +61,6 @@ int PikaMasterConn::DealMessage() { // Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog // Only when the server is readonly uint64_t serial = binlog_receiver_->GetnPlusSerial(); - std::string dummy_binlog_info(""); if (is_readonly) { if (!g_pika_server->WaitTillBinlogBGSerial(serial)) { return -2; @@ -65,8 +71,8 @@ int PikaMasterConn::DealMessage() { g_pika_server->logger_->Lock(); g_pika_server->logger_->Put(c_ptr->ToBinlog( argv_, - g_pika_conf->server_id(), - dummy_binlog_info, + server_id.empty() ? g_pika_conf->server_id() : server_id, + binlog_info, false)); g_pika_server->logger_->Unlock(); g_pika_server->SignalNextBinlogBGSerial(); diff --git a/src/pika_server.cc b/src/pika_server.cc index d5a20c6fe2..91c0e30440 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1128,7 +1128,8 @@ bool PikaServer::CouldPurge(uint32_t index) { } PikaBinlogSenderThread *pb = static_cast((*it).sender); uint32_t filenum = pb->filenum(); - if (index > filenum) { + if (index > filenum || // slaves + !pika_hub_manager_->CouldPurge(index)) { // pika hub return false; } }