Skip to content

Commit

Permalink
separate ms_thread from client_thread & fix: master occupy 100% cpu w…
Browse files Browse the repository at this point in the history
…hen there is no writting from client
  • Loading branch information
songzhao committed Nov 19, 2015
1 parent d43a13d commit 5812436
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 37 deletions.
2 changes: 2 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
port : 9221
# Thread Number
thread_num : 1
# Slave Thread Number
slave_thread_num : 1
# Pika log path
log_path : ./log/
# Pika glog level
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class PikaConf : public BaseConf
~PikaConf() { pthread_rwlock_destroy(&rwlock_); }
int port() { RWLock l(&rwlock_, false); return port_; }
int thread_num() { RWLock l(&rwlock_, false); return thread_num_; }
int slave_thread_num() { RWLock l(&rwlock_, false); return slave_thread_num_; }
char* log_path() { RWLock l(&rwlock_, false); return log_path_; }
int log_level() { RWLock l(&rwlock_, false); return log_level_; }
char* db_path() { RWLock l(&rwlock_, false); return db_path_; }
Expand Down Expand Up @@ -47,6 +48,7 @@ class PikaConf : public BaseConf
private:
int port_;
int thread_num_;
int slave_thread_num_;
char log_path_[PIKA_WORD_SIZE];
char db_path_[PIKA_WORD_SIZE];
int write_buffer_size_;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class PikaServer
* The udp server port and address
*/
int sockfd_;
int slave_sockfd_;
int flags_;
int port_;
struct sockaddr_in servaddr_;
Expand Down Expand Up @@ -132,6 +133,8 @@ class PikaServer
* last_thread_ is the last work thread
*/
int last_thread_;
int last_slave_thread_;
int thread_num_;
/*
* This is the work threads
*/
Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void SlaveofCmd::Do(std::list<std::string> &argv, std::string &ret) {
}
g_pikaServer->ms_state_ = PIKA_REP_CONNECT;
g_pikaServer->set_masterhost(p2);
g_pikaServer->set_masterport(port);
g_pikaServer->set_masterport(port+100);
ret = "+OK\r\n";
} else {
ret = "-ERR State is not in PIKA_REP_SINGLE\r\n";
Expand Down
6 changes: 5 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PikaConf::PikaConf(const char* path) :

getConfInt("port", &port_);
getConfInt("thread_num", &thread_num_);
getConfInt("slave_thread_num", &slave_thread_num_);
getConfStr("log_path", log_path_);
getConfInt("log_level", &log_level_);
getConfStr("db_path", db_path_);
Expand All @@ -22,7 +23,10 @@ PikaConf::PikaConf(const char* path) :
getConfInt("target_file_size_base", &target_file_size_base_);

if (thread_num_ <= 0) {
thread_num_ = 24;
thread_num_ = 16;
}
if (slave_thread_num_ <= 0) {
slave_thread_num_ = 7;
}
if (write_buffer_size_ <= 0 ) {
write_buffer_size_ = 4194304; // 40M
Expand Down
50 changes: 29 additions & 21 deletions src/pika_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,30 +385,38 @@ int PikaConn::PikaSendReply()
{
ssize_t nwritten = 0;
if (role_ == PIKA_SLAVE) {
MutexLock l(&mutex_);
while (sdslen(wbuf_) > 0) {
nwritten = write(fd_, wbuf_, sdslen(wbuf_));
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
/*
* Here we clear this connection
*/
LOG(INFO) << "send error, close client";
should_close_after_reply = true;
return 0;
}
}
if (nwritten <= 0) {
break;
}
sdsrange(wbuf_, nwritten, -1);
}
mutex_.Lock();
if (sdslen(wbuf_) == 0) {
mutex_.Unlock();
usleep(10000);
return 0;
} else {
return -1;
mutex_.Unlock();
MutexLock l(&mutex_);
while (sdslen(wbuf_) > 0) {
nwritten = write(fd_, wbuf_, sdslen(wbuf_));
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
/*
* Here we clear this connection
*/
LOG(INFO) << "send error, close client";
should_close_after_reply = true;
return 0;
}
}
if (nwritten <= 0) {
break;
}
sdsrange(wbuf_, nwritten, -1);
}
if (sdslen(wbuf_) == 0) {
return 0;
} else {
return -1;
}
}
} else {
while (sdslen(wbuf_) > 0) {
Expand Down
90 changes: 76 additions & 14 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ Status PikaServer::SetBlockType(BlockType type)
close(sockfd_);
return s;
}

if ((flags_ = fcntl(slave_sockfd_, F_GETFL, 0)) < 0) {
s = Status::Corruption("F_GETFEL error");
close(slave_sockfd_);
return s;
}
if (type == kBlock) {
flags_ &= (~O_NONBLOCK);
} else if (type == kNonBlock) {
flags_ |= O_NONBLOCK;
}
if (fcntl(slave_sockfd_, F_SETFL, flags_) < 0) {
s = Status::Corruption("F_SETFL error");
close(slave_sockfd_);
return s;
}
return Status::OK();
}

Expand Down Expand Up @@ -73,15 +89,35 @@ PikaServer::PikaServer()
}
listen(sockfd_, 10);

// init slave_sock
slave_sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
memset(&servaddr_, 0, sizeof(servaddr_));
yes = 1;
if (setsockopt(slave_sockfd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
LOG(FATAL) << "setsockopt SO_REUSEADDR: " << strerror(errno);
}
int slave_port = g_pikaConf->port() + 100;
servaddr_.sin_family = AF_INET;
servaddr_.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr_.sin_port = htons(slave_port);

ret = bind(slave_sockfd_, (struct sockaddr *) &servaddr_, sizeof(servaddr_));
if (ret < 0) {
LOG(FATAL) << "bind error: "<< strerror(errno);
}
listen(slave_sockfd_, 10);

SetBlockType(kNonBlock);

// init pika epoll
pikaEpoll_ = new PikaEpoll();
pikaEpoll_->PikaAddEvent(sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP);
pikaEpoll_->PikaAddEvent(slave_sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP);


thread_num_ = g_pikaConf->thread_num() + g_pikaConf->slave_thread_num() + 1;
last_thread_ = 0;
for (int i = 0; i < g_pikaConf->thread_num(); i++) {
last_slave_thread_ = 0;
for (int i = 0; i < thread_num_; i++) {
pikaThread_[i] = new PikaThread(i);
}

Expand All @@ -102,15 +138,15 @@ PikaServer::PikaServer()
// }

// start the pikaThread_ thread
for (int i = 0; i < g_pikaConf->thread_num(); i++) {
for (int i = 0; i < thread_num_; i++) {
pthread_create(&(pikaThread_[i]->thread_id_), NULL, &(PikaServer::StartThread), pikaThread_[i]);
}

}

PikaServer::~PikaServer()
{
for (int i = 0; i < g_pikaConf->thread_num(); i++) {
for (int i = 0; i < thread_num_; i++) {
delete(pikaThread_[i]);
}
delete(pikaEpoll_);
Expand Down Expand Up @@ -356,13 +392,14 @@ void PikaServer::ProcessTimeEvent(struct timeval* target) {
ip_port.append(":");
ll2string(buf, sizeof(buf), ntohs(s_addr.sin_port));
ip_port.append(buf);
std::queue<PikaItem> *q = &(pikaThread_[last_thread_]->conn_queue_);
std::queue<PikaItem> *q = &(pikaThread_[thread_num_-1]->conn_queue_);
LOG(INFO) << "Push Master to Thread " << thread_num_-1;
PikaItem ti(connfd, ip_port, PIKA_MASTER);
{
MutexLock l(&pikaThread_[last_thread_]->mutex_);
MutexLock l(&pikaThread_[thread_num_-1]->mutex_);
q->push(ti);
}
write(pikaThread_[last_thread_]->notify_send_fd(), "", 1);
write(pikaThread_[thread_num_-1]->notify_send_fd(), "", 1);
repl_state_ |= PIKA_SLAVE;
ms_state_ = PIKA_REP_CONNECTING;
}
Expand Down Expand Up @@ -396,7 +433,7 @@ int PikaServer::TrySync(/*std::string &ip, std::string &str_port,*/ int fd, uint
std::map<int, PikaConn*>::iterator iter_fd;
PikaConn* conn = NULL;
int i = 0;
for (i = 0; i < g_pikaConf->thread_num(); i++) {
for (i = g_pikaConf->thread_num(); i < thread_num_; i++) {
iter_fd = pikaThread_[i]->conns()->find(fd);
if (iter_fd != pikaThread_[i]->conns()->end()) {
conn = iter_fd->second;
Expand Down Expand Up @@ -455,7 +492,7 @@ int PikaServer::ClientList(std::string &res) {
std::map<std::string, client_info>::iterator iter;
res = "+";
char buf[32];
for (int i = 0; i < g_pikaConf->thread_num(); i++) {
for (int i = 0; i < thread_num_; i++) {

{
RWLock l(pikaThread_[i]->rwlock(), false);
Expand Down Expand Up @@ -483,7 +520,7 @@ int PikaServer::GetSlaveList(std::string &res) {
int slave_num = 0;
char buf[512];

for (int i = 0; i < g_pikaConf->thread_num(); i++) {
for (int i = g_pikaConf->thread_num(); i < thread_num_; i++) {
{
RWLock l(pikaThread_[i]->rwlock(), false);
for (iter = pikaThread_[i]->clients()->begin();
Expand All @@ -505,7 +542,7 @@ int PikaServer::GetSlaveList(std::string &res) {
int PikaServer::ClientNum() {
int client_num = 0;
std::map<std::string, client_info>::iterator iter;
for (int i = 0; i < g_pikaConf->thread_num(); i++) {
for (int i = 0; i < thread_num_; i++) {
{
RWLock l(pikaThread_[i]->rwlock(), false);
iter = pikaThread_[i]->clients()->begin();
Expand All @@ -521,7 +558,7 @@ int PikaServer::ClientNum() {
int PikaServer::ClientKill(std::string &ip_port) {
int i = 0;
std::map<std::string, client_info>::iterator iter;
for (i = 0; i < g_pikaConf->thread_num(); i++) {
for (i = 0; i < thread_num_; i++) {
{
RWLock l(pikaThread_[i]->rwlock(), true);
iter = pikaThread_[i]->clients()->find(ip_port);
Expand All @@ -531,7 +568,7 @@ int PikaServer::ClientKill(std::string &ip_port) {
}
}
}
if (i < g_pikaConf->thread_num()) {
if (i < thread_num_) {
return 1;
} else {
return 0;
Expand Down Expand Up @@ -560,7 +597,7 @@ int PikaServer::CurrentQps() {
int i = 0;
int qps = 0;
std::map<std::string, client_info>::iterator iter;
for (i = 0; i < g_pikaConf->thread_num(); i++) {
for (i = 0; i < thread_num_; i++) {
{
RWLock l(pikaThread_[i]->rwlock(), false);
qps+=pikaThread_[i]->last_sec_querynums_;
Expand Down Expand Up @@ -659,13 +696,38 @@ void PikaServer::RunProcess()
}
std::queue<PikaItem> *q = &(pikaThread_[last_thread_]->conn_queue_);
PikaItem ti(connfd, ip_port);
LOG(INFO) << "Push Client to Thread " << (last_thread_);
{
MutexLock l(&pikaThread_[last_thread_]->mutex_);
q->push(ti);
}
write(pikaThread_[last_thread_]->notify_send_fd(), "", 1);
last_thread_++;
last_thread_ %= g_pikaConf->thread_num();
} else if (fd == slave_sockfd_ && ((tfe + i)->mask_ & EPOLLIN)) {
connfd = accept(slave_sockfd_, (struct sockaddr *) &cliaddr, &clilen);
// LOG(INFO) << "Accept new connection, fd: " << connfd << " ip: " << inet_ntop(AF_INET, &cliaddr.sin_addr, ipAddr, sizeof(ipAddr)) << " port: " << ntohs(cliaddr.sin_port);
ip_port = inet_ntop(AF_INET, &cliaddr.sin_addr, ipAddr, sizeof(ipAddr));
ip_port.append(":");
ll2string(buf, sizeof(buf), ntohs(cliaddr.sin_port));
ip_port.append(buf);
int clientnum = ClientNum();
if (clientnum >= g_pikaConf->maxconnection()) {
LOG(WARNING) << "Reach Max Connection: "<< g_pikaConf->maxconnection() << " refuse new client: " << ip_port;
close(connfd);
continue;
}
int user_thread_num = g_pikaConf->thread_num();
std::queue<PikaItem> *q = &(pikaThread_[last_slave_thread_ + user_thread_num]->conn_queue_);
PikaItem ti(connfd, ip_port);
LOG(INFO) << "Push Slave to Thread " << (last_slave_thread_ + user_thread_num);
{
MutexLock l(&pikaThread_[last_slave_thread_ + user_thread_num]->mutex_);
q->push(ti);
}
write(pikaThread_[last_slave_thread_ + user_thread_num]->notify_send_fd(), "", 1);
last_slave_thread_++;
last_slave_thread_ %= g_pikaConf->slave_thread_num();
} else if ((tfe + i)->mask_ & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
LOG(WARNING) << "Epoll timeout event";
}
Expand Down

0 comments on commit 5812436

Please sign in to comment.