Skip to content

Commit

Permalink
stat work-queue time in slow log (OpenAtomFoundation#1997)
Browse files Browse the repository at this point in the history
Co-authored-by: wangshaoyi <[email protected]>
  • Loading branch information
wangshao1 and wangshaoyi authored Sep 22, 2023
1 parent 594ab2f commit b94751e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 22 deletions.
33 changes: 32 additions & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,36 @@

#include "include/pika_command.h"


// TODO: stat time costing in write out data to connfd
struct TimeStat {
TimeStat() = default;
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
}

uint64_t start_ts() const {
return enqueue_ts_;
}

uint64_t total_time() const {
return process_done_ts_ > enqueue_ts_ ? process_done_ts_ - enqueue_ts_ : 0;
}

uint64_t queue_time() const {
return dequeue_ts_ > enqueue_ts_ ? dequeue_ts_ - enqueue_ts_ : 0;
}

uint64_t process_time() const {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t process_done_ts_;
};

class PikaClientConn : public net::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;
Expand Down Expand Up @@ -65,6 +95,7 @@ class PikaClientConn : public net::RedisConn {
std::atomic<int> resp_num;
std::vector<std::shared_ptr<std::string>> resp_array;

std::shared_ptr<TimeStat> time_stat_;
private:
net::ServerThread* const server_thread_;
std::string current_db_;
Expand All @@ -74,7 +105,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us, uint64_t do_duration);
void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr);
Expand Down
1 change: 0 additions & 1 deletion src/net/src/net_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace net {
NetConn::NetConn(const int fd, std::string ip_port, Thread* thread, NetMultiplexer* net_mpx)
: fd_(fd),
ip_port_(std::move(ip_port)),

#ifdef __ENABLE_SSL
ssl_(nullptr),
#endif
Expand Down
36 changes: 16 additions & 20 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread*
server_thread_(reinterpret_cast<net::ServerThread*>(thread)),
current_db_(g_pika_conf->default_db()) {
auth_stat_.Init();
time_stat_.reset(new TimeStat());
}

std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
Expand All @@ -49,11 +50,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
return c_ptr;
}

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = pstd::NowMicros();
}

bool is_monitoring = g_pika_server->HasMonitorClients();
if (is_monitoring) {
ProcessMonitor(argv);
Expand Down Expand Up @@ -107,23 +103,21 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st

// Process Command
c_ptr->Execute();
uint64_t duration = pstd::NowMicros() - start_us;
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_server->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(duration);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, start_us, c_ptr->GetDoDuration());
ProcessSlowlog(argv, c_ptr->GetDoDuration());
}

return c_ptr;
}

void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_us, uint64_t do_duration) {
auto start_time = static_cast<int32_t>(start_us / 1000000);
auto duration = static_cast<int64_t>(pstd::NowMicros() - start_us);
if (duration > g_pika_conf->slowlog_slower_than()) {
g_pika_server->SlowlogPushEntry(argv, start_time, duration);
void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration) {
if (time_stat_->total_time() > g_pika_conf->slowlog_slower_than()) {
g_pika_server->SlowlogPushEntry(argv, time_stat_->start_ts(), time_stat_->total_time());
if (g_pika_conf->slowlog_write_errorlog()) {
bool trim = false;
std::string slow_log;
Expand All @@ -142,8 +136,10 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_
}
LOG(ERROR) << "ip_port: " << ip_port() << ", db: " << current_db_ << ", command:" << slow_log
<< ", command_size: " << cmd_size - 1 << ", arguments: " << argv.size()
<< ", start_time(s): " << start_time << ", duration(us): " << duration
<< ", do_duration_(us): " << do_duration;
<< ", total_time(ms): " << time_stat_->total_time() / 1000
<< ", queue_time(ms): " << time_stat_->queue_time() / 1000
<< ", process_time(ms): " << time_stat_->process_time() / 1000
<< ", cmd_time(ms): " << do_duration / 1000;
}
}
}
Expand All @@ -160,9 +156,11 @@ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) {

void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) {
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg);
return;
Expand All @@ -173,6 +171,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
void PikaClientConn::DoBackgroundTask(void* arg) {
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
conn_ptr->time_stat_->dequeue_ts_ = pstd::NowMicros();
if (bg_arg->redis_cmds.empty()) {
conn_ptr->NotifyEpoll(false);
return;
Expand All @@ -197,14 +196,10 @@ void PikaClientConn::DoExecTask(void* arg) {
uint32_t slot_id = bg_arg->slot_id;
bg_arg.reset();

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = pstd::NowMicros();
}
cmd_ptr->SetStage(Cmd::kExecuteStage);
cmd_ptr->Execute();
if (g_pika_conf->slowlog_slower_than() >= 0) {
conn_ptr->ProcessSlowlog(cmd_ptr->argv(), start_us, cmd_ptr->GetDoDuration());
conn_ptr->ProcessSlowlog(cmd_ptr->argv(), cmd_ptr->GetDoDuration());
}

std::shared_ptr<SyncMasterSlot> slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
Expand Down Expand Up @@ -232,6 +227,7 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>&
resp_array.push_back(resp_ptr);
ExecRedisCmd(argv, resp_ptr);
}
time_stat_->process_done_ts_ = pstd::NowMicros();
TryWriteResp();
}

Expand Down

0 comments on commit b94751e

Please sign in to comment.