Skip to content

Commit

Permalink
add worker queue limit to pika and pink
Browse files Browse the repository at this point in the history
  • Loading branch information
flabby committed Mar 6, 2017
1 parent 5429dc6 commit 184593c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 11 deletions.
11 changes: 8 additions & 3 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
class PikaDispatchThread : public pink::DispatchThread<PikaClientConn>
{
public:
PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
PikaDispatchThread(std::string &ip, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
PikaDispatchThread(std::set<std::string> &ips, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit);
PikaDispatchThread(std::string &ip, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit);
PikaDispatchThread(std::set<std::string> &ips, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit);
virtual ~PikaDispatchThread();
virtual bool AccessHandle(std::string& ip);

Expand Down
24 changes: 18 additions & 6 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,28 @@
extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;

PikaDispatchThread::PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval) :
DispatchThread::DispatchThread(port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
PikaDispatchThread::PikaDispatchThread(int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit) :
DispatchThread::DispatchThread(port, work_num,
reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread),
cron_interval, queue_limit) {
}

PikaDispatchThread::PikaDispatchThread(std::string &ip, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval) :
DispatchThread::DispatchThread(ip, port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
PikaDispatchThread::PikaDispatchThread(std::string &ip, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit) :
DispatchThread::DispatchThread(ip, port, work_num,
reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread),
cron_interval, queue_limit) {
}

PikaDispatchThread::PikaDispatchThread(std::set<std::string> &ips, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval) :
DispatchThread::DispatchThread(ips, port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
PikaDispatchThread::PikaDispatchThread(std::set<std::string> &ips, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit) :
DispatchThread::DispatchThread(ips, port, work_num,
reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread),
cron_interval, queue_limit) {
}

PikaDispatchThread::~PikaDispatchThread() {
Expand Down
5 changes: 4 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ PikaServer::PikaServer() :
ips.insert("127.0.0.1");
ips.insert(host_);
}
pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, pika_worker_thread_, 3000);
// We estimate the queue size
int worker_queue_limit = g_pika_conf->maxclients() / worker_num_ + 100;
LOG(INFO) << "Worker queue limit is " << worker_queue_limit;
pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, pika_worker_thread_, 3000, worker_queue_limit);
pika_binlog_receiver_thread_ = new PikaBinlogReceiverThread(ips, port_ + 1000, 1000);
pika_heartbeat_thread_ = new PikaHeartbeatThread(ips, port_ + 2000, 1000);
pika_trysync_thread_ = new PikaTrysyncThread();
Expand Down
2 changes: 1 addition & 1 deletion third/pink
Submodule pink updated 1 files
+52 −21 include/dispatch_thread.h

0 comments on commit 184593c

Please sign in to comment.