Skip to content

Commit

Permalink
add config item: maxconnection, root_connection and slowlog
Browse files Browse the repository at this point in the history
  • Loading branch information
flabby committed Mar 24, 2016
1 parent 814c294 commit 85bb96e
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 37 deletions.
51 changes: 33 additions & 18 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,36 @@ class PikaConf : public slash::BaseConf {
int port() { RWLock l(&rwlock_, false); return port_; }
int thread_num() { RWLock l(&rwlock_, false); return thread_num_; }
int slave_thread_num() { RWLock l(&rwlock_, false); return slave_thread_num_; }
std::string log_path() { RWLock l(&rwlock_, false); return log_path_; }
std::string log_path() { RWLock l(&rwlock_, false); return log_path_; }
int log_level() { RWLock l(&rwlock_, false); return log_level_; }
std::string db_path() { RWLock l(&rwlock_, false); return db_path_; }
int write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; }
std::string db_path() { RWLock l(&rwlock_, false); return db_path_; }
int write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; }
int timeout() { RWLock l(&rwlock_, false); return timeout_; }

std::string requirepass() { RWLock l(&rwlock_, false); return requirepass_; }
std::string bgsave_path() { RWLock l(&rwlock_, false); return bgsave_path_; }
std::string userpass() { RWLock l(&rwlock_, false); return userpass_; }
std::string userpass() { RWLock l(&rwlock_, false); return userpass_; }
const std::string suser_blacklist() {
RWLock l(&rwlock_, false); return slash::StringConcat(user_blacklist_, COMMA);
RWLock l(&rwlock_, false);
return slash::StringConcat(user_blacklist_, COMMA);
}
const std::vector<std::string>& vuser_blacklist() {
RWLock l(&rwlock_, false); return user_blacklist_;
}
std::string compression() { RWLock l(&rwlock_, false); return compression_; }
int target_file_size_base() { RWLock l(&rwlock_, false); return target_file_size_base_; }
int expire_logs_nums() { RWLock l(&rwlock_, false); return expire_logs_nums_; }
int expire_logs_days() { RWLock l(&rwlock_, false); return expire_logs_days_; }
std::string compression() { RWLock l(&rwlock_, false); return compression_; }
int target_file_size_base() { RWLock l(&rwlock_, false); return target_file_size_base_; }
int expire_logs_nums() { RWLock l(&rwlock_, false); return expire_logs_nums_; }
int expire_logs_days() { RWLock l(&rwlock_, false); return expire_logs_days_; }
std::string conf_path() { RWLock l(&rwlock_, false); return conf_path_; }
bool readonly() {
RWLock l(&rwlock_, false); return readonly_;
}
bool readonly() { RWLock l(&rwlock_, false); return readonly_; }
int maxconnection() { RWLock l(&rwlock_, false); return maxconnection_; }
int root_connection_num() { RWLock l(&rwlock_, false); return root_connection_num_; }
int slowlog_slower_than() { RWLock l(&rwlock_, false); return slowlog_slower_than_; }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }

// Setter
void SetPort(const int value) { RWLock l(&rwlock_, true); port_ = value; }
Expand Down Expand Up @@ -90,9 +92,22 @@ class PikaConf : public slash::BaseConf {
RWLock l(&rwlock_, true);
expire_logs_days_ = value;
}
void SetMaxConnection(const int value) {
RWLock l(&rwlock_, true);
maxconnection_ = value;
}
void SetRootConnectionNum(const int value) {
RWLock l(&rwlock_, true);
root_connection_num_ = value;
}
void SetSlowlogSlowerThan(const int value) {
RWLock l(&rwlock_, true);
slowlog_slower_than_ = value;
}

int Load();
int ConfigRewrite();

private:
int port_;
int thread_num_;
Expand All @@ -113,11 +128,11 @@ class PikaConf : public slash::BaseConf {

//char pidfile_[PIKA_WORD_SIZE];
std::string compression_;
//int maxconnection_;
int maxconnection_;
int root_connection_num_;
int slowlog_slower_than_;
int expire_logs_days_;
int expire_logs_nums_;
//int root_connection_num_;
//int slowlog_slower_than_;
bool readonly_;
std::string conf_path_;
//char username_[30];
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog_receiver_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ PikaBinlogReceiverThread::PikaBinlogReceiverThread(int port, int cron_interval)

PikaBinlogReceiverThread::~PikaBinlogReceiverThread() {
DestoryCmdTable(cmds_);
DLOG(INFO) << "BinlogReceiver thread " << pthread_self() << " exit!!!";
DLOG(INFO) << "BinlogReceiver thread " << thread_id() << " exit!!!";
}

bool PikaBinlogReceiverThread::AccessHandle(std::string& ip) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog_sender_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ PikaBinlogSenderThread::~PikaBinlogSenderThread() {
}
pthread_rwlock_destroy(&rwlock_);
delete [] backing_store_;
DLOG(INFO) << "a BinlogSender thread " << pthread_self() << " exit!";
DLOG(INFO) << "a BinlogSender thread " << thread_id() << " exit!";
}

int PikaBinlogSenderThread::trim() {
Expand Down
12 changes: 12 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
return "-ERR NOAUTH Authentication required.\r\n";
}

uint64_t start_us, end_us;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = slash::NowMicros();
}

// For now, only shutdown need check local
if (cinfo_ptr->is_local()) {
if (ip_port().find("127.0.0.1") != std::string::npos
Expand Down Expand Up @@ -90,6 +95,13 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
pthread_rwlock_unlock(g_pika_server->rwlock());
}

if (g_pika_conf->slowlog_slower_than() >= 0) {
uint64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
LOG(ERROR) << "command:" << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration;
}
}

if (opt == kCmdNameAuth) {
if(!auth_stat_.ChecknUpdate(c_ptr->res().raw_message())) {
LOG(WARNING) << "(" << ip_port() << ")Wrong Password, close connection";
Expand Down
29 changes: 18 additions & 11 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ int PikaConf::Load()
GetConfStr("requirepass", &requirepass_);
GetConfStr("userpass", &userpass_);

GetConfInt("maxconnection", &maxconnection_);
if (maxconnection_ <= 0) {
maxconnection_ = 20000;
}

GetConfInt("root_connection_num", &root_connection_num_);
if (root_connection_num_ < 0) {
root_connection_num_ = 2;
}

GetConfInt("slowlog_slower_than", &slowlog_slower_than_);

std::string user_blacklist;
GetConfStr("userblacklist", &user_blacklist);
SetUserBlackList(std::string(user_blacklist));
Expand Down Expand Up @@ -87,24 +99,14 @@ int PikaConf::Load()
if (timeout_ <= 0) {
timeout_ = 60; // 60s
}
//if (maxconnection_ <= 0) {
// maxconnection_ = 20000;
//}
if (expire_logs_days_ <= 0 ) {
expire_logs_days_ = 1;
}
if (expire_logs_nums_ <= 10 ) {
expire_logs_nums_ = 10;
}
//if (root_connection_num_ < 0) {
// root_connection_num_ = 0;
//}
//if (db_sync_speed_ < 0 || db_sync_speed_ > 125) {
// db_sync_speed_ = 125;
//}

return ret;

}

int PikaConf::ConfigRewrite() {
Expand All @@ -120,11 +122,16 @@ int PikaConf::ConfigRewrite() {
SetConfStr("userpass", userpass_);
SetConfStr("userblacklist", suser_blacklist());
SetConfStr("dump_path", bgsave_path_);
SetConfInt("maxconnection", maxconnection_);
SetConfInt("root_connection_num", root_connection_num_);
SetConfInt("slowlog_slower_than", slowlog_slower_than_);
SetConfInt("target_file_size_base", target_file_size_base_);
SetConfInt("expire_logs_nums", expire_logs_nums_);
SetConfInt("expire_logs_days", expire_logs_days_);
SetConfStr("compression", compression_);
SetConfBool("slave_read_only", readonly_);

SetConfInt("binlog_file_size_", binlog_file_size_);
SetConfStr("compression", compression_);

return WriteBack();
}
10 changes: 8 additions & 2 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
#include "pika_dispatch_thread.h"
#include "pika_client_conn.h"
#include "pika_server.h"
#include "pika_conf.h"

extern PikaServer* g_pika_server;
extern PikaConf* g_pika_conf;

PikaDispatchThread::PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval) :
DispatchThread::DispatchThread(port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
}

PikaDispatchThread::~PikaDispatchThread() {
DLOG(INFO) << "dispatch thread " << pthread_self() << " exit!!!";
DLOG(INFO) << "dispatch thread " << thread_id() << " exit!!!";
}

void PikaDispatchThread::CronHandle() {
Expand All @@ -34,10 +36,14 @@ bool PikaDispatchThread::AccessHandle(std::string& ip) {
if (ip == "127.0.0.1") {
ip = g_pika_server->host();
}
if (ClientNum() >= 1000) {

int client_num = ClientNum();
if ((client_num >= g_pika_conf->maxconnection() + g_pika_conf->root_connection_num())
|| (client_num >= g_pika_conf->maxconnection() && ip != g_pika_server->host())) {
DLOG(INFO) << "Max connections reach, Deny new comming: " << ip;
return false;
}

DLOG(INFO) << "ip: " << ip;
g_pika_server->incr_accumulative_connections();
return true;
Expand Down
4 changes: 2 additions & 2 deletions src/pika_heartbeat_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PikaHeartbeatThread::PikaHeartbeatThread(int port, int cron_interval) :
}

PikaHeartbeatThread::~PikaHeartbeatThread() {
DLOG(INFO) << "PikaHeartbeat thread " << pthread_self() << " exit!!!";
DLOG(INFO) << "PikaHeartbeat thread " << thread_id() << " exit!!!";
}

void PikaHeartbeatThread::CronHandle() {
Expand All @@ -24,7 +24,7 @@ void PikaHeartbeatThread::CronHandle() {
slash::RWLock l(&rwlock_, true); // Use WriteLock to iterate the conns_
std::map<int, void*>::iterator iter = conns_.begin();
while (iter != conns_.end()) {
if (now.tv_sec - static_cast<PikaHeartbeatConn*>(iter->second)->last_interaction().tv_sec > 30) {
if (now.tv_sec - static_cast<PikaHeartbeatConn*>(iter->second)->last_interaction().tv_sec > 20) {
DLOG(INFO) << "Find Timeout Slave: " << static_cast<PikaHeartbeatConn*>(iter->second)->ip_port();
close(iter->first);
// erase item in slaves_
Expand Down
2 changes: 1 addition & 1 deletion src/pika_worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ PikaWorkerThread::PikaWorkerThread(int cron_interval):
PikaWorkerThread::~PikaWorkerThread() {
should_exit_ = true;
DestoryCmdTable(cmds_);
DLOG(INFO) << "A worker thread " << pthread_self() << " exit!!!";
DLOG(INFO) << "A worker thread " << thread_id() << " exit!!!";
}

void PikaWorkerThread::CronHandle() {
Expand Down
2 changes: 1 addition & 1 deletion third/slash
Submodule slash updated from 5107cb to d20462

0 comments on commit 85bb96e

Please sign in to comment.