diff --git a/conf/pika.conf b/conf/pika.conf index 02ee69a80f..a7bffc3996 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -2,6 +2,8 @@ port : 9221 # Thread Number thread_num : 1 +# Slave Thread Number +slave_thread_num : 1 # Pika log path log_path : ./log/ # Pika glog level diff --git a/include/pika_conf.h b/include/pika_conf.h index 275594cb46..ab75ceab08 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -16,6 +16,7 @@ class PikaConf : public BaseConf ~PikaConf() { pthread_rwlock_destroy(&rwlock_); } int port() { RWLock l(&rwlock_, false); return port_; } int thread_num() { RWLock l(&rwlock_, false); return thread_num_; } + int slave_thread_num() { RWLock l(&rwlock_, false); return slave_thread_num_; } char* log_path() { RWLock l(&rwlock_, false); return log_path_; } int log_level() { RWLock l(&rwlock_, false); return log_level_; } char* db_path() { RWLock l(&rwlock_, false); return db_path_; } @@ -47,6 +48,7 @@ class PikaConf : public BaseConf private: int port_; int thread_num_; + int slave_thread_num_; char log_path_[PIKA_WORD_SIZE]; char db_path_[PIKA_WORD_SIZE]; int write_buffer_size_; diff --git a/include/pika_server.h b/include/pika_server.h index dc498ba81c..0100aa910d 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -104,6 +104,7 @@ class PikaServer * The udp server port and address */ int sockfd_; + int slave_sockfd_; int flags_; int port_; struct sockaddr_in servaddr_; @@ -132,6 +133,8 @@ class PikaServer * last_thread_ is the last work thread */ int last_thread_; + int last_slave_thread_; + int thread_num_; /* * This is the work threads */ diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 80acae8fff..44e658811b 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -190,7 +190,7 @@ void SlaveofCmd::Do(std::list &argv, std::string &ret) { } g_pikaServer->ms_state_ = PIKA_REP_CONNECT; g_pikaServer->set_masterhost(p2); - g_pikaServer->set_masterport(port); + g_pikaServer->set_masterport(port+100); ret = "+OK\r\n"; } else { ret = "-ERR State is not in PIKA_REP_SINGLE\r\n"; diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 4359ac8edc..7e6da3353e 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -10,6 +10,7 @@ PikaConf::PikaConf(const char* path) : getConfInt("port", &port_); getConfInt("thread_num", &thread_num_); + getConfInt("slave_thread_num", &slave_thread_num_); getConfStr("log_path", log_path_); getConfInt("log_level", &log_level_); getConfStr("db_path", db_path_); @@ -22,7 +23,10 @@ PikaConf::PikaConf(const char* path) : getConfInt("target_file_size_base", &target_file_size_base_); if (thread_num_ <= 0) { - thread_num_ = 24; + thread_num_ = 16; + } + if (slave_thread_num_ <= 0) { + slave_thread_num_ = 7; } if (write_buffer_size_ <= 0 ) { write_buffer_size_ = 4194304; // 40M diff --git a/src/pika_conn.cc b/src/pika_conn.cc index e30cfd7c21..4a2cfb0069 100644 --- a/src/pika_conn.cc +++ b/src/pika_conn.cc @@ -385,30 +385,38 @@ int PikaConn::PikaSendReply() { ssize_t nwritten = 0; if (role_ == PIKA_SLAVE) { - MutexLock l(&mutex_); - while (sdslen(wbuf_) > 0) { - nwritten = write(fd_, wbuf_, sdslen(wbuf_)); - if (nwritten == -1) { - if (errno == EAGAIN) { - nwritten = 0; - } else { - /* - * Here we clear this connection - */ - LOG(INFO) << "send error, close client"; - should_close_after_reply = true; - return 0; - } - } - if (nwritten <= 0) { - break; - } - sdsrange(wbuf_, nwritten, -1); - } + mutex_.Lock(); if (sdslen(wbuf_) == 0) { + mutex_.Unlock(); + usleep(10000); return 0; } else { - return -1; + mutex_.Unlock(); + MutexLock l(&mutex_); + while (sdslen(wbuf_) > 0) { + nwritten = write(fd_, wbuf_, sdslen(wbuf_)); + if (nwritten == -1) { + if (errno == EAGAIN) { + nwritten = 0; + } else { + /* + * Here we clear this connection + */ + LOG(INFO) << "send error, close client"; + should_close_after_reply = true; + return 0; + } + } + if (nwritten <= 0) { + break; + } + sdsrange(wbuf_, nwritten, -1); + } + if (sdslen(wbuf_) == 0) { + return 0; + } else { + return -1; + } } } else { while (sdslen(wbuf_) > 0) { diff --git a/src/pika_server.cc b/src/pika_server.cc index c07f8b1020..13f224d6a9 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -39,6 +39,22 @@ Status PikaServer::SetBlockType(BlockType type) close(sockfd_); return s; } + + if ((flags_ = fcntl(slave_sockfd_, F_GETFL, 0)) < 0) { + s = Status::Corruption("F_GETFEL error"); + close(slave_sockfd_); + return s; + } + if (type == kBlock) { + flags_ &= (~O_NONBLOCK); + } else if (type == kNonBlock) { + flags_ |= O_NONBLOCK; + } + if (fcntl(slave_sockfd_, F_SETFL, flags_) < 0) { + s = Status::Corruption("F_SETFL error"); + close(slave_sockfd_); + return s; + } return Status::OK(); } @@ -73,15 +89,35 @@ PikaServer::PikaServer() } listen(sockfd_, 10); + // init slave_sock + slave_sockfd_ = socket(AF_INET, SOCK_STREAM, 0); + memset(&servaddr_, 0, sizeof(servaddr_)); + yes = 1; + if (setsockopt(slave_sockfd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) { + LOG(FATAL) << "setsockopt SO_REUSEADDR: " << strerror(errno); + } + int slave_port = g_pikaConf->port() + 100; + servaddr_.sin_family = AF_INET; + servaddr_.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr_.sin_port = htons(slave_port); + + ret = bind(slave_sockfd_, (struct sockaddr *) &servaddr_, sizeof(servaddr_)); + if (ret < 0) { + LOG(FATAL) << "bind error: "<< strerror(errno); + } + listen(slave_sockfd_, 10); + SetBlockType(kNonBlock); // init pika epoll pikaEpoll_ = new PikaEpoll(); pikaEpoll_->PikaAddEvent(sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP); + pikaEpoll_->PikaAddEvent(slave_sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP); - + thread_num_ = g_pikaConf->thread_num() + g_pikaConf->slave_thread_num() + 1; last_thread_ = 0; - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + last_slave_thread_ = 0; + for (int i = 0; i < thread_num_; i++) { pikaThread_[i] = new PikaThread(i); } @@ -102,7 +138,7 @@ PikaServer::PikaServer() // } // start the pikaThread_ thread - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + for (int i = 0; i < thread_num_; i++) { pthread_create(&(pikaThread_[i]->thread_id_), NULL, &(PikaServer::StartThread), pikaThread_[i]); } @@ -110,7 +146,7 @@ PikaServer::PikaServer() PikaServer::~PikaServer() { - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + for (int i = 0; i < thread_num_; i++) { delete(pikaThread_[i]); } delete(pikaEpoll_); @@ -356,13 +392,14 @@ void PikaServer::ProcessTimeEvent(struct timeval* target) { ip_port.append(":"); ll2string(buf, sizeof(buf), ntohs(s_addr.sin_port)); ip_port.append(buf); - std::queue *q = &(pikaThread_[last_thread_]->conn_queue_); + std::queue *q = &(pikaThread_[thread_num_-1]->conn_queue_); + LOG(INFO) << "Push Master to Thread " << thread_num_-1; PikaItem ti(connfd, ip_port, PIKA_MASTER); { - MutexLock l(&pikaThread_[last_thread_]->mutex_); + MutexLock l(&pikaThread_[thread_num_-1]->mutex_); q->push(ti); } - write(pikaThread_[last_thread_]->notify_send_fd(), "", 1); + write(pikaThread_[thread_num_-1]->notify_send_fd(), "", 1); repl_state_ |= PIKA_SLAVE; ms_state_ = PIKA_REP_CONNECTING; } @@ -396,7 +433,7 @@ int PikaServer::TrySync(/*std::string &ip, std::string &str_port,*/ int fd, uint std::map::iterator iter_fd; PikaConn* conn = NULL; int i = 0; - for (i = 0; i < g_pikaConf->thread_num(); i++) { + for (i = g_pikaConf->thread_num(); i < thread_num_; i++) { iter_fd = pikaThread_[i]->conns()->find(fd); if (iter_fd != pikaThread_[i]->conns()->end()) { conn = iter_fd->second; @@ -455,7 +492,7 @@ int PikaServer::ClientList(std::string &res) { std::map::iterator iter; res = "+"; char buf[32]; - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + for (int i = 0; i < thread_num_; i++) { { RWLock l(pikaThread_[i]->rwlock(), false); @@ -483,7 +520,7 @@ int PikaServer::GetSlaveList(std::string &res) { int slave_num = 0; char buf[512]; - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + for (int i = g_pikaConf->thread_num(); i < thread_num_; i++) { { RWLock l(pikaThread_[i]->rwlock(), false); for (iter = pikaThread_[i]->clients()->begin(); @@ -505,7 +542,7 @@ int PikaServer::GetSlaveList(std::string &res) { int PikaServer::ClientNum() { int client_num = 0; std::map::iterator iter; - for (int i = 0; i < g_pikaConf->thread_num(); i++) { + for (int i = 0; i < thread_num_; i++) { { RWLock l(pikaThread_[i]->rwlock(), false); iter = pikaThread_[i]->clients()->begin(); @@ -521,7 +558,7 @@ int PikaServer::ClientNum() { int PikaServer::ClientKill(std::string &ip_port) { int i = 0; std::map::iterator iter; - for (i = 0; i < g_pikaConf->thread_num(); i++) { + for (i = 0; i < thread_num_; i++) { { RWLock l(pikaThread_[i]->rwlock(), true); iter = pikaThread_[i]->clients()->find(ip_port); @@ -531,7 +568,7 @@ int PikaServer::ClientKill(std::string &ip_port) { } } } - if (i < g_pikaConf->thread_num()) { + if (i < thread_num_) { return 1; } else { return 0; @@ -560,7 +597,7 @@ int PikaServer::CurrentQps() { int i = 0; int qps = 0; std::map::iterator iter; - for (i = 0; i < g_pikaConf->thread_num(); i++) { + for (i = 0; i < thread_num_; i++) { { RWLock l(pikaThread_[i]->rwlock(), false); qps+=pikaThread_[i]->last_sec_querynums_; @@ -659,6 +696,7 @@ void PikaServer::RunProcess() } std::queue *q = &(pikaThread_[last_thread_]->conn_queue_); PikaItem ti(connfd, ip_port); + LOG(INFO) << "Push Client to Thread " << (last_thread_); { MutexLock l(&pikaThread_[last_thread_]->mutex_); q->push(ti); @@ -666,6 +704,30 @@ void PikaServer::RunProcess() write(pikaThread_[last_thread_]->notify_send_fd(), "", 1); last_thread_++; last_thread_ %= g_pikaConf->thread_num(); + } else if (fd == slave_sockfd_ && ((tfe + i)->mask_ & EPOLLIN)) { + connfd = accept(slave_sockfd_, (struct sockaddr *) &cliaddr, &clilen); +// LOG(INFO) << "Accept new connection, fd: " << connfd << " ip: " << inet_ntop(AF_INET, &cliaddr.sin_addr, ipAddr, sizeof(ipAddr)) << " port: " << ntohs(cliaddr.sin_port); + ip_port = inet_ntop(AF_INET, &cliaddr.sin_addr, ipAddr, sizeof(ipAddr)); + ip_port.append(":"); + ll2string(buf, sizeof(buf), ntohs(cliaddr.sin_port)); + ip_port.append(buf); + int clientnum = ClientNum(); + if (clientnum >= g_pikaConf->maxconnection()) { + LOG(WARNING) << "Reach Max Connection: "<< g_pikaConf->maxconnection() << " refuse new client: " << ip_port; + close(connfd); + continue; + } + int user_thread_num = g_pikaConf->thread_num(); + std::queue *q = &(pikaThread_[last_slave_thread_ + user_thread_num]->conn_queue_); + PikaItem ti(connfd, ip_port); + LOG(INFO) << "Push Slave to Thread " << (last_slave_thread_ + user_thread_num); + { + MutexLock l(&pikaThread_[last_slave_thread_ + user_thread_num]->mutex_); + q->push(ti); + } + write(pikaThread_[last_slave_thread_ + user_thread_num]->notify_send_fd(), "", 1); + last_slave_thread_++; + last_slave_thread_ %= g_pikaConf->slave_thread_num(); } else if ((tfe + i)->mask_ & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { LOG(WARNING) << "Epoll timeout event"; }