Skip to content

Commit

Permalink
add client kill all & qps for info
Browse files Browse the repository at this point in the history
  • Loading branch information
songzhao committed Nov 2, 2015
1 parent a9d4ada commit b813b18
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 5 deletions.
3 changes: 3 additions & 0 deletions include/pika_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class PikaConn
// struct timeval lastinteraction() { return lastinteraction_; };
// void UpdateLastInteraction() { gettimeofday(&lastinteraction_, NULL); };
int wbuflen() { return sdslen(wbuf_); }
int querynums() { return querynums_; }
void clear_querynums() { querynums_ = 0; }
private:

int fd_;
Expand All @@ -65,6 +67,7 @@ class PikaConn
int32_t wbuf_pos_;
PikaThread *thread_;
port::Mutex mutex_;
int querynums_;
};

#endif
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class PikaServer
int ClientNum();
int ClientList(std::string &res);
int ClientKill(std::string &ip_port);
void ClientKillAll();
// int ClientRole(int fd, int role);

void set_masterhost(std::string &masterhost) { masterhost_ = masterhost; }
Expand All @@ -61,6 +62,7 @@ class PikaServer
// std::map<std::string, SlaveItem>* slaves() { return &slaves_; }
void ProcessTimeEvent(struct timeval*);
void DisconnectFromMaster();
int CurrentQps();

int repl_state_; //PIKA_SINGLE; PIKA_MASTER; PIKA_SLAVE
int ms_state_; //PIKA_CONNECT; PIKA_CONNECTED
Expand Down
2 changes: 2 additions & 0 deletions include/pika_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class PikaThread
bool is_master_thread_;
bool is_first_;
int crontimes_;
int querynums_;
int last_sec_querynums_;

// port::Mutex mutex() { return mutex_; }

Expand Down
18 changes: 13 additions & 5 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ void ClientCmd::Do(std::list<std::string> &argv, std::string &ret) {
} else if (opt == "kill") {
std::string ip_port = argv.front();
argv.pop_front();
int res = g_pikaServer->ClientKill(ip_port);
if (res == 1) {
transform(ip_port.begin(), ip_port.end(), ip_port.begin(), ::tolower);
if (ip_port == "all") {
g_pikaServer->ClientKillAll();
ret = "+OK\r\n";
} else {
ret = "-ERR No such client\r\n";
int res = g_pikaServer->ClientKill(ip_port);
if (res == 1) {
ret = "+OK\r\n";
} else {
ret = "-ERR No such client\r\n";
}
}

} else {
Expand Down Expand Up @@ -496,14 +502,16 @@ void InfoCmd::Do(std::list<std::string> &argv, std::string &ret) {
"tcp_port:%d\r\n"
"thread_num:%d\r\n"
"config_file:%s\r\n"
"is_bgsaving:%s\r\n",
"is_bgsaving:%s\r\n"
"current qps:%d\r\n",
PIKA_VERSION,
name.sysname, name.release, name.machine,
(long) getpid(),
g_pikaConf->port(),
g_pikaConf->thread_num(),
g_pikaConf->conf_path(),
g_pikaServer->is_bgsaving().c_str()
g_pikaServer->is_bgsaving().c_str(),
g_pikaServer->CurrentQps()
);
info.append(buf);
}
Expand Down
2 changes: 2 additions & 0 deletions src/pika_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ PikaConn::PikaConn(int fd, std::string ip_port, int role) :
msbuf_ = sdsempty();
gettimeofday(&tv_, NULL);
is_authed_ = std::string(g_pikaConf->requirepass()) == "" ? true : false;
querynums_ = 0;
}

PikaConn::~PikaConn()
Expand Down Expand Up @@ -312,6 +313,7 @@ int PikaConn::ProcessInputBuffer() {
if (DoCmd() == 1) {
g_pikaMario->Put(std::string(msbuf_, sdslen(msbuf_)));
}
querynums_++;
sdsclear(msbuf_);
Reset();
}
Expand Down
30 changes: 30 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,36 @@ int PikaServer::ClientKill(std::string &ip_port) {

}

void PikaServer::ClientKillAll() {
int i = 0;
std::map<std::string, client_info>::iterator iter;
for (i = 0; i < g_pikaConf->thread_num(); i++) {
{
RWLock l(pikaThread_[i]->rwlock(), true);
iter = pikaThread_[i]->clients()->begin();
while (iter != pikaThread_[i]->clients()->end()) {
if ((iter->second).role == PIKA_SINGLE) {
(iter->second).is_killed = true;
}
iter++;
}
}
}
}

int PikaServer::CurrentQps() {
int i = 0;
int qps = 0;
std::map<std::string, client_info>::iterator iter;
for (i = 0; i < g_pikaConf->thread_num(); i++) {
{
RWLock l(pikaThread_[i]->rwlock(), false);
qps+=pikaThread_[i]->last_sec_querynums_;
}
}
return qps;
}

//int PikaServer::ClientRole(int fd, int role) {
// int i = 0;
// std::map<int, PikaConn*>::iterator iter_fd;
Expand Down
13 changes: 13 additions & 0 deletions src/pika_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ PikaThread::PikaThread(int thread_index)
notify_receive_fd_ = fds[0];
notify_send_fd_ = fds[1];
pikaEpoll_->PikaAddEvent(notify_receive_fd_, EPOLLIN | EPOLLERR | EPOLLHUP);
querynums_ = 0;
last_sec_querynums_ = 0;

}

Expand All @@ -55,12 +57,23 @@ int PikaThread::ProcessTimeEvent(struct timeval* target) {
if (conns_.size() == 0) {
return 0;
}

{
RWLock l(&rwlock_, true);
last_sec_querynums_ = querynums_;
querynums_ = 0;
}

std::map<int, PikaConn*>::iterator iter;
std::map<std::string, client_info>::iterator iter_clientlist;
int i = 0;
iter = conns_.begin();
crontimes_ = (crontimes_+1)%10;
while (iter != conns_.end()) {

querynums_ += iter->second->querynums();
iter->second->clear_querynums();

if (crontimes_ == 0 && ((iter->second->role() == PIKA_MASTER && g_pikaServer->ms_state_ == PIKA_REP_CONNECTED) || (iter->second->role() == PIKA_SLAVE))) {
LOG(INFO)<<"Send Ping to " << iter->second->ip_port();
iter->second->append_wbuf("*1\r\n$4\r\nPING\r\n");
Expand Down

0 comments on commit b813b18

Please sign in to comment.