forked from OpenAtomFoundation/pika
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pika_dispatch_thread.cc
92 lines (78 loc) · 2.96 KB
/
pika_dispatch_thread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <glog/logging.h>
#include "pika_dispatch_thread.h"
#include "pika_client_conn.h"
#include "pika_server.h"
#include "pika_conf.h"
extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;
// Creates the pink dispatch thread wrapped by this object.
//
// All arguments are forwarded unchanged to pink::NewDispatchThread together
// with the addresses of this object's connection factory and handles.
// handles_ is constructed with `this`; the handle callbacks later reach
// thread_rep_ through a dispatcher pointer.
PikaDispatchThread::PikaDispatchThread(std::set<std::string> &ips, int port,
                                       int work_num, int cron_interval,
                                       int queue_limit)
    : handles_(this) {
  thread_rep_ = pink::NewDispatchThread(
      ips, port, work_num, &conn_factory_,
      cron_interval, queue_limit, &handles_);

  // Label the thread so it can be told apart from the other threads.
  thread_rep_->set_thread_name("Dispatcher");
}
// Stops the wrapped dispatch thread, logs its id, and then frees it.
// The order matters: the thread object is still needed for the log line.
PikaDispatchThread::~PikaDispatchThread() {
  auto* const dispatcher = thread_rep_;

  dispatcher->StopThread();
  LOG(INFO) << "dispatch thread " << dispatcher->thread_id() << " exit!!!";

  delete dispatcher;
}
// Starts the wrapped dispatch thread and hands its status code back to the
// caller unchanged.
int PikaDispatchThread::StartThread() {
  const int start_status = thread_rep_->StartThread();
  return start_status;
}
int64_t PikaDispatchThread::ThreadClientList(std::vector<ClientInfo> *clients) {
std::vector<pink::ServerThread::ConnInfo> conns_info =
thread_rep_->conns_info();
if (clients != nullptr) {
for (auto& info : conns_info) {
clients->push_back({
info.fd,
info.ip_port,
info.last_interaction.tv_sec,
nullptr /* PinkConn pointer, doesn't need here */
});
}
}
return conns_info.size();
}
// Asks the dispatch thread to kill the connection identified by `ip_port`.
// Returns whatever KillConn reports for that request.
bool PikaDispatchThread::ClientKill(const std::string& ip_port) {
  const bool killed = thread_rep_->KillConn(ip_port);
  return killed;
}
// Asks the dispatch thread to kill every connection it currently holds.
void PikaDispatchThread::ClientKillAll() {
  thread_rep_->KillAllConns();
}
// Decides whether a newly accepted peer may connect.
//
// ip - peer address; rewritten in place to the server's host when the peer
//      is the loopback address "127.0.0.1".
//
// Returns false (and logs a warning) when the connection limit is reached,
// true otherwise. On acceptance the server's accumulative connection counter
// is incremented.
bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
  // Treat loopback peers as coming from the server's own host.
  if (ip == "127.0.0.1") {
    ip = g_pika_server->host();
  }

  const int client_num = pika_disptcher_->thread_rep_->conn_num();

  // Absolute limit: maxclients plus the root connection allowance is used up.
  bool reject = client_num >=
                g_pika_conf->maxclients() + g_pika_conf->root_connection_num();
  if (!reject) {
    // Between maxclients and the absolute limit only the server's own host
    // is still admitted.
    reject = client_num >= g_pika_conf->maxclients() &&
             ip != g_pika_server->host();
  }

  if (reject) {
    LOG(WARNING) << "Max connections reach, Deny new comming: " << ip;
    return false;
  }

  DLOG(INFO) << "new clinet comming, ip: " << ip;
  g_pika_server->incr_accumulative_connections();
  return true;
}
// Periodic hook: pushes the configured timeout to the dispatch thread as its
// keepalive timeout, then resets the server's last-second query counter.
void PikaDispatchThread::Handles::CronHandle() const {
  const auto keepalive_timeout = g_pika_conf->timeout();
  pika_disptcher_->thread_rep_->set_keepalive_timeout(keepalive_timeout);

  g_pika_server->ResetLastSecQuerynum();
}
int PikaDispatchThread::Handles::CreateWorkerSpecificData(void** data) const {
CmdTable* cmds = new CmdTable;
cmds->reserve(300);
InitCmdTable(cmds);
*data = reinterpret_cast<void*>(cmds);
return 0;
}
int PikaDispatchThread::Handles::DeleteWorkerSpecificData(void* data) const {
CmdTable* cmds = reinterpret_cast<CmdTable*>(data);
DestoryCmdTable(cmds);
delete cmds;
return 0;
}