Skip to content

Commit

Permalink
support slowlog command (OpenAtomFoundation#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep authored Aug 24, 2018
1 parent eadc743 commit 0ee7245
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 4 deletions.
2 changes: 2 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ expire-logs-nums : 10
root-connection-num : 2
# Slowlog-log-slower-than
slowlog-log-slower-than : 10000
# Slowlog-max-len
slowlog-max-len : 128
# slave-read-only(yes/no, 1/0)
slave-read-only : 0
# Pika db sync path
Expand Down
15 changes: 15 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,21 @@ class ScandbCmd : public Cmd {
}
};

class SlowlogCmd : public Cmd {
public:
enum SlowlogCondition{kGET, kLEN, kRESET};
SlowlogCmd() : condition_(kGET) {}
virtual void Do();
private:
int64_t number_;
SlowlogCmd::SlowlogCondition condition_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
virtual void Clear() {
number_ = 10;
condition_ = kGET;
}
};

#ifdef TCMALLOC_EXTENSION
class TcmallocCmd : public Cmd {
public:
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const std::string kCmdNameTime = "time";
const std::string kCmdNameDelbackup = "delbackup";
const std::string kCmdNameEcho = "echo";
const std::string kCmdNameScandb = "scandb";
const std::string kCmdNameSlowlog = "slowlog";
#ifdef TCMALLOC_EXTENSION
const std::string kCmdNameTcmalloc = "tcmalloc";
#endif
Expand Down
6 changes: 6 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class PikaConf : public slash::BaseConf {
int maxclients() { RWLock l(&rwlock_, false); return maxclients_; }
int root_connection_num() { RWLock l(&rwlock_, false); return root_connection_num_; }
int slowlog_slower_than() { RWLock l(&rwlock_, false); return slowlog_log_slower_than_; }
int slowlog_max_len() { RWLock L(&rwlock_, false); return slowlog_max_len_; }
std::string network_interface() { RWLock l(&rwlock_, false); return network_interface_; }

// Immutable config items, we don't use lock.
Expand Down Expand Up @@ -159,6 +160,10 @@ class PikaConf : public slash::BaseConf {
RWLock l(&rwlock_, true);
slowlog_log_slower_than_ = value;
}
void SetSlowlogMaxLen(const int value) {
RWLock l(&rwlock_, true);
slowlog_max_len_ = value;
}
void SetDbSyncSpeed(const int value) {
RWLock l(&rwlock_, true);
db_sync_speed_ = value;
Expand Down Expand Up @@ -212,6 +217,7 @@ class PikaConf : public slash::BaseConf {
int maxclients_;
int root_connection_num_;
int slowlog_log_slower_than_;
int slowlog_max_len_;
int expire_logs_days_;
int expire_logs_nums_;
bool readonly_;
Expand Down
13 changes: 13 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_DEFINE_H_
#define PIKA_DEFINE_H_

#include "include/pika_command.h"

#define PIKA_MAX_WORKER_THREAD_NUM 24

Expand All @@ -32,6 +33,18 @@ struct SlaveItem {
struct timeval create_time;
};

//slowlog define
#define SLOWLOG_ENTRY_MAX_ARGC 32
#define SLOWLOG_ENTRY_MAX_STRING 128

//slowlog entry
struct SlowlogEntry {
int64_t id;
int64_t start_time;
int64_t duration;
PikaCmdArgsType argv;
};

#define PIKA_MIN_RESERVED_FDS 5000

const int SLAVE_ITEM_STAGE_ONE = 1;
Expand Down
16 changes: 16 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ class PikaServer {
bool WaitTillBinlogBGSerial(uint64_t my_serial);
void SignalNextBinlogBGSerial();

/*
* Slowlog use
*/
void SlowlogTrim();
void SlowlogReset();
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int32_t time, int64_t duration);

/*
*for statistic
*/
Expand Down Expand Up @@ -454,6 +463,7 @@ class PikaServer {
* Flushall use
*/
static void DoPurgeDir(void* arg);

/*
* Keyscan use
*/
Expand Down Expand Up @@ -481,6 +491,12 @@ class PikaServer {
std::vector<BinlogBGWorker*> binlogbg_workers_;
std::hash<std::string> str_hash;

/*
* Slowlog use
*/
pthread_rwlock_t slowlog_protector_;
uint64_t slowlog_entry_id_;
std::list<SlowlogEntry> slowlog_list_;

/*
* for statistic
Expand Down
64 changes: 62 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,10 @@ void ConfigCmd::ConfigGet(std::string &ret) {
ret = "*2\r\n";
EncodeString(&ret, "slowlog-log-slower-than");
EncodeInt32(&ret, g_pika_conf->slowlog_slower_than());
} else if (get_item == "slowlog-max-len") {
ret = "*2\r\n";
EncodeString(&ret, "slowlog-max-len");
EncodeInt32(&ret, g_pika_conf->slowlog_max_len());
} else if (get_item == "write-binlog") {
ret = "*2\r\n";
EncodeString(&ret, "write-binlog");
Expand Down Expand Up @@ -1084,7 +1088,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, "slave-priority");
EncodeInt32(&ret, g_pika_conf->slave_priority());
} else if (get_item == "*") {
ret = "*88\r\n";
ret = "*90\r\n";
EncodeString(&ret, "port");
EncodeInt32(&ret, g_pika_conf->port());
EncodeString(&ret, "double-master-ip");
Expand Down Expand Up @@ -1149,6 +1153,8 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&ret, g_pika_conf->root_connection_num());
EncodeString(&ret, "slowlog-log-slower-than");
EncodeInt32(&ret, g_pika_conf->slowlog_slower_than());
EncodeString(&ret, "slowlog-max-len");
EncodeInt32(&ret, g_pika_conf->slowlog_max_len());
EncodeString(&ret, "slave-read-only");
EncodeInt32(&ret, g_pika_conf->readonly());
EncodeString(&ret, "write-binlog");
Expand Down Expand Up @@ -1181,7 +1187,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*20\r\n";
ret = "*21\r\n";
EncodeString(&ret, "loglevel");
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
Expand All @@ -1195,6 +1201,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "expire-logs-nums");
EncodeString(&ret, "root-connection-num");
EncodeString(&ret, "slowlog-log-slower-than");
EncodeString(&ret, "slowlog-max-len");
EncodeString(&ret, "slave-read-only");
EncodeString(&ret, "write-binlog");
EncodeString(&ret, "identify-binlog-type");
Expand Down Expand Up @@ -1291,6 +1298,14 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf->SetSlowlogSlowerThan(ival);
ret = "+OK\r\n";
} else if (set_item == "slowlog-max-len") {
if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-max-len'\r\n";
return;
}
g_pika_conf->SetSlowlogMaxLen(ival);
g_pika_server->SlowlogTrim();
ret = "+OK\r\n";
} else if (set_item == "slave-read-only") {
slash::StringToLower(value);
bool is_readonly;
Expand Down Expand Up @@ -1559,6 +1574,51 @@ void ScandbCmd::Do() {
return;
}

void SlowlogCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameSlowlog);
return;
}
if (argv.size() == 2 && !strcasecmp(argv[1].data(), "reset")) {
condition_ = SlowlogCmd::kRESET;
} else if (argv.size() == 2 && !strcasecmp(argv[1].data(), "len")) {
condition_ = SlowlogCmd::kLEN;
} else if ((argv.size() == 2 || argv.size() == 3) && !strcasecmp(argv[1].data(), "get")) {
condition_ = SlowlogCmd::kGET;
if (argv.size() == 3 && !slash::string2l(argv[2].data(), argv[2].size(), &number_)) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
} else {
res_.SetRes(CmdRes::kErrOther, "Unknown SLOWLOG subcommand or wrong # of args. Try GET, RESET, LEN.");
return;
}
}

void SlowlogCmd::Do() {
if (condition_ == SlowlogCmd::kRESET) {
g_pika_server->SlowlogReset();
res_.SetRes(CmdRes::kOk);
} else if (condition_ == SlowlogCmd::kLEN) {
res_.AppendInteger(g_pika_server->SlowlogLen());
} else {
std::vector<SlowlogEntry> slowlogs;
g_pika_server->SlowlogObtain(number_, &slowlogs);
res_.AppendArrayLen(slowlogs.size());
for (const auto& slowlog : slowlogs) {
res_.AppendArrayLen(4);
res_.AppendInteger(slowlog.id);
res_.AppendInteger(slowlog.start_time);
res_.AppendInteger(slowlog.duration);
res_.AppendArrayLen(slowlog.argv.size());
for (const auto& arg : slowlog.argv) {
res_.AppendString(arg);
}
}
}
return;
}

#ifdef TCMALLOC_EXTENSION
void TcmallocCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
(void)ptr_info;
Expand Down
4 changes: 3 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ std::string PikaClientConn::DoCmd(
}

if (g_pika_conf->slowlog_slower_than() >= 0) {
int32_t start_time = start_us / 1000000;
int64_t duration = slash::NowMicros() - start_us;
if (duration > g_pika_conf->slowlog_slower_than()) {
std::string slow_log;
Expand All @@ -243,7 +244,8 @@ std::string PikaClientConn::DoCmd(
break;
}
}
LOG(ERROR) << "ip_port: "<< ip_port() << ", command:" << slow_log << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration;
LOG(ERROR) << "ip_port: "<< ip_port() << ", command:" << slow_log << ", start_time(s): " << start_time << ", duration(us): " << duration;
g_pika_server->SlowlogPushEntry(argv, start_time, duration);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameEcho, echoptr));
CmdInfo* scandbptr = new CmdInfo(kCmdNameScandb, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameScandb, scandbptr));
CmdInfo* slowlogptr = new CmdInfo(kCmdNameSlowlog, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlowlog, slowlogptr));
#ifdef TCMALLOC_EXTENSION
CmdInfo* tcmallocptr = new CmdInfo(kCmdNameTcmalloc, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameTcmalloc, tcmallocptr));
Expand Down Expand Up @@ -485,6 +487,8 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameEcho, echoptr));
Cmd* scandbptr = new ScandbCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameScandb, scandbptr));
Cmd* slowlogptr = new SlowlogCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlowlog, slowlogptr));
#ifdef TCMALLOC_EXTENSION
Cmd* tcmallocptr = new TcmallocCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameTcmalloc, tcmallocptr));
Expand Down
2 changes: 2 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ int PikaConf::Load()
root_connection_num_ = 2;
}
GetConfInt("slowlog-log-slower-than", &slowlog_log_slower_than_);
GetConfInt("slowlog-max-len", &slowlog_max_len_);
std::string user_blacklist;
GetConfStr("userblacklist", &user_blacklist);
SetUserBlackList(std::string(user_blacklist));
Expand Down Expand Up @@ -266,6 +267,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("expire-logs-nums", expire_logs_nums_);
SetConfInt("root-connection-num", root_connection_num_);
SetConfInt("slowlog-log-slower-than", slowlog_log_slower_than_);
SetConfInt("slowlog-max-len", slowlog_max_len_);
SetConfBool("slave-read-only", readonly_);
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
Expand Down
65 changes: 64 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ PikaServer::PikaServer() :
purging_(false),
binlogbg_exit_(false),
binlogbg_cond_(&binlogbg_mutex_),
binlogbg_serial_(0) {
binlogbg_serial_(0),
slowlog_entry_id_(0) {

pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
Expand Down Expand Up @@ -1542,6 +1543,68 @@ void PikaServer::SignalNextBinlogBGSerial() {
binlogbg_mutex_.Unlock();
}

void PikaServer::SlowlogTrim() {
RWLock l(&slowlog_protector_, true);
uint32_t slowlog_max_len = static_cast<uint32_t>(g_pika_conf->slowlog_max_len());
while (slowlog_list_.size() > slowlog_max_len) {
slowlog_list_.pop_back();
}
}

void PikaServer::SlowlogReset() {
RWLock l(&slowlog_protector_, true);
slowlog_list_.clear();
}

uint32_t PikaServer::SlowlogLen() {
RWLock l(&slowlog_protector_, false);
return slowlog_list_.size();
}

void PikaServer::SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs) {
RWLock l(&slowlog_protector_, false);
slowlogs->clear();
std::list<SlowlogEntry>::const_iterator iter = slowlog_list_.begin();
while (number-- && iter != slowlog_list_.end()) {
slowlogs->push_back(*iter);
iter++;
}
}

void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int32_t time, int64_t duration) {
SlowlogEntry entry;
uint32_t slargc = (argv.size() < SLOWLOG_ENTRY_MAX_ARGC)
? argv.size() : SLOWLOG_ENTRY_MAX_ARGC;

for (uint32_t idx = 0; idx < slargc; ++idx) {
if (slargc != argv.size() && idx == slargc - 1) {
char buffer[32];
sprintf(buffer, "... (%lu more arguments)", argv.size() - slargc + 1);
entry.argv.push_back(std::string(buffer));
} else {
if (argv[idx].size() > SLOWLOG_ENTRY_MAX_STRING) {
char buffer[32];
sprintf(buffer, "... (%lu more bytes)", argv[idx].size() - SLOWLOG_ENTRY_MAX_STRING);
std::string suffix(buffer);
std::string brief = argv[idx].substr(0, SLOWLOG_ENTRY_MAX_STRING);
entry.argv.push_back(brief + suffix);
} else {
entry.argv.push_back(argv[idx]);
}
}
}

{
RWLock l(&slowlog_protector_, true);
entry.id = slowlog_entry_id_++;
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
}

SlowlogTrim();
}

void PikaServer::RunKeyScan() {
std::vector<uint64_t> new_key_nums_v;

Expand Down

0 comments on commit 0ee7245

Please sign in to comment.