From 4a3f86ae37393f7e56f7116d12c3275cf833e947 Mon Sep 17 00:00:00 2001 From: gaodunqiao Date: Fri, 30 Jun 2017 19:16:36 +0800 Subject: [PATCH] Add FdClosedHandle; Modify MonitorCmd --- include/pika_admin.h | 1 - include/pika_client_conn.h | 3 --- include/pika_heartbeat_thread.h | 1 + src/pika_admin.cc | 7 +---- src/pika_client_conn.cc | 12 ++++++--- src/pika_dispatch_thread.cc | 16 ++++++------ src/pika_heartbeat_thread.cc | 45 ++++++++++++++++----------------- third/pink | 2 +- 8 files changed, 41 insertions(+), 46 deletions(-) diff --git a/include/pika_admin.h b/include/pika_admin.h index a64ca20628..23cc40c33e 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -216,7 +216,6 @@ class MonitorCmd : public Cmd { } virtual void Do(); private: - PikaClientConn* self_client_; virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info); }; diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index ee541d2cfd..5d0df691ea 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -21,9 +21,6 @@ class PikaClientConn: public pink::RedisConn { void* worker_specific_data); virtual ~PikaClientConn() {} virtual int DealMessage(); - void DelEvent(int fd) { - return server_thread_->DelEvent(fd); - } private: pink::ServerThread* server_thread_; diff --git a/include/pika_heartbeat_thread.h b/include/pika_heartbeat_thread.h index 3f97234170..4d09bc4573 100644 --- a/include/pika_heartbeat_thread.h +++ b/include/pika_heartbeat_thread.h @@ -36,6 +36,7 @@ class PikaHeartbeatThread { : heartbeat_thread_(heartbeat_thread) { } void CronHandle() const override; + void FdClosedHandle(int fd, const std::string& ip_port) const override; void FdTimeoutHandle(int fd, const std::string& ip_port) const override; bool AccessHandle(std::string& ip) const override; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 3680583639..733e57db34 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1129,18 +1129,13 @@ void ConfigCmd::ConfigResetstat(std::string &ret) { void MonitorCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) { (void)ptr_info; - if (argv.size() != 2) { //append a arg in DoCmd for monitor cmd + if (argv.size() != 1) { res_.SetRes(CmdRes::kWrongNum, kCmdNameMonitor); return; } - memcpy(&self_client_, argv[1].data(), sizeof(PikaClientConn*)); } void MonitorCmd::Do() { - // TODO (gaodq) - self_client_->DelEvent(self_client_->fd()); - g_pika_server->AddMonitorClient(self_client_); - g_pika_server->AddMonitorMessage("OK"); } void DbsizeCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) { diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index d1595efbbf..7649831648 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -75,16 +75,20 @@ std::string PikaClientConn::DoCmd(const std::string& opt) { g_pika_server->AddMonitorMessage(monitor_message); } - if (opt == kCmdNameMonitor) { - PikaClientConn* self = this; - argv_.push_back(std::string(reinterpret_cast(&self), sizeof(PikaClientConn*))); - } // Initial c_ptr->Initial(argv_, cinfo_ptr); if (!c_ptr->res().ok()) { return c_ptr->res().message(); } + if (opt == kCmdNameMonitor) { + pink::PinkConn* conn = server_thread_->MoveConnOut(fd()); + assert(conn == this); + g_pika_server->AddMonitorClient(static_cast(conn)); + g_pika_server->AddMonitorMessage("OK"); + return ""; // Monitor thread will return "OK" + } + std::string raw_args; if (cinfo_ptr->is_write()) { if (g_pika_conf->readonly()) { diff --git a/src/pika_dispatch_thread.cc b/src/pika_dispatch_thread.cc index 65d2531819..f5d8f6cd2d 100644 --- a/src/pika_dispatch_thread.cc +++ b/src/pika_dispatch_thread.cc @@ -31,18 +31,18 @@ int PikaDispatchThread::StartThread() { } int64_t PikaDispatchThread::ThreadClientList(std::vector *clients) { + auto conns_info = thread_rep_->conns_info(); if (clients != nullptr) { - auto conns = thread_rep_->conns(); - for (auto& conn : conns) { - clients->push_back(ClientInfo { - conn.first, - conn.second->ip_port(), - conn.second->last_interaction().tv_sec, - reinterpret_cast(conn.second), + for (auto& info : conns_info) { + clients->push_back({ + info.fd, + info.ip_port, + info.last_interaction.tv_sec, + nullptr /* PinkConn pointer, doesn't need here */ }); } } - return thread_rep_->conn_num(); + return conns_info.size(); } bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const { diff --git a/src/pika_heartbeat_thread.cc b/src/pika_heartbeat_thread.cc index 90dfda0f47..6e39289878 100644 --- a/src/pika_heartbeat_thread.cc +++ b/src/pika_heartbeat_thread.cc @@ -34,33 +34,32 @@ void PikaHeartbeatThread::Handles::CronHandle() const { gettimeofday(&now, NULL); /* - * find out: 1. stay STAGE_ONE too long - * 2. the hb_fd have already be deleted + * find out stay STAGE_ONE too long * erase it in slaves_; */ - { - slash::MutexLock l(&g_pika_server->slave_mutex_); - for (auto& slave : g_pika_server->slaves_) { - DLOG(INFO) << "sid: " << slave.sid << " ip_port: " << slave.ip_port << - " port " << slave.port << " sender_tid: " << slave.sender_tid << - " hb_fd: " << slave.hb_fd << " stage :" << slave.stage << - " sender: " << slave.sender << " create_time: " << slave.create_time.tv_sec; - - if ((slave.stage == SLAVE_ITEM_STAGE_ONE && - now.tv_sec - slave.create_time.tv_sec > 30) || - (slave.stage == SLAVE_ITEM_STAGE_TWO && - !heartbeat_thread_->thread_rep_->fd_exist(slave.hb_fd))) { + slash::MutexLock l(&g_pika_server->slave_mutex_); + for (auto& slave : g_pika_server->slaves_) { + DLOG(INFO) << "sid: " << slave.sid << " ip_port: " << slave.ip_port << + " port " << slave.port << " sender_tid: " << slave.sender_tid << + " hb_fd: " << slave.hb_fd << " stage :" << slave.stage << + " sender: " << slave.sender << " create_time: " << slave.create_time.tv_sec; - // Kill BinlogSender - LOG(WARNING) << "Erase slave " << slave.ip_port << - " from slaves map of heartbeat thread"; - //TODO maybe bug here - g_pika_server->slave_mutex_.Unlock(); - g_pika_server->DeleteSlave(slave.hb_fd); - g_pika_server->slave_mutex_.Lock(); - } + if (slave.stage == SLAVE_ITEM_STAGE_ONE && + now.tv_sec - slave.create_time.tv_sec > 30) { + // Kill BinlogSender + LOG(WARNING) << "Erase slave stay STAGE_ONE too long: " << + slave.ip_port << " from slaves map of heartbeat thread"; + //TODO maybe bug here + g_pika_server->slave_mutex_.Unlock(); + g_pika_server->DeleteSlave(slave.hb_fd); + g_pika_server->slave_mutex_.Lock(); } - } + } +} + +void PikaHeartbeatThread::Handles::FdClosedHandle(int fd, const std::string& ip_port) const { + LOG(INFO) << "Find closed Slave: " << ip_port; + g_pika_server->DeleteSlave(fd); } void PikaHeartbeatThread::Handles::FdTimeoutHandle(int fd, const std::string& ip_port) const { diff --git a/third/pink b/third/pink index e8886cb7f6..7f687bade8 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit e8886cb7f6cba8d693ba9aecb4011882e7cee13f +Subproject commit 7f687bade86b05355d8f5da02372c9b92d497831