Skip to content

Commit

Permalink
Add FdClosedHandle; Modify MonitorCmd
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Jun 30, 2017
1 parent fe2d20f commit 4a3f86a
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 46 deletions.
1 change: 0 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ class MonitorCmd : public Cmd {
}
virtual void Do();
private:
PikaClientConn* self_client_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

Expand Down
3 changes: 0 additions & 3 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ class PikaClientConn: public pink::RedisConn {
void* worker_specific_data);
virtual ~PikaClientConn() {}
virtual int DealMessage();
void DelEvent(int fd) {
return server_thread_->DelEvent(fd);
}

private:
pink::ServerThread* server_thread_;
Expand Down
1 change: 1 addition & 0 deletions include/pika_heartbeat_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class PikaHeartbeatThread {
: heartbeat_thread_(heartbeat_thread) {
}
void CronHandle() const override;
void FdClosedHandle(int fd, const std::string& ip_port) const override;
void FdTimeoutHandle(int fd, const std::string& ip_port) const override;
bool AccessHandle(std::string& ip) const override;

Expand Down
7 changes: 1 addition & 6 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1129,18 +1129,13 @@ void ConfigCmd::ConfigResetstat(std::string &ret) {

void MonitorCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
(void)ptr_info;
if (argv.size() != 2) { //append a arg in DoCmd for monitor cmd
if (argv.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameMonitor);
return;
}
memcpy(&self_client_, argv[1].data(), sizeof(PikaClientConn*));
}

void MonitorCmd::Do() {
// TODO (gaodq)
self_client_->DelEvent(self_client_->fd());
g_pika_server->AddMonitorClient(self_client_);
g_pika_server->AddMonitorMessage("OK");
}

void DbsizeCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
Expand Down
12 changes: 8 additions & 4 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,20 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
g_pika_server->AddMonitorMessage(monitor_message);
}

if (opt == kCmdNameMonitor) {
PikaClientConn* self = this;
argv_.push_back(std::string(reinterpret_cast<char*>(&self), sizeof(PikaClientConn*)));
}
// Initial
c_ptr->Initial(argv_, cinfo_ptr);
if (!c_ptr->res().ok()) {
return c_ptr->res().message();
}

if (opt == kCmdNameMonitor) {
pink::PinkConn* conn = server_thread_->MoveConnOut(fd());
assert(conn == this);
g_pika_server->AddMonitorClient(static_cast<PikaClientConn*>(conn));
g_pika_server->AddMonitorMessage("OK");
return ""; // Monitor thread will return "OK"
}

std::string raw_args;
if (cinfo_ptr->is_write()) {
if (g_pika_conf->readonly()) {
Expand Down
16 changes: 8 additions & 8 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ int PikaDispatchThread::StartThread() {
}

int64_t PikaDispatchThread::ThreadClientList(std::vector<ClientInfo> *clients) {
auto conns_info = thread_rep_->conns_info();
if (clients != nullptr) {
auto conns = thread_rep_->conns();
for (auto& conn : conns) {
clients->push_back(ClientInfo {
conn.first,
conn.second->ip_port(),
conn.second->last_interaction().tv_sec,
reinterpret_cast<PikaClientConn*>(conn.second),
for (auto& info : conns_info) {
clients->push_back({
info.fd,
info.ip_port,
info.last_interaction.tv_sec,
nullptr /* PinkConn pointer, doesn't need here */
});
}
}
return thread_rep_->conn_num();
return conns_info.size();
}

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
Expand Down
45 changes: 22 additions & 23 deletions src/pika_heartbeat_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,32 @@ void PikaHeartbeatThread::Handles::CronHandle() const {
gettimeofday(&now, NULL);

/*
* find out: 1. stay STAGE_ONE too long
* 2. the hb_fd have already be deleted
* find out stay STAGE_ONE too long
* erase it in slaves_;
*/
{
slash::MutexLock l(&g_pika_server->slave_mutex_);
for (auto& slave : g_pika_server->slaves_) {
DLOG(INFO) << "sid: " << slave.sid << " ip_port: " << slave.ip_port <<
" port " << slave.port << " sender_tid: " << slave.sender_tid <<
" hb_fd: " << slave.hb_fd << " stage :" << slave.stage <<
" sender: " << slave.sender << " create_time: " << slave.create_time.tv_sec;

if ((slave.stage == SLAVE_ITEM_STAGE_ONE &&
now.tv_sec - slave.create_time.tv_sec > 30) ||
(slave.stage == SLAVE_ITEM_STAGE_TWO &&
!heartbeat_thread_->thread_rep_->fd_exist(slave.hb_fd))) {
slash::MutexLock l(&g_pika_server->slave_mutex_);
for (auto& slave : g_pika_server->slaves_) {
DLOG(INFO) << "sid: " << slave.sid << " ip_port: " << slave.ip_port <<
" port " << slave.port << " sender_tid: " << slave.sender_tid <<
" hb_fd: " << slave.hb_fd << " stage :" << slave.stage <<
" sender: " << slave.sender << " create_time: " << slave.create_time.tv_sec;

// Kill BinlogSender
LOG(WARNING) << "Erase slave " << slave.ip_port <<
" from slaves map of heartbeat thread";
//TODO maybe bug here
g_pika_server->slave_mutex_.Unlock();
g_pika_server->DeleteSlave(slave.hb_fd);
g_pika_server->slave_mutex_.Lock();
}
if (slave.stage == SLAVE_ITEM_STAGE_ONE &&
now.tv_sec - slave.create_time.tv_sec > 30) {
// Kill BinlogSender
LOG(WARNING) << "Erase slave stay STAGE_ONE too long: " <<
slave.ip_port << " from slaves map of heartbeat thread";
//TODO maybe bug here
g_pika_server->slave_mutex_.Unlock();
g_pika_server->DeleteSlave(slave.hb_fd);
g_pika_server->slave_mutex_.Lock();
}
}
}
}

void PikaHeartbeatThread::Handles::FdClosedHandle(int fd, const std::string& ip_port) const {
LOG(INFO) << "Find closed Slave: " << ip_port;
g_pika_server->DeleteSlave(fd);
}

void PikaHeartbeatThread::Handles::FdTimeoutHandle(int fd, const std::string& ip_port) const {
Expand Down

0 comments on commit 4a3f86a

Please sign in to comment.