diff --git a/conf/pika.conf b/conf/pika.conf index 5ab58621c9..7d56dad8cb 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -1,19 +1,19 @@ # Pika port port : 9221 # Thread Number -thread_num : 1 +thread-num : 1 # Sync Thread Number -sync_thread_num : 6 +sync-thread-num : 6 # Item count of sync thread queue -sync_buffer_size : 10 +sync-buffer-size : 10 # Pika log path -log_path : ./log/ -# Pika glog level -log_level : 0 +log-path : ./log/ +# Pika glog level: only INFO and ERROR +loglevel : 0 # Pika db path -db_path : ./db/ -# Pika write_buffer_size -write_buffer_size : 268435456 +db-path : ./db/ +# Pika write-buffer-size +write-buffer-size : 268435456 # Pika timeout timeout : 60 # Requirepass @@ -23,40 +23,40 @@ userpass : # User Blacklist userblacklist : # Dump Prefix -dump_prefix : +dump-prefix : # daemonize [yes | no] #daemonize : yes # Dump Path -dump_path : ./dump/ +dump-path : ./dump/ # pidfile Path pidfile : ./pika.pid # Max Connection -maxconnection : 20000 +maxclients : 20000 # the per file size of sst to compact, defalut is 2M -target_file_size_base : 20971520 -# Expire_logs_days -expire_logs_days : 7 -# Expire_logs_nums -expire_logs_nums : 10 -# Root_connection_num -root_connection_num : 2 -# Slowlog_log_slower_than -slowlog_log_slower_than : 10000 +target-file-size-base : 20971520 +# Expire-logs-days +expire-logs-days : 7 +# Expire-logs-nums +expire-logs-nums : 10 +# Root-connection-num +root-connection-num : 2 +# Slowlog-log-slower-than +slowlog-log-slower-than : 10000 # slave-read-only(yes/no, 1/0) -slave_read_only : 0 +slave-read-only : 0 # Pika db sync path -db_sync_path : ./dbsync/ +db-sync-path : ./dbsync/ # db sync speed(MB) max is set to 125MB, min is set to 0, and if below 0 or above 125, the value will be adjust to 125 -db_sync_speed : -1 +db-sync-speed : -1 ################### ## Critical Settings ################### # binlog file size: default is 100M, limited in [1K, 2G] -binlog_file_size : 104857600 +binlog-file-size : 104857600 # Compression compression : snappy -# max_background_flushes: default is 1, limited in [1, 4] -max_background_flushes : 1 -# max_background_compactions: default is 1, limited in [1, 4] -max_background_compactions : 1 +# max-background-flushes: default is 1, limited in [1, 4] +max-background-flushes : 1 +# max-background-compactions: default is 1, limited in [1, 4] +max-background-compactions : 1 diff --git a/include/pika_conf.h b/include/pika_conf.h index aeeece7299..cfd8b2a277 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -50,7 +50,7 @@ class PikaConf : public slash::BaseConf { int expire_logs_days() { RWLock l(&rwlock_, false); return expire_logs_days_; } std::string conf_path() { RWLock l(&rwlock_, false); return conf_path_; } bool readonly() { RWLock l(&rwlock_, false); return readonly_; } - int maxconnection() { RWLock l(&rwlock_, false); return maxconnection_; } + int maxclients() { RWLock l(&rwlock_, false); return maxclients_; } int root_connection_num() { RWLock l(&rwlock_, false); return root_connection_num_; } int slowlog_slower_than() { RWLock l(&rwlock_, false); return slowlog_log_slower_than_; } @@ -100,7 +100,7 @@ class PikaConf : public slash::BaseConf { } void SetMaxConnection(const int value) { RWLock l(&rwlock_, true); - maxconnection_ = value; + maxclients_ = value; } void SetRootConnectionNum(const int value) { RWLock l(&rwlock_, true); @@ -140,7 +140,7 @@ class PikaConf : public slash::BaseConf { //char pidfile_[PIKA_WORD_SIZE]; std::string compression_; - int maxconnection_; + int maxclients_; int root_connection_num_; int slowlog_log_slower_than_; int expire_logs_days_; diff --git a/include/pika_define.h b/include/pika_define.h index 57d57d1365..e3233976d5 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -35,6 +35,8 @@ struct SlaveItem { struct timeval create_time; }; +#define PIKA_MIN_RESERVED_FDS 5000 + #define SLAVE_ITEM_STAGE_ONE 1 #define SLAVE_ITEM_STAGE_TWO 2 diff --git a/pikatests/tests/assets/default.conf b/pikatests/tests/assets/default.conf index 50d3ec7b18..c1045a7713 100644 --- a/pikatests/tests/assets/default.conf +++ b/pikatests/tests/assets/default.conf @@ -25,7 +25,7 @@ dump_path : ./dump/ # pidfile Path pidfile : ./pika.pid # Max Connection -maxconnection : 20000 +maxclients : 20000 # the per file size of sst to compact, defalut is 2M target_file_size_base : 20971520 # Expire_logs_days diff --git a/src/pika.cc b/src/pika.cc index 8c5d833d43..2cbcc413c7 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -1,4 +1,5 @@ #include +#include #include "pika_server.h" #include "pika_command.h" #include "pika_conf.h" @@ -74,7 +75,7 @@ static void create_pid_file(void) { } static void IntSigHandle(const int sig) { - DLOG(INFO) << "Catch Signal " << sig << ", cleanup..."; + LOG(INFO) << "Catch Signal " << sig << ", cleanup..."; g_pika_server->Exit(); } @@ -132,6 +133,21 @@ int main(int argc, char *argv[]) { PikaConfInit(path); + rlimit limit; + if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { + LOG(WARNING) << "getrlimit error: " << strerror(errno); + } else if (limit.rlim_cur < g_pika_conf->maxclients() + PIKA_MIN_RESERVED_FDS) { + rlim_t old_limit = limit.rlim_cur; + rlim_t best_limit = g_pika_conf->maxclients() + PIKA_MIN_RESERVED_FDS; + limit.rlim_cur = best_limit > limit.rlim_max ? limit.rlim_max-1 : best_limit; + limit.rlim_max = best_limit > limit.rlim_max ? limit.rlim_max-1 : best_limit; + if (setrlimit(RLIMIT_NOFILE,&limit) != -1) { + LOG(WARNING) << "your 'limit -n ' of " << old_limit << " is not enough for Redis to start. pika have successfully reconfig it to " << limit.rlim_cur; + } else { + LOG(FATAL) << "your 'limit -n ' of " << old_limit << " is not enough for Redis to start. pika can not reconfig it(" << strerror(errno) << "), do it by yourself"; + } + } + // daemonize if needed if (g_pika_conf->daemonize()) { daemonize(); @@ -143,7 +159,7 @@ int main(int argc, char *argv[]) { PikaSignalSetup(); InitCmdInfoTable(); - DLOG(INFO) << "Server at: " << path; + LOG(INFO) << "Server at: " << path; g_pika_server = new PikaServer(); if (g_pika_conf->daemonize()) { diff --git a/src/pika_admin.cc b/src/pika_admin.cc index ee40a462ed..0b7fca9043 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1,4 +1,5 @@ #include "slash_string.h" +#include "rsync.h" #include "pika_conf.h" #include "pika_admin.h" #include "pika_server.h" @@ -62,7 +63,7 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) void SlaveofCmd::Do() { if (is_noone_) { // Stop rsync - LOG(ERROR) << "stop rsync"; + LOG(INFO) << "start slaveof, stop rsync first"; slash::StopRsync(g_pika_conf->db_sync_path()); g_pika_server->RemoveMaster(); @@ -114,7 +115,7 @@ void TrysyncCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) void TrysyncCmd::Do() { std::string ip_port = slash::IpPortString(slave_ip_, slave_port_); - DLOG(INFO) << "Trysync, Slave ip_port: " << ip_port << " filenum: " << filenum_ << " pro_offset: " << pro_offset_; + LOG(INFO) << "Trysync, Slave ip_port: " << ip_port << " filenum: " << filenum_ << " pro_offset: " << pro_offset_; slash::MutexLock l(&(g_pika_server->slave_mutex_)); if (!g_pika_server->FindSlave(ip_port)) { SlaveItem s; @@ -126,11 +127,11 @@ void TrysyncCmd::Do() { gettimeofday(&s.create_time, NULL); s.sender = NULL; - DLOG(INFO) << "Trysync, dont FindSlave, so AddBinlogSender"; + LOG(INFO) << "Trysync, dont FindSlave, so AddBinlogSender"; Status status = g_pika_server->AddBinlogSender(s, filenum_, pro_offset_); if (status.ok()) { res_.AppendInteger(s.sid); - DLOG(INFO) << "Send Sid to Slave: " << s.sid; + LOG(INFO) << "Send Sid to Slave: " << s.sid; g_pika_server->BecomeMaster(); } else if (status.IsIncomplete()) { res_.AppendString(kInnerReplWait); @@ -528,8 +529,8 @@ void InfoCmd::InfoReplication(std::string &info) { std::stringstream tmp_stream; tmp_stream << "# Replication("; switch (host_role) { + case PIKA_ROLE_SINGLE : case PIKA_ROLE_MASTER : tmp_stream << "MASTER)\r\nrole:master\r\n"; break; - case PIKA_ROLE_SINGLE : tmp_stream << "MASTER)\r\nrole:single\r\n"; break; case PIKA_ROLE_SLAVE : tmp_stream << "SLAVE)\r\nrole:slave\r\n"; break; case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : tmp_stream << "MASTER/SLAVE)\r\nrole:slave\r\n"; break; default: info.append("ERR: server role is error\r\n"); return; @@ -677,45 +678,45 @@ void ConfigCmd::ConfigGet(std::string &ret) { ret = "*2\r\n"; EncodeString(&ret, "port"); EncodeInt32(&ret, g_pika_conf->port()); - } else if (get_item == "thread_num") { + } else if (get_item == "thread-num") { ret = "*2\r\n"; - EncodeString(&ret, "thread_num"); + EncodeString(&ret, "thread-num"); EncodeInt32(&ret, g_pika_conf->thread_num()); - } else if (get_item == "sync_thread_num") { + } else if (get_item == "sync-thread-num") { ret = "*2\r\n"; - EncodeString(&ret, "sync_thread_num"); + EncodeString(&ret, "sync-thread-num"); EncodeInt32(&ret, g_pika_conf->sync_thread_num()); - } else if (get_item == "sync_buffer_size") { + } else if (get_item == "sync-buffer-size") { ret = "*2\r\n"; - EncodeString(&ret, "sync_buffer_size"); + EncodeString(&ret, "sync-buffer-size"); EncodeInt32(&ret, g_pika_conf->sync_buffer_size()); - } else if (get_item == "log_path") { + } else if (get_item == "log-path") { ret = "*2\r\n"; - EncodeString(&ret, "log_path"); + EncodeString(&ret, "log-path"); EncodeString(&ret, g_pika_conf->log_path()); - } else if (get_item == "log_level") { + } else if (get_item == "loglevel") { ret = "*2\r\n"; - EncodeString(&ret, "log_level"); - EncodeInt32(&ret, g_pika_conf->log_level()); - } else if (get_item == "db_path") { + EncodeString(&ret, "loglevel"); + EncodeString(&ret, g_pika_conf->log_level() ? "ERROR" : "INFO"); + } else if (get_item == "db-path") { ret = "*2\r\n"; - EncodeString(&ret, "db_path"); + EncodeString(&ret, "db-path"); EncodeString(&ret, g_pika_conf->db_path()); - } else if (get_item == "db_sync_path") { + } else if (get_item == "db-sync-path") { ret = "*2\r\n"; - EncodeString(&ret, "db_sync_path"); + EncodeString(&ret, "db-sync-path"); EncodeString(&ret, g_pika_conf->db_sync_path()); - } else if (get_item == "db_sync_speed") { + } else if (get_item == "db-sync-speed") { ret = "*2\r\n"; - EncodeString(&ret, "db_sync_speed"); + EncodeString(&ret, "db-sync-speed"); EncodeInt32(&ret, g_pika_conf->db_sync_speed()); } else if (get_item == "maxmemory") { ret = "*2\r\n"; EncodeString(&ret, "maxmemory"); EncodeInt32(&ret, g_pika_conf->write_buffer_size()); - } else if (get_item == "write_buffer_size") { + } else if (get_item == "write-buffer-size") { ret = "*2\r\n"; - EncodeString(&ret, "write_buffer_size"); + EncodeString(&ret, "write-buffer-size"); EncodeInt32(&ret, g_pika_conf->write_buffer_size()); } else if (get_item == "timeout") { ret = "*2\r\n"; @@ -733,57 +734,57 @@ void ConfigCmd::ConfigGet(std::string &ret) { ret = "*2\r\n"; EncodeString(&ret, "userblacklist"); EncodeString(&ret, (g_pika_conf->suser_blacklist()).c_str()); - } else if (get_item == "dump_prefix") { + } else if (get_item == "dump-prefix") { ret = "*2\r\n"; - EncodeString(&ret, "dump_prefix"); + EncodeString(&ret, "dump-prefix"); EncodeString(&ret, g_pika_conf->bgsave_prefix()); } else if (get_item == "daemonize") { ret = "*2\r\n"; EncodeString(&ret, "daemonize"); EncodeString(&ret, g_pika_conf->daemonize() ? "yes" : "no"); - } else if (get_item == "dump_path") { + } else if (get_item == "dump-path") { ret = "*2\r\n"; - EncodeString(&ret, "dump_path"); + EncodeString(&ret, "dump-path"); EncodeString(&ret, g_pika_conf->bgsave_path()); } else if (get_item == "pidfile") { ret = "*2\r\n"; EncodeString(&ret, "pidfile"); EncodeString(&ret, g_pika_conf->pidfile()); - } else if (get_item == "maxconnection") { + } else if (get_item == "maxclients") { ret = "*2\r\n"; - EncodeString(&ret, "maxconnection"); - EncodeInt32(&ret, g_pika_conf->maxconnection()); - } else if (get_item == "target_file_size_base") { + EncodeString(&ret, "maxclients"); + EncodeInt32(&ret, g_pika_conf->maxclients()); + } else if (get_item == "target-file-size-base") { ret = "*2\r\n"; - EncodeString(&ret, "target_file_size_base"); + EncodeString(&ret, "target-file-size-base"); EncodeInt32(&ret, g_pika_conf->target_file_size_base()); - } else if (get_item == "max_background_flushes") { + } else if (get_item == "max-background-flushes") { ret = "*2\r\n"; - EncodeString(&ret, "max_background_flushes"); + EncodeString(&ret, "max-background-flushes"); EncodeInt32(&ret, g_pika_conf->max_background_flushes()); - } else if (get_item == "max_background_compactions") { + } else if (get_item == "max-background-compactions") { ret = "*2\r\n"; - EncodeString(&ret, "max_background_compactions"); + EncodeString(&ret, "max-background-compactions"); EncodeInt32(&ret, g_pika_conf->max_background_compactions()); - } else if (get_item == "expire_logs_days") { + } else if (get_item == "expire-logs-days") { ret = "*2\r\n"; - EncodeString(&ret, "expire_logs_days"); + EncodeString(&ret, "expire-logs-days"); EncodeInt32(&ret, g_pika_conf->expire_logs_days()); - } else if (get_item == "expire_logs_nums") { + } else if (get_item == "expire-logs-nums") { ret = "*2\r\n"; - EncodeString(&ret, "expire_logs_nums"); + EncodeString(&ret, "expire-logs-nums"); EncodeInt32(&ret, g_pika_conf->expire_logs_nums()); - } else if (get_item == "root_connection_num" ) { + } else if (get_item == "root-connection-num" ) { ret = "*2\r\n"; - EncodeString(&ret, "root_connection_num"); + EncodeString(&ret, "root-connection-num"); EncodeInt32(&ret, g_pika_conf->root_connection_num()); - } else if (get_item == "slowlog_log_slower_than") { + } else if (get_item == "slowlog-log-slower-than") { ret = "*2\r\n"; - EncodeString(&ret, "slowlog_log_slower_than"); + EncodeString(&ret, "slowlog-log-slower-than"); EncodeInt32(&ret, g_pika_conf->slowlog_slower_than()); - } else if (get_item == "binlog_file_size") { + } else if (get_item == "binlog-file-size") { ret = "*2\r\n"; - EncodeString(&ret, "binlog_file_size"); + EncodeString(&ret, "binlog-file-size"); EncodeInt32(&ret, g_pika_conf->binlog_file_size()); } else if (get_item == "compression") { ret = "*2\r\n"; @@ -798,37 +799,67 @@ void ConfigCmd::ConfigGet(std::string &ret) { EncodeString(&ret, "no"); } } else if (get_item == "*") { - ret = "*30\r\n"; + ret = "*58\r\n"; EncodeString(&ret, "port"); - EncodeString(&ret, "thread_num"); - EncodeString(&ret, "sync_thread_num"); - EncodeString(&ret, "sync_buffer_size"); - EncodeString(&ret, "log_path"); - EncodeString(&ret, "log_level"); - EncodeString(&ret, "db_path"); + EncodeInt32(&ret, g_pika_conf->port()); + EncodeString(&ret, "thread-num"); + EncodeInt32(&ret, g_pika_conf->thread_num()); + EncodeString(&ret, "sync-thread-num"); + EncodeInt32(&ret, g_pika_conf->sync_thread_num()); + EncodeString(&ret, "sync-buffer-size"); + EncodeInt32(&ret, g_pika_conf->sync_buffer_size()); + EncodeString(&ret, "log-path"); + EncodeString(&ret, g_pika_conf->log_path()); + EncodeString(&ret, "loglevel"); + EncodeString(&ret, g_pika_conf->log_level() ? "ERROR" : "INFO"); + EncodeString(&ret, "db-path"); + EncodeString(&ret, g_pika_conf->db_path()); EncodeString(&ret, "maxmemory"); - EncodeString(&ret, "write_buffer_size"); + EncodeInt32(&ret, g_pika_conf->write_buffer_size()); + EncodeString(&ret, "write-buffer-size"); + EncodeInt32(&ret, g_pika_conf->write_buffer_size()); EncodeString(&ret, "timeout"); + EncodeInt32(&ret, g_pika_conf->timeout()); EncodeString(&ret, "requirepass"); + EncodeString(&ret, g_pika_conf->requirepass()); EncodeString(&ret, "userpass"); + EncodeString(&ret, g_pika_conf->userpass()); EncodeString(&ret, "userblacklist"); + EncodeString(&ret, g_pika_conf->suser_blacklist()); EncodeString(&ret, "daemonize"); - EncodeString(&ret, "dump_path"); - EncodeString(&ret, "dump_prefix"); + EncodeInt32(&ret, g_pika_conf->daemonize()); + EncodeString(&ret, "dump-path"); + EncodeString(&ret, g_pika_conf->bgsave_path()); + EncodeString(&ret, "dump-prefix"); + EncodeString(&ret, g_pika_conf->bgsave_prefix()); EncodeString(&ret, "pidfile"); - EncodeString(&ret, "maxconnection"); - EncodeString(&ret, "target_file_size_base"); - EncodeString(&ret, "max_background_flushes"); - EncodeString(&ret, "max_background_compactions"); - EncodeString(&ret, "expire_logs_days"); - EncodeString(&ret, "expire_logs_nums"); - EncodeString(&ret, "root_connection_num"); - EncodeString(&ret, "slowlog_log_slower_than"); + EncodeString(&ret, g_pika_conf->pidfile()); + EncodeString(&ret, "maxclients"); + EncodeInt32(&ret, g_pika_conf->maxclients()); + EncodeString(&ret, "target-file-size-base"); + EncodeInt32(&ret, g_pika_conf->target_file_size_base()); + EncodeString(&ret, "max-background-flushes"); + EncodeInt32(&ret, g_pika_conf->max_background_flushes()); + EncodeString(&ret, "max-background-compactions"); + EncodeInt32(&ret, g_pika_conf->max_background_compactions()); + EncodeString(&ret, "expire-logs-days"); + EncodeInt32(&ret, g_pika_conf->expire_logs_days()); + EncodeString(&ret, "expire-logs-nums"); + EncodeInt32(&ret, g_pika_conf->expire_logs_nums()); + EncodeString(&ret, "root-connection-num"); + EncodeInt32(&ret, g_pika_conf->root_connection_num()); + EncodeString(&ret, "slowlog-log-slower-than"); + EncodeInt32(&ret, g_pika_conf->slowlog_slower_than()); EncodeString(&ret, "slave-read-only"); - EncodeString(&ret, "binlog_file_size"); + EncodeInt32(&ret, g_pika_conf->readonly()); + EncodeString(&ret, "binlog-file-size"); + EncodeInt32(&ret, g_pika_conf->binlog_file_size()); EncodeString(&ret, "compression"); - EncodeString(&ret, "db_sync_path"); - EncodeString(&ret, "db_sync_speed"); + EncodeString(&ret, g_pika_conf->compression()); + EncodeString(&ret, "db-sync-path"); + EncodeString(&ret, g_pika_conf->db_sync_path()); + EncodeString(&ret, "db-sync-speed"); + EncodeInt32(&ret, g_pika_conf->db_sync_speed()); } else { ret = "*0\r\n"; } @@ -838,26 +869,31 @@ void ConfigCmd::ConfigSet(std::string& ret) { std::string set_item = config_args_v_[1]; if (set_item == "*") { ret = "*13\r\n"; - EncodeString(&ret, "log_level"); + EncodeString(&ret, "loglevel"); EncodeString(&ret, "timeout"); EncodeString(&ret, "requirepass"); EncodeString(&ret, "userpass"); EncodeString(&ret, "userblacklist"); - EncodeString(&ret, "dump_prefix"); - EncodeString(&ret, "maxconnection"); - EncodeString(&ret, "expire_logs_days"); - EncodeString(&ret, "expire_logs_nums"); - EncodeString(&ret, "root_connection_num"); - EncodeString(&ret, "slowlog_log_slower_than"); + EncodeString(&ret, "dump-prefix"); + EncodeString(&ret, "maxclients"); + EncodeString(&ret, "expire-logs-days"); + EncodeString(&ret, "expire-logs-nums"); + EncodeString(&ret, "root-connection-num"); + EncodeString(&ret, "slowlog-log-slower-than"); EncodeString(&ret, "slave-read-only"); - EncodeString(&ret, "db_sync_speed"); + EncodeString(&ret, "db-sync-speed"); return; } std::string value = config_args_v_[2]; long int ival; - if (set_item == "log_level") { - if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0 || ival > 4) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'log_level'\r\n"; + if (set_item == "loglevel") { + slash::StringToLower(value); + if (value == "info") { + ival = 0; + } else if (value == "error") { + ival = 1; + } else { + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'loglevel'\r\n"; return; } g_pika_conf->SetLogLevel(ival); @@ -879,40 +915,40 @@ void ConfigCmd::ConfigSet(std::string& ret) { } else if (set_item == "userblacklist") { g_pika_conf->SetUserBlackList(value); ret = "+OK\r\n"; - } else if (set_item == "dump_prefix") { + } else if (set_item == "dump-prefix") { g_pika_conf->SetBgsavePrefix(value); ret = "+OK\r\n"; - } else if (set_item == "maxconnection") { + } else if (set_item == "maxclients") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'maxconnection'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'maxclients'\r\n"; return; } g_pika_conf->SetMaxConnection(ival); ret = "+OK\r\n"; - } else if (set_item == "expire_logs_days") { + } else if (set_item == "expire-logs-days") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'expire_logs_days'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'expire-logs-days'\r\n"; return; } g_pika_conf->SetExpireLogsDays(ival); ret = "+OK\r\n"; - } else if (set_item == "expire_logs_nums") { + } else if (set_item == "expire-logs-nums") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'expire_logs_nums'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'expire-logs-nums'\r\n"; return; } g_pika_conf->SetExpireLogsNums(ival); ret = "+OK\r\n"; - } else if (set_item == "root_connection_num") { + } else if (set_item == "root-connection-num") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'root_connection_num'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'root-connection-num'\r\n"; return; } g_pika_conf->SetRootConnectionNum(ival); ret = "+OK\r\n"; - } else if (set_item == "slowlog_log_slower_than") { + } else if (set_item == "slowlog-log-slower-than") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'slowlog_slower_than'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'slowlog-log-slower-than'\r\n"; return; } g_pika_conf->SetSlowlogSlowerThan(ival); @@ -925,14 +961,14 @@ void ConfigCmd::ConfigSet(std::string& ret) { } else if (value == "0" || value == "no") { is_readonly = false; } else { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'readonly'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'slave-read-only'\r\n"; return; } g_pika_conf->SetReadonly(is_readonly); ret = "+OK\r\n"; - } else if (set_item == "db_sync_speed") { + } else if (set_item == "db-sync-speed") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'db_sync_speed(MB)'\r\n"; + ret = "-ERR Invalid argument " + value + " for CONFIG SET 'db-sync-speed(MB)'\r\n"; return; } if (ival < 0 || ival > 125) { diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index d0cc9607f1..daf5b69e08 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -93,12 +93,12 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) : std::string profile; if (!slash::FileExists(manifest)) { - DLOG(INFO) << "Binlog: Manifest file not exist"; + LOG(INFO) << "Binlog: Manifest file not exist, we create a new one."; profile = NewFileName(filename, pro_num_); s = slash::NewWritableFile(profile, &queue_); if (!s.ok()) { - LOG(WARNING) << "Binlog: new " << filename << " " << s.ToString(); + LOG(INFO) << "Binlog: new " << filename << " " << s.ToString(); } s = slash::NewRWFile(manifest, &versionfile_); @@ -109,7 +109,7 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) : version_ = new Version(versionfile_); version_->StableSave(); } else { - DLOG(INFO) << "Binlog: Find the exist file "; + LOG(INFO) << "Binlog: Find the exist file."; s = slash::NewRWFile(manifest, &versionfile_); if (s.ok()) { diff --git a/src/pika_binlog_bgworker.cc b/src/pika_binlog_bgworker.cc index 77226d97c0..a6b798cbaa 100644 --- a/src/pika_binlog_bgworker.cc +++ b/src/pika_binlog_bgworker.cc @@ -19,14 +19,14 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { const CmdInfo* const cinfo_ptr = GetCmdInfo(opt); Cmd* c_ptr = self->GetCmd(opt); if (!cinfo_ptr || !c_ptr) { - LOG(ERROR) << "Error operation from binlog: " << opt; + LOG(WARNING) << "Error operation from binlog: " << opt; } c_ptr->res().clear(); // Initial c_ptr->Initial(argv, cinfo_ptr); if (!c_ptr->res().ok()) { - LOG(ERROR) << "Fail to initial command from binlog: " << opt; + LOG(WARNING) << "Fail to initial command from binlog: " << opt; } uint64_t start_us; @@ -52,7 +52,6 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { if (!is_readonly) { error_happend = !g_pika_server->WaitTillBinlogBGSerial(my_serial); if (!error_happend) { - DLOG(INFO) << "Write binlog in binlog bgthread thread"; g_pika_server->logger_->Lock(); g_pika_server->logger_->Put(bgarg->raw_args); g_pika_server->logger_->Unlock(); @@ -74,11 +73,10 @@ void BinlogBGWorker::DoBinlogBG(void* arg) { if (g_pika_conf->slowlog_slower_than() >= 0) { int64_t duration = slash::NowMicros() - start_us; if (duration > g_pika_conf->slowlog_slower_than()) { - LOG(ERROR) << "command:" << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration; + LOG(WARNING) << "command:" << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration; } } - DLOG(INFO) << "delete argv ptr"; delete bgarg->argv; delete bgarg; } diff --git a/src/pika_binlog_receiver_thread.cc b/src/pika_binlog_receiver_thread.cc index 577bb3fe98..54727e483c 100644 --- a/src/pika_binlog_receiver_thread.cc +++ b/src/pika_binlog_receiver_thread.cc @@ -19,7 +19,7 @@ PikaBinlogReceiverThread::PikaBinlogReceiverThread(int port, int cron_interval) PikaBinlogReceiverThread::~PikaBinlogReceiverThread() { DestoryCmdTable(cmds_); - DLOG(INFO) << "BinlogReceiver thread " << thread_id() << " exit!!!"; + LOG(INFO) << "BinlogReceiver thread " << thread_id() << " exit!!!"; } bool PikaBinlogReceiverThread::AccessHandle(std::string& ip) { @@ -27,7 +27,7 @@ bool PikaBinlogReceiverThread::AccessHandle(std::string& ip) { ip = g_pika_server->host(); } if (ThreadClientNum() != 0 || !g_pika_server->ShouldAccessConnAsMaster(ip)) { - DLOG(INFO) << "BinlogReceiverThread AccessHandle failed"; + LOG(WARNING) << "BinlogReceiverThread AccessHandle failed: " << ip; return false; } g_pika_server->PlusMasterConnection(); @@ -71,7 +71,7 @@ void PikaBinlogReceiverThread::KillAll() { slash::RWLock l(&rwlock_, true); std::map::iterator iter = conns_.begin(); while (iter != conns_.end()) { - DLOG(INFO) << "==========Kill Master Sender Conn=============="; + LOG(INFO) << "==========Kill Master Sender Conn=============="; close(iter->first); delete(static_cast(iter->second)); iter = conns_.erase(iter); diff --git a/src/pika_binlog_sender_thread.cc b/src/pika_binlog_sender_thread.cc index 3dfd75270e..9ef48f1ab4 100644 --- a/src/pika_binlog_sender_thread.cc +++ b/src/pika_binlog_sender_thread.cc @@ -41,7 +41,7 @@ PikaBinlogSenderThread::~PikaBinlogSenderThread() { delete [] backing_store_; delete cli_; - DLOG(INFO) << "a BinlogSender thread " << thread_id() << " exit!"; + LOG(INFO) << "a BinlogSender thread " << thread_id() << " exit!"; } int PikaBinlogSenderThread::trim() { @@ -255,7 +255,7 @@ void* PikaBinlogSenderThread::ThreadMain() { // 1. Connect to slave result = cli_->Connect(ip_, port_); - DLOG(INFO) << "BinlogSender Connect slave(" << ip_ << ":" << port_ << ") " << result.ToString(); + LOG(INFO) << "BinlogSender Connect slave(" << ip_ << ":" << port_ << ") " << result.ToString(); if (result.ok()) { while (true) { @@ -265,7 +265,7 @@ void* PikaBinlogSenderThread::ThreadMain() { //DLOG(INFO) << "BinlogSender Parse, return " << s.ToString(); if (s.IsCorruption()) { // should exit - DLOG(INFO) << "BinlogSender will exit"; + LOG(WARNING) << "BinlogSender Parse failed, will exit"; //close(sockfd_); break; } else if (s.IsIOError()) { diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 06e69c25fa..47961a81f0 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -41,7 +41,7 @@ std::string PikaClientConn::DoCmd(const std::string& opt) { // Check authed if (!auth_stat_.IsAuthed(cinfo_ptr)) { - LOG(INFO) << "(" << ip_port() << ")Authentication required, close connection"; + LOG(WARNING) << "(" << ip_port() << ")Authentication required, close connection"; return "-ERR NOAUTH Authentication required.\r\n"; } @@ -64,7 +64,7 @@ std::string PikaClientConn::DoCmd(const std::string& opt) { if (is_monitoring) { monitor_message = std::to_string(1.0*slash::NowMicros()/1000000) + " [" + this->ip_port() + "]"; for (PikaCmdArgsType::iterator iter = argv_.begin(); iter != argv_.end(); iter++) { - monitor_message += " \"" + *iter + "\""; + monitor_message += " " + slash::ToRead(*iter); } g_pika_server->monitor_thread()->AddMonitorMessage(monitor_message); } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index f57ab3ebb8..31b884375a 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -20,36 +20,46 @@ int PikaConf::Load() } // Mutable Section - GetConfInt("log_level", &log_level_); + std::string loglevel; + GetConfStr("loglevel", &loglevel); + slash::StringToLower(loglevel); + if (loglevel == "info") { + SetLogLevel(0); + } else if (loglevel == "error") { + SetLogLevel(1); + } else { + SetLogLevel(0); + fprintf(stderr, "invalid loglevel value in conf file, defaultly set loglevel to INFO\n"); + } GetConfInt("timeout", &timeout_); if (timeout_ <= 0) { timeout_ = 60; // 60s } GetConfStr("requirepass", &requirepass_); GetConfStr("userpass", &userpass_); - GetConfInt("maxconnection", &maxconnection_); - if (maxconnection_ <= 0) { - maxconnection_ = 20000; + GetConfInt("maxclients", &maxclients_); + if (maxclients_ <= 0) { + maxclients_ = 20000; } - GetConfInt("root_connection_num", &root_connection_num_); + GetConfInt("root-connection-num", &root_connection_num_); if (root_connection_num_ < 0) { root_connection_num_ = 2; } - GetConfInt("slowlog_log_slower_than", &slowlog_log_slower_than_); + GetConfInt("slowlog-log-slower-than", &slowlog_log_slower_than_); std::string user_blacklist; GetConfStr("userblacklist", &user_blacklist); SetUserBlackList(std::string(user_blacklist)); - GetConfStr("dump_path", &bgsave_path_); + GetConfStr("dump-path", &bgsave_path_); if (bgsave_path_[bgsave_path_.length() - 1] != '/') { bgsave_path_ += "/"; } - GetConfStr("dump_prefix", &bgsave_prefix_); + GetConfStr("dump-prefix", &bgsave_prefix_); - GetConfInt("expire_logs_nums", &expire_logs_nums_); + GetConfInt("expire-logs-nums", &expire_logs_nums_); if (expire_logs_nums_ <= 10 ) { expire_logs_nums_ = 10; } - GetConfInt("expire_logs_days", &expire_logs_days_); + GetConfInt("expire-logs-days", &expire_logs_days_); if (expire_logs_days_ <= 0 ) { expire_logs_days_ = 1; } @@ -60,26 +70,26 @@ int PikaConf::Load() // Immutable Sections // GetConfInt("port", &port_); - GetConfStr("log_path", &log_path_); - GetConfStr("db_path", &db_path_); + GetConfStr("log-path", &log_path_); + GetConfStr("db-path", &db_path_); if (log_path_[log_path_.length() - 1] != '/') { log_path_ += "/"; } - GetConfInt("thread_num", &thread_num_); + GetConfInt("thread-num", &thread_num_); if (thread_num_ <= 0) { thread_num_ = 12; } if (thread_num_ > 24) { thread_num_ = 24; } - GetConfInt("sync_thread_num", &sync_thread_num_); + GetConfInt("sync-thread-num", &sync_thread_num_); if (sync_thread_num_ <= 0) { sync_thread_num_ = 3; } if (sync_thread_num_ > 24) { sync_thread_num_ = 24; } - GetConfInt("sync_buffer_size", &sync_buffer_size_); + GetConfInt("sync-buffer-size", &sync_buffer_size_); if (sync_buffer_size_ <= 0) { sync_buffer_size_ = 5; } else if (sync_buffer_size_ > 100) { @@ -87,19 +97,19 @@ int PikaConf::Load() } // write_buffer_size - GetConfInt("write_buffer_size", &write_buffer_size_); + GetConfInt("write-buffer-size", &write_buffer_size_); if (write_buffer_size_ <= 0 ) { write_buffer_size_ = 4194304; // 40M } // target_file_size_base - GetConfInt("target_file_size_base", &target_file_size_base_); + GetConfInt("target-file-size-base", &target_file_size_base_); if (target_file_size_base_ <= 0) { target_file_size_base_ = 1048576; // 10M } max_background_flushes_ = 1; - GetConfInt("max_background_flushes", &max_background_flushes_); + GetConfInt("max-background-flushes", &max_background_flushes_); if (max_background_flushes_ <= 0) { max_background_flushes_ = 1; } @@ -108,7 +118,7 @@ int PikaConf::Load() } max_background_compactions_ = 1; - GetConfInt("max_background_compactions", &max_background_compactions_); + GetConfInt("max-background-compactions", &max_background_compactions_); if (max_background_compactions_ <= 0) { max_background_compactions_ = 1; } @@ -120,18 +130,18 @@ int PikaConf::Load() std::string dmz; GetConfStr("daemonize", &dmz); daemonize_ = (dmz == "yes") ? true : false; - GetConfInt("binlog_file_size", &binlog_file_size_); + GetConfInt("binlog-file-size", &binlog_file_size_); if (binlog_file_size_ < 1024 || static_cast(binlog_file_size_) > (1024LL * 1024 * 1024)) { binlog_file_size_ = 100 * 1024 * 1024; // 100M } GetConfStr("pidfile", &pidfile_); // db sync - GetConfStr("db_sync_path", &db_sync_path_); + GetConfStr("db-sync-path", &db_sync_path_); if (db_sync_path_[db_sync_path_.length() - 1] != '/') { db_sync_path_ += "/"; } - GetConfInt("db_sync_speed", &db_sync_speed_); + GetConfInt("db-sync-speed", &db_sync_speed_); if (db_sync_speed_ < 0 || db_sync_speed_ > 125) { db_sync_speed_ = 125; } @@ -141,32 +151,32 @@ int PikaConf::Load() int PikaConf::ConfigRewrite() { SetConfInt("port", port_); - SetConfInt("thread_num", thread_num_); - SetConfInt("sync_thread_num", sync_thread_num_); - SetConfInt("sync_buffer_size", sync_buffer_size_); - SetConfStr("log_path", log_path_); - SetConfInt("log_level", log_level_); - SetConfStr("db_path", db_path_); - SetConfStr("db_sync_path", db_sync_path_); - SetConfInt("db_sync_speed", db_sync_speed_); - SetConfInt("write_buffer_size", write_buffer_size_); + SetConfInt("thread-num", thread_num_); + SetConfInt("sync-thread-num", sync_thread_num_); + SetConfInt("sync-buffer-size", sync_buffer_size_); + SetConfStr("log-path", log_path_); + SetConfStr("loglevel", log_level_ ? "ERROR" : "INFO"); + SetConfStr("db-path", db_path_); + SetConfStr("db-sync-path", db_sync_path_); + SetConfInt("db-sync-speed", db_sync_speed_); + SetConfInt("write-buffer-size", write_buffer_size_); SetConfInt("timeout", timeout_); SetConfStr("requirepass", requirepass_); SetConfStr("userpass", userpass_); SetConfStr("userblacklist", suser_blacklist()); - SetConfStr("dump_path", bgsave_path_); - SetConfStr("dump_prefix", bgsave_prefix_); - SetConfInt("maxconnection", maxconnection_); - SetConfInt("root_connection_num", root_connection_num_); - SetConfInt("slowlog_log_slower_than", slowlog_log_slower_than_); - SetConfInt("target_file_size_base", target_file_size_base_); - SetConfInt("max_background_flushes", max_background_flushes_); - SetConfInt("max_background_compactions", max_background_compactions_); - SetConfInt("expire_logs_nums", expire_logs_nums_); - SetConfInt("expire_logs_days", expire_logs_days_); - SetConfBool("slave_read_only", readonly_); + SetConfStr("dump-path", bgsave_path_); + SetConfStr("dump-prefix", bgsave_prefix_); + SetConfInt("maxclients", maxclients_); + SetConfInt("root-connection-num", root_connection_num_); + SetConfInt("slowlog-log-slower-than", slowlog_log_slower_than_); + SetConfInt("target-file-size-base", target_file_size_base_); + SetConfInt("max-background-flushes", max_background_flushes_); + SetConfInt("max-background-compactions", max_background_compactions_); + SetConfInt("expire-logs-nums", expire_logs_nums_); + SetConfInt("expire-logs-days", expire_logs_days_); + SetConfBool("slave-read-only", readonly_); - SetConfInt("binlog_file_size_", binlog_file_size_); + SetConfInt("binlog-file-size", binlog_file_size_); SetConfStr("compression", compression_); return WriteBack(); diff --git a/src/pika_dispatch_thread.cc b/src/pika_dispatch_thread.cc index 3bcfe5ff14..1325f28099 100644 --- a/src/pika_dispatch_thread.cc +++ b/src/pika_dispatch_thread.cc @@ -12,7 +12,7 @@ PikaDispatchThread::PikaDispatchThread(int port, int work_num, PikaWorkerThread* } PikaDispatchThread::~PikaDispatchThread() { - DLOG(INFO) << "dispatch thread " << thread_id() << " exit!!!"; + LOG(INFO) << "dispatch thread " << thread_id() << " exit!!!"; } void PikaDispatchThread::CronHandle() { @@ -37,13 +37,13 @@ bool PikaDispatchThread::AccessHandle(std::string& ip) { } int client_num = ClientNum(); - if ((client_num >= g_pika_conf->maxconnection() + g_pika_conf->root_connection_num()) - || (client_num >= g_pika_conf->maxconnection() && ip != g_pika_server->host())) { - DLOG(INFO) << "Max connections reach, Deny new comming: " << ip; + if ((client_num >= g_pika_conf->maxclients() + g_pika_conf->root_connection_num()) + || (client_num >= g_pika_conf->maxclients() && ip != g_pika_server->host())) { + LOG(WARNING) << "Max connections reach, Deny new comming: " << ip; return false; } - DLOG(INFO) << "ip: " << ip; + DLOG(INFO) << "new clinet comming, ip: " << ip; g_pika_server->incr_accumulative_connections(); return true; } diff --git a/src/pika_heartbeat_conn.cc b/src/pika_heartbeat_conn.cc index ce2cf32557..b8307545ce 100644 --- a/src/pika_heartbeat_conn.cc +++ b/src/pika_heartbeat_conn.cc @@ -25,8 +25,8 @@ int PikaHeartbeatConn::DealMessage() { memcpy(wbuf_ + wbuf_len_, "+OK\r\n", 5); wbuf_len_ += 5; } else { - memcpy(wbuf_ + wbuf_len_, "-ERR What the fuck are u sending\r\n", 35); - wbuf_len_ += 35; + memcpy(wbuf_ + wbuf_len_, "-ERR What the fuck are u sending\r\n", 34); + wbuf_len_ += 34; } return 0; } diff --git a/src/pika_heartbeat_thread.cc b/src/pika_heartbeat_thread.cc index 0db62eac8a..137445ee9a 100644 --- a/src/pika_heartbeat_thread.cc +++ b/src/pika_heartbeat_thread.cc @@ -11,7 +11,7 @@ PikaHeartbeatThread::PikaHeartbeatThread(int port, int cron_interval) : } PikaHeartbeatThread::~PikaHeartbeatThread() { - DLOG(INFO) << "PikaHeartbeat thread " << thread_id() << " exit!!!"; + LOG(INFO) << "PikaHeartbeat thread " << thread_id() << " exit!!!"; } void PikaHeartbeatThread::CronHandle() { @@ -25,7 +25,7 @@ void PikaHeartbeatThread::CronHandle() { std::map::iterator iter = conns_.begin(); while (iter != conns_.end()) { if (now.tv_sec - static_cast(iter->second)->last_interaction().tv_sec > 20) { - DLOG(INFO) << "Find Timeout Slave: " << static_cast(iter->second)->ip_port(); + LOG(INFO) << "Find Timeout Slave: " << static_cast(iter->second)->ip_port(); close(iter->first); // erase item in slaves_ g_pika_server->DeleteSlave(iter->first); @@ -55,7 +55,7 @@ void PikaHeartbeatThread::CronHandle() { //pthread_kill(iter->tid); // Kill BinlogSender - DLOG(INFO) << "Erase slave " << iter->ip_port << " from slaves map of heartbeat thread"; + LOG(WARNING) << "Erase slave " << iter->ip_port << " from slaves map of heartbeat thread"; { //TODO maybe bug here g_pika_server->slave_mutex_.Unlock(); @@ -77,12 +77,12 @@ bool PikaHeartbeatThread::AccessHandle(std::string& ip) { std::vector::iterator iter = g_pika_server->slaves_.begin(); while (iter != g_pika_server->slaves_.end()) { if (iter->ip_port.find(ip) != std::string::npos) { - DLOG(INFO) << "HeartbeatThread access connection " << ip; + LOG(INFO) << "HeartbeatThread access connection " << ip; return true; } iter++; } - DLOG(INFO) << "HeartbeatThread deny connection: " << ip; + LOG(WARNING) << "HeartbeatThread deny connection: " << ip; return false; } diff --git a/src/pika_master_conn.cc b/src/pika_master_conn.cc index ef07ef8331..79b269237b 100644 --- a/src/pika_master_conn.cc +++ b/src/pika_master_conn.cc @@ -42,7 +42,7 @@ int PikaMasterConn::DealMessage() { if (is_monitoring) { monitor_message = std::to_string(1.0*slash::NowMicros()/1000000) + " [" + this->ip_port() + "]"; for (PikaCmdArgsType::iterator iter = argv_.begin(); iter != argv_.end(); iter++) { - monitor_message += " \"" + *iter + "\""; + monitor_message += " " + slash::ToRead(*iter); } g_pika_server->monitor_thread()->AddMonitorMessage(monitor_message); } @@ -54,7 +54,6 @@ int PikaMasterConn::DealMessage() { // Only when the server is readonly uint64_t serial = self_thread_->GetnPlusSerial(); if (is_readonly) { - DLOG(INFO) << "Write binlog in binlog dispatch thread"; if (!g_pika_server->WaitTillBinlogBGSerial(serial)) { return -2; } diff --git a/src/pika_monitor_thread.cc b/src/pika_monitor_thread.cc index c340052454..d85e467657 100644 --- a/src/pika_monitor_thread.cc +++ b/src/pika_monitor_thread.cc @@ -28,7 +28,7 @@ PikaMonitorThread::~PikaMonitorThread() { ++iter) { close(iter->fd); } - DLOG(INFO) << " PikaMonitorThread " << pthread_self() << " exit!!!"; + LOG(INFO) << " PikaMonitorThread " << pthread_self() << " exit!!!"; } void PikaMonitorThread::AddMonitorClient(pink::RedisConn* client_ptr) { diff --git a/src/pika_server.cc b/src/pika_server.cc index d439e03511..422bad932f 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -4,6 +4,7 @@ #include #include #include "env.h" +#include "rsync.h" #include "pika_server.h" #include "slash_string.h" #include "bg_thread.h" @@ -47,10 +48,10 @@ PikaServer::PikaServer() : option.compression = false; } std::string db_path = g_pika_conf->db_path(); - LOG(WARNING) << "Prepare DB..."; + LOG(INFO) << "Prepare DB..."; db_ = std::shared_ptr(new nemo::Nemo(db_path, option)); assert(db_); - LOG(WARNING) << "DB Success"; + LOG(INFO) << "DB Success"; // Create thread worker_num_ = g_pika_conf->thread_num(); @@ -91,7 +92,7 @@ PikaServer::~PikaServer() { while (iter != slaves_.end()) { delete static_cast(iter->sender); iter = slaves_.erase(iter); - DLOG(INFO) << "Delete slave success"; + LOG(INFO) << "Delete slave success"; } } delete ping_thread_; @@ -114,7 +115,7 @@ PikaServer::~PikaServer() { pthread_rwlock_destroy(&state_protector_); pthread_rwlock_destroy(&rwlock_); - DLOG(INFO) << "PikaServer " << pthread_self() << " exit!!!"; + LOG(INFO) << "PikaServer " << pthread_self() << " exit!!!"; } bool PikaServer::ServerInit() { @@ -136,7 +137,7 @@ bool PikaServer::ServerInit() { host_ = inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr); port_ = g_pika_conf->port(); - DLOG(INFO) << "host: " << host_ << " port: " << port_; + LOG(INFO) << "host: " << host_ << " port: " << port_; return true; } @@ -167,7 +168,7 @@ void PikaServer::Start() { //SetMaster("127.0.0.1", 9221); - DLOG(WARNING) << "Pika Server going to start"; + LOG(INFO) << "Pika Server going to start"; while (!exit_) { DoTimingTask(); // wake up every half hour @@ -176,7 +177,7 @@ void PikaServer::Start() { sleep(1); } } - DLOG(INFO) << "Goodbye..."; + LOG(INFO) << "Goodbye..."; Cleanup(); } @@ -204,7 +205,7 @@ void PikaServer::DeleteSlave(int fd) { delete static_cast(iter->sender); slaves_.erase(iter); - DLOG(INFO) << "Delete slave success"; + LOG(INFO) << "Delete slave success"; break; } iter++; @@ -235,12 +236,12 @@ bool PikaServer::ChangeDb(const std::string& new_path) { LOG(INFO) << "Prepare change db from: " << tmp_path; db_.reset(); if (0 != slash::RenameFile(db_path.c_str(), tmp_path)) { - LOG(ERROR) << "Failed to rename db path when change db, error: " << strerror(errno); + LOG(WARNING) << "Failed to rename db path when change db, error: " << strerror(errno); return false; } if (0 != slash::RenameFile(new_path.c_str(), db_path.c_str())) { - LOG(ERROR) << "Failed to rename new db path when change db, error: " << strerror(errno); + LOG(WARNING) << "Failed to rename new db path when change db, error: " << strerror(errno); return false; } db_.reset(new nemo::Nemo(db_path, option)); @@ -253,7 +254,7 @@ bool PikaServer::ChangeDb(const std::string& new_path) { void PikaServer::MayUpdateSlavesMap(int64_t sid, int32_t hb_fd) { slash::MutexLock l(&slave_mutex_); std::vector::iterator iter = slaves_.begin(); - DLOG(INFO) << "MayUpdateSlavesMap, sid: " << sid << " hb_fd: " << hb_fd; + LOG(INFO) << "MayUpdateSlavesMap, sid: " << sid << " hb_fd: " << hb_fd; while (iter != slaves_.end()) { if (iter->sid == sid) { iter->hb_fd = hb_fd; @@ -309,7 +310,7 @@ bool PikaServer::SetMaster(std::string& master_ip, int master_port) { master_port_ = master_port; role_ |= PIKA_ROLE_SLAVE; repl_state_ = PIKA_REPL_CONNECT; - DLOG(INFO) << "open read-only mode"; + LOG(INFO) << "open read-only mode"; g_pika_conf->SetReadonly(true); return true; } @@ -416,7 +417,7 @@ void PikaServer::RemoveMaster() { delete ping_thread_; ping_thread_ = NULL; } - DLOG(INFO) << "close read-only mode"; + LOG(INFO) << "close read-only mode"; g_pika_conf->SetReadonly(false); } @@ -471,7 +472,7 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) { // Get all files need to send std::vector descendant; if (!slash::GetDescendant(bg_path, descendant)) { - LOG(ERROR) << "Get Descendant when try to do db sync failed"; + LOG(WARNING) << "Get Descendant when try to do db sync failed"; } // Iterate to send files @@ -488,7 +489,7 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) { // We need specify the speed limit for every single file ret = slash::RsyncSendFile(*it, target_path, remote); if (0 != ret) { - LOG(ERROR) << "rsync send file failed! From: " << *it + LOG(WARNING) << "rsync send file failed! From: " << *it << ", To: " << target_path << ", At: " << ip << ":" << port << ", Error: " << ret; @@ -506,7 +507,7 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) { // Send info file at last if (0 == ret) { if (0 != (ret = slash::RsyncSendFile(bg_path + "/" + kBgsaveInfoFile, kBgsaveInfoFile, remote))) { - LOG(ERROR) << "send info file failed"; + LOG(WARNING) << "send info file failed"; } } @@ -555,14 +556,14 @@ Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t pthread_t tid = sender->thread_id(); slave.sender_tid = tid; - DLOG(INFO) << "AddBinlogSender ok, tid is " << slave.sender_tid << " hd_fd: " << slave.hb_fd << " stage: " << slave.stage; + LOG(INFO) << "AddBinlogSender ok, tid is " << slave.sender_tid << " hd_fd: " << slave.hb_fd << " stage: " << slave.stage; // Add sender // slash::MutexLock l(&slave_mutex_); slaves_.push_back(slave); return Status::OK(); } else { - DLOG(INFO) << "AddBinlogSender failed"; + LOG(WARNING) << "AddBinlogSender failed"; return Status::NotFound("AddBinlogSender bad sender"); } } @@ -579,13 +580,13 @@ bool PikaServer::InitBgsaveEnv() { std::string bgsave_path(g_pika_conf->bgsave_path()); bgsave_info_.path = bgsave_path + g_pika_conf->bgsave_prefix() + std::string(s_time, 8); if (!slash::DeleteDirIfExist(bgsave_info_.path)) { - LOG(ERROR) << "remove exist bgsave dir failed"; + LOG(WARNING) << "remove exist bgsave dir failed"; return false; } slash::CreatePath(bgsave_info_.path, 0755); // Prepare for failed dir if (!slash::DeleteDirIfExist(bgsave_info_.path + "_FAILED")) { - LOG(ERROR) << "remove exist fail bgsave dir failed :"; + LOG(WARNING) << "remove exist fail bgsave dir failed :"; return false; } } @@ -597,7 +598,7 @@ bool PikaServer::InitBgsaveEngine() { delete bgsave_engine_; nemo::Status nemo_s = nemo::BackupEngine::Open(db().get(), &bgsave_engine_); if (!nemo_s.ok()) { - LOG(ERROR) << "open backup engine failed " << nemo_s.ToString(); + LOG(WARNING) << "open backup engine failed " << nemo_s.ToString(); return false; } @@ -609,7 +610,7 @@ bool PikaServer::InitBgsaveEngine() { } nemo_s = bgsave_engine_->SetBackupContent(); if (!nemo_s.ok()){ - LOG(ERROR) << "set backup content failed " << nemo_s.ToString(); + LOG(WARNING) << "set backup content failed " << nemo_s.ToString(); return false; } } @@ -622,7 +623,7 @@ bool PikaServer::RunBgsaveEngine(const std::string path) { LOG(INFO) << "Create new backup finished."; if (!nemo_s.ok()) { - LOG(ERROR) << "backup failed :" << nemo_s.ToString(); + LOG(WARNING) << "backup failed :" << nemo_s.ToString(); return false; } return true; @@ -759,7 +760,7 @@ bool PikaServer::PurgeFiles(uint32_t to, bool manual, bool force) { std::map binlogs; if (!GetBinlogFiles(binlogs)) { - LOG(ERROR) << "Could not get binlog files!"; + LOG(WARNING) << "Could not get binlog files!"; return false; } @@ -775,7 +776,7 @@ bool PikaServer::PurgeFiles(uint32_t to, bool manual, bool force) { // We check this every time to avoid lock when we do file deletion if (!CouldPurge(it->first) && !force) { - LOG(INFO) << "Could not purge "<< (it->first) << ", since it is already be used"; + LOG(WARNING) << "Could not purge "<< (it->first) << ", since it is already be used"; return false; } @@ -785,7 +786,7 @@ bool PikaServer::PurgeFiles(uint32_t to, bool manual, bool force) ++delete_num; --remain_expire_num; } else { - LOG(ERROR) << "Purge log file : " << (it->second) << " failed! error:" << s.ToString(); + LOG(WARNING) << "Purge log file : " << (it->second) << " failed! error:" << s.ToString(); } } else { // Break when face the first one not satisfied @@ -793,7 +794,9 @@ bool PikaServer::PurgeFiles(uint32_t to, bool manual, bool force) break; } } - DLOG(INFO) << "Success purge "<< delete_num << " files to index : " << to; + if (delete_num) { + LOG(INFO) << "Success purge "<< delete_num; + } return true; } @@ -802,7 +805,7 @@ bool PikaServer::GetBinlogFiles(std::map& binlogs) { std::vector children; int ret = slash::GetChildren(g_pika_conf->log_path(), children); if (ret != 0){ - LOG(ERROR) << "Get all files in log path failed! error:" << ret; + LOG(WARNING) << "Get all files in log path failed! error:" << ret; return false; } @@ -823,10 +826,9 @@ bool PikaServer::GetBinlogFiles(std::map& binlogs) { void PikaServer::AutoPurge() { if (!PurgeLogs(0, false, false)) { - LOG(WARNING) << "Auto purge failed"; + DLOG(WARNING) << "Auto purge failed"; return; } - DLOG(INFO) << "Auto Purge sucess"; } bool PikaServer::FlushAll() { @@ -851,7 +853,7 @@ bool PikaServer::FlushAll() { dbpath.append("/deleting"); slash::RenameFile(g_pika_conf->db_path(), dbpath.c_str()); - LOG(WARNING) << "Delete old db..."; + LOG(INFO) << "Delete old db..."; db_.reset(); nemo::Options option; @@ -860,9 +862,9 @@ bool PikaServer::FlushAll() { if (g_pika_conf->compression() == "none") { option.compression = false; } - LOG(WARNING) << "Prepare open new db..."; + LOG(INFO) << "Prepare open new db..."; db_ = std::shared_ptr(new nemo::Nemo(g_pika_conf->db_path(), option)); - LOG(WARNING) << "open new db success"; + LOG(INFO) << "open new db success"; PurgeDir(dbpath); return true; } @@ -876,9 +878,9 @@ void PikaServer::PurgeDir(std::string& path) { void PikaServer::DoPurgeDir(void* arg) { std::string path = *(static_cast(arg)); - DLOG(INFO) << "Delete dir: " << path << " start"; + LOG(INFO) << "Delete dir: " << path << " start"; slash::DeleteDir(path); - DLOG(INFO) << "Delete dir: " << path << " done"; + LOG(INFO) << "Delete dir: " << path << " done"; delete static_cast(arg); } diff --git a/src/pika_slaveping_thread.cc b/src/pika_slaveping_thread.cc index ea583975f3..833467d1dd 100644 --- a/src/pika_slaveping_thread.cc +++ b/src/pika_slaveping_thread.cc @@ -15,8 +15,8 @@ pink::Status PikaSlavepingThread::Send() { argv.push_back(std::to_string(sid_)); pink::RedisCli::SerializeCommand(argv, &wbuf_str); is_first_send_ = false; + LOG(INFO) << wbuf_str; } - DLOG(INFO) << wbuf_str; return cli_->Send(&wbuf_str); } @@ -27,10 +27,10 @@ pink::Status PikaSlavepingThread::RecvProc() { DLOG(INFO) << "Reply from master after ping: " << cli_->argv_[0]; if (cli_->argv_[0] == "pong" || cli_->argv_[0] == "ok") { } else { - s = pink::Status::Corruption(""); + s = pink::Status::Corruption("Reply is not pong or ok"); } } else { - DLOG(INFO) << "RecvProc, recv error: " << s.ToString(); + LOG(WARNING) << "RecvProc, recv error: " << s.ToString(); } return s; } @@ -50,7 +50,7 @@ void* PikaSlavepingThread::ThreadMain() { g_pika_server->PlusMasterConnection(); while (true) { if (should_exit_) { - DLOG(INFO) << "Close Slaveping Thread now"; + LOG(INFO) << "Close Slaveping Thread now"; close(cli_->fd()); g_pika_server->pika_binlog_receiver_thread()->KillBinlogSender(); break; @@ -64,17 +64,17 @@ void* PikaSlavepingThread::ThreadMain() { DLOG(INFO) << "Ping master success"; gettimeofday(&last_interaction, NULL); } else if (s.IsTimeout()) { - DLOG(INFO) << "Slaveping timeout once"; + LOG(WARNING) << "Slaveping timeout once"; gettimeofday(&now, NULL); if (now.tv_sec - last_interaction.tv_sec > 30) { //timeout; - DLOG(INFO) << "Ping master timeout"; + LOG(WARNING) << "Ping master timeout"; close(cli_->fd()); g_pika_server->pika_binlog_receiver_thread()->KillBinlogSender(); break; } } else { - DLOG(INFO) << "Ping master error"; + LOG(WARNING) << "Ping master error"; close(cli_->fd()); g_pika_server->pika_binlog_receiver_thread()->KillBinlogSender(); break; @@ -83,9 +83,9 @@ void* PikaSlavepingThread::ThreadMain() { } g_pika_server->MinusMasterConnection(); } else if (!should_exit_) { - DLOG(INFO) << "Slaveping, Connect timeout"; + LOG(WARNING) << "Slaveping, Connect timeout"; if ((++connect_retry_times) >= 30) { - DLOG(INFO) << "Slaveping, Connect timeout 10 times, disconnect with master"; + LOG(WARNING) << "Slaveping, Connect timeout 10 times, disconnect with master"; close(cli_->fd()); g_pika_server->pika_binlog_receiver_thread()->KillBinlogSender(); connect_retry_times = 0; diff --git a/src/pika_trysync_thread.cc b/src/pika_trysync_thread.cc index 4c78d17ad5..7604aed35f 100644 --- a/src/pika_trysync_thread.cc +++ b/src/pika_trysync_thread.cc @@ -6,6 +6,7 @@ #include "pika_server.h" #include "pika_conf.h" #include "env.h" +#include "rsync.h" extern PikaServer* g_pika_server; extern PikaConf* g_pika_conf; @@ -15,7 +16,7 @@ PikaTrysyncThread::~PikaTrysyncThread() { pthread_join(thread_id(), NULL); slash::StopRsync(g_pika_conf->db_sync_path()); delete cli_; - DLOG(INFO) << " Trysync thread " << pthread_self() << " exit!!!"; + LOG(INFO) << " Trysync thread " << pthread_self() << " exit!!!"; } bool PikaTrysyncThread::Send() { @@ -42,7 +43,7 @@ bool PikaTrysyncThread::Send() { pink::RedisCli::SerializeCommand(argv, &tbuf_str); wbuf_str.append(tbuf_str); - DLOG(INFO) << wbuf_str; + LOG(INFO) << wbuf_str; pink::Status s; s = cli_->Send(&wbuf_str); @@ -67,9 +68,10 @@ bool PikaTrysyncThread::RecvProc() { } reply = cli_->argv_[0]; - DLOG(INFO) << "Reply from master after trysync: " << reply; + LOG(INFO) << "Reply from master after trysync: " << reply; if (!is_authed && should_auth) { if (kInnerReplOk != slash::StringToLower(reply)) { + LOG(WARNING) << "remove master"; g_pika_server->RemoveMaster(); return false; } @@ -78,7 +80,7 @@ bool PikaTrysyncThread::RecvProc() { if (cli_->argv_.size() == 1 && slash::string2l(reply.data(), reply.size(), &sid_)) { // Luckly, I got your point, the sync is comming - DLOG(INFO) << "Recv sid from master: " << sid_; + LOG(INFO) << "Recv sid from master: " << sid_; break; } // Failed @@ -89,10 +91,11 @@ bool PikaTrysyncThread::RecvProc() { // 1, Master do bgsave first. // 2, Master waiting for an existing bgsaving process // 3, Master do dbsyncing - DLOG(INFO) << "Need wait to sync"; + LOG(INFO) << "Need wait to sync"; g_pika_server->NeedWaitDBSync(); - } else { - g_pika_server->RemoveMaster(); +// } else { +// LOG(WARNING) << "remove master"; +// g_pika_server->RemoveMaster(); } return false; } @@ -116,7 +119,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() { // Got new binlog offset std::ifstream is(info_path); if (!is) { - LOG(ERROR) << "Failed to open info file after db sync"; + LOG(WARNING) << "Failed to open info file after db sync"; return false; } std::string line, master_ip; @@ -128,7 +131,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() { master_ip = line; } else if (lineno > 2 && lineno < 6) { if (!slash::string2l(line.data(), line.size(), &tmp) || tmp < 0) { - LOG(ERROR) << "Format of info file after db sync error, line : " << line; + LOG(WARNING) << "Format of info file after db sync error, line : " << line; is.close(); return false; } @@ -137,7 +140,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() { else { offset = tmp; } } else if (lineno > 5) { - LOG(ERROR) << "Format of info file after db sync error, line : " << line; + LOG(WARNING) << "Format of info file after db sync error, line : " << line; is.close(); return false; } @@ -151,7 +154,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() { // Sanity check if (master_ip != g_pika_server->master_ip() || master_port != g_pika_server->master_port()) { - LOG(ERROR) << "Error master ip port: " << master_ip << ":" << master_port; + LOG(WARNING) << "Error master ip port: " << master_ip << ":" << master_port; return false; } @@ -159,7 +162,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() { slash::StopRsync(g_pika_conf->db_sync_path()); slash::DeleteFile(info_path); if (!g_pika_server->ChangeDb(g_pika_conf->db_sync_path())) { - LOG(ERROR) << "Failed to change db"; + LOG(WARNING) << "Failed to change db"; return false; } @@ -186,7 +189,7 @@ void* PikaTrysyncThread::ThreadMain() { if (g_pika_server->WaitingDBSync()) { //Try to update offset by db sync if (TryUpdateMasterOffset()) { - DLOG(INFO) << "Success Update Master Offset"; + LOG(INFO) << "Success Update Master Offset"; } } @@ -194,7 +197,7 @@ void* PikaTrysyncThread::ThreadMain() { continue; } sleep(2); - DLOG(INFO) << "Should connect master"; + LOG(INFO) << "Should connect master"; std::string master_ip = g_pika_server->master_ip(); int master_port = g_pika_server->master_port(); @@ -207,9 +210,9 @@ void* PikaTrysyncThread::ThreadMain() { // To make sure only data from current master is received int ret = slash::StartRsync(dbsync_path, kDBSyncModule + "_" + ip_port, g_pika_conf->port() + 300); if (0 != ret) { - LOG(ERROR) << "Failed to start rsync, path:" << dbsync_path << " error : " << ret; + LOG(WARNING) << "Failed to start rsync, path:" << dbsync_path << " error : " << ret; } - DLOG(INFO) << "Finish to start rsync, path:" << dbsync_path; + LOG(INFO) << "Finish to start rsync, path:" << dbsync_path; if ((cli_->Connect(master_ip, master_port)).ok()) { @@ -222,7 +225,7 @@ void* PikaTrysyncThread::ThreadMain() { delete g_pika_server->ping_thread_; g_pika_server->ping_thread_ = new PikaSlavepingThread(sid_); g_pika_server->ping_thread_->StartThread(); - DLOG(INFO) << "Trysync success"; + LOG(INFO) << "Trysync success"; } cli_->Close(); } else { diff --git a/src/pika_worker_thread.cc b/src/pika_worker_thread.cc index 150fa75b40..076f886128 100644 --- a/src/pika_worker_thread.cc +++ b/src/pika_worker_thread.cc @@ -18,7 +18,7 @@ PikaWorkerThread::~PikaWorkerThread() { should_exit_ = true; pthread_join(thread_id(), NULL); DestoryCmdTable(cmds_); - DLOG(INFO) << "A worker thread " << thread_id() << " exit!!!"; + LOG(INFO) << "A worker thread " << thread_id() << " exit!!!"; } void PikaWorkerThread::CronHandle() { @@ -39,7 +39,7 @@ void PikaWorkerThread::CronHandle() { * Find timeout client */ if (now.tv_sec - static_cast(iter->second)->last_interaction().tv_sec > g_pika_conf->timeout()) { - DLOG(INFO) << "Find Timeout Client: " << static_cast(iter->second)->ip_port(); + LOG(INFO) << "Find Timeout Client: " << static_cast(iter->second)->ip_port(); AddCronTask(WorkerCronTask{TASK_KILL, static_cast(iter->second)->ip_port()}); } iter++; @@ -112,7 +112,6 @@ void PikaWorkerThread::ClientKill(std::string ip_port) { if (static_cast(iter->second)->ip_port() != ip_port) { continue; } - DLOG(INFO) << "==========Kill Client=============="; close(iter->first); delete(static_cast(iter->second)); conns_.erase(iter); @@ -124,7 +123,6 @@ void PikaWorkerThread::ClientKillAll() { slash::RWLock l(&rwlock_, true); std::map::iterator iter = conns_.begin(); while (iter != conns_.end()) { - DLOG(INFO) << "==========Kill Client=============="; close(iter->first); delete(static_cast(iter->second)); iter = conns_.erase(iter);