Skip to content

Commit

Permalink
feat: Exporter for Proxy (OpenAtomFoundation#2199)
Browse files Browse the repository at this point in the history
* First edition

* Exporter collects Proxy information.

* Improve metrics

* Dynamically adjust refresh time.
  • Loading branch information
dingxiaoshuai123 authored Dec 21, 2023
1 parent 9e26988 commit cacb428
Show file tree
Hide file tree
Showing 22 changed files with 890 additions and 22 deletions.
2 changes: 2 additions & 0 deletions codis/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ Options:
}
defer s.Close()

proxy.RefreshPeriod.Set(config.MaxDelayRefreshTimeInterval.Int64())

log.Warnf("create proxy with config\n%s", config)

if s, ok := utils.Argument(d, "--pidfile"); ok {
Expand Down
2 changes: 2 additions & 0 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""

# Interval after which the recorded per-command maximum delay is reset to 0. (This value must be greater than 0.)
max_delay_refresh_time_interval = "15s"
9 changes: 9 additions & 0 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ metrics_report_influxdb_database = ""
metrics_report_statsd_server = ""
metrics_report_statsd_period = "1s"
metrics_report_statsd_prefix = ""
# Maximum delay statistical time interval.(This value must be greater than 0.)
max_delay_refresh_time_interval = "15s"
`

type Config struct {
Expand Down Expand Up @@ -192,6 +195,8 @@ type Config struct {
MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`

MaxDelayRefreshTimeInterval timesize.Duration `toml:"max_delay_refresh_time_interval" json:"max_delay_refresh_time_interval"`

ConfigFileName string `toml:"-" json:"config_file_name"`
}

Expand Down Expand Up @@ -323,5 +328,9 @@ func (c *Config) Validate() error {
return errors.New("invalid metrics_report_statsd_period")
}

if c.MaxDelayRefreshTimeInterval <= 0 {
return errors.New("max_delay_refresh_time_interval must be greater than 0")
}

return nil
}
22 changes: 21 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp {
redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")),
redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)),
})
case "max_delay_refresh_time_interval":
if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil {
return redis.NewErrorf("cant get max_delay_refresh_time_interval value.")
} else {
return redis.NewBulkBytes(text)
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down Expand Up @@ -342,6 +348,18 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp {
}
p.config.SlowlogLogSlowerThan = n
return redis.NewString([]byte("OK"))
case "max_delay_refresh_time_interval":
s := &(p.config.MaxDelayRefreshTimeInterval)
err := s.UnmarshalText([]byte(value))
if err != nil {
return redis.NewErrorf("err:%s.", err)
}
if d := p.config.MaxDelayRefreshTimeInterval.Duration(); d <= 0 {
return redis.NewErrorf("max_delay_refresh_time_interval must be greater than 0")
} else {
RefreshPeriod.Set(int64(d))
return redis.NewString([]byte("OK"))
}
default:
return redis.NewErrorf("unsupported key: %s", key)
}
Expand Down Expand Up @@ -558,7 +576,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log
}

type RuntimeStats struct {
Expand Down Expand Up @@ -667,5 +686,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}
13 changes: 11 additions & 2 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> proxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down Expand Up @@ -758,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error {
}
return nil
}

// updateMaxDelay raises the session-local maximum delay recorded for the
// command r.OpStr to duration when duration exceeds the stored value;
// otherwise the stored value is left untouched.
//
// The visible caller in loopWriter passes duration in microseconds
// (nanosecond difference divided by 1e3).
func (s *Session) updateMaxDelay(duration int64, r *Request) {
	// The load and the store are two separate steps; the original author
	// notes that there is no race condition within a session.
	stats := s.getOpStats(r.OpStr)
	if stats.maxDelay.Int64() >= duration {
		return
	}
	stats.maxDelay.Set(duration)
}
43 changes: 39 additions & 4 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

// Package-level atomic values used by the proxy statistics code.
var (
// SlowCmdCount is the cumulative count of slow-log events. Sessions increment
// it when a request's duration reaches the configured slow-log threshold.
SlowCmdCount atomic2.Int64 // Cumulative count of slow log
// RefreshPeriod is how long the background loop in init sleeps between
// resetting every command's maxDelay to 0. The value is converted directly
// with time.Duration(...), so it is expressed in nanoseconds.
RefreshPeriod atomic2.Int64
)

type opStats struct {
opstr string
calls atomic2.Int64
Expand All @@ -22,14 +27,16 @@ type opStats struct {
redis struct {
errors atomic2.Int64
}
maxDelay atomic2.Int64
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
MaxDelay: s.maxDelay.Int64(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
Expand All @@ -45,6 +52,7 @@ type OpStats struct {
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
MaxDelay int64 `json:"max_delay"`
}

var cmdstats struct {
Expand All @@ -62,6 +70,7 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
SlowCmdCount.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -72,6 +81,16 @@ func init() {
cmdstats.qps.Set(int64(normalized + 0.5))
}
}()

// Clear the accumulated maximum delay to 0
go func() {
for {
time.Sleep(time.Duration(RefreshPeriod.Int64()))
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
}
}()
}

func OpTotal() int64 {
Expand Down Expand Up @@ -165,6 +184,22 @@ func incrOpStats(e *opStats) {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}

/**
Each session flushes its own locally saved metrics into the shared per-command
stats, so several sessions may race on the shared maximum delay here.
Update it with a compare-and-swap (CAS) loop.
*/
for {
oldValue := s.maxDelay
if e.maxDelay > oldValue {
if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) {
e.maxDelay.Set(0)
break
}
} else {
break
}
}
}

var sessions struct {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class PikaServer : public pstd::noncopyable {
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration);
uint64_t SlowlogCount();

/*
* Statistic used
Expand Down Expand Up @@ -688,6 +689,7 @@ class PikaServer : public pstd::noncopyable {
* Slowlog used
*/
uint64_t slowlog_entry_id_ = 0;
uint64_t slowlog_counter_ = 0;
std::shared_mutex slowlog_protector_;
std::list<SlowlogEntry> slowlog_list_;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", "
<< (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time))
<< "\r\n";

tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
6 changes: 6 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1209,11 +1209,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
slowlog_counter_++;
}

SlowlogTrim();
}

// Returns the current value of slowlog_counter_, the counter that
// SlowlogPushEntry increments each time it records an entry.
// The read is performed while holding a shared lock on slowlog_protector_.
uint64_t PikaServer::SlowlogCount() {
  std::shared_lock<std::shared_mutex> read_guard(slowlog_protector_);
  const uint64_t total = slowlog_counter_;
  return total;
}

void PikaServer::ResetStat() {
statistic_.server_stat.accumulative_connections.store(0);
statistic_.server_stat.qps.querynum.store(0);
Expand Down
77 changes: 77 additions & 0 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,91 @@ type CodisModelInfo struct {
Servers []CodisServerInfo `json:"servers"`
}

// CodisProxyModelInfo is one element of CodisProxyInfo.Models. Its fields map
// to the JSON keys "id", "admin_addr", "product_name" and "data_center".
type CodisProxyModelInfo struct {
Id int `json:"id"`
// NOTE(review): presumably the proxy's admin endpoint address — confirm against the dashboard API.
AdminAddr string `json:"admin_addr"`
ProductName string `json:"product_name"`
DataCenter string `json:"data_center"`
}

// CodisGroupInfo holds the list of group models found under the "models" JSON
// key of the "group" section (see CodisStatsInfo.Group).
type CodisGroupInfo struct {
Models []CodisModelInfo `json:"models"`
}

// CodisProxyInfo holds the list of proxy models found under the "models" JSON
// key of the "proxy" section (see CodisStatsInfo.Proxy).
type CodisProxyInfo struct {
Models []CodisProxyModelInfo `json:"models"`
}

// CodisStatsInfo is the "stats" section of CodisTopomInfo. It carries the
// "group" and "proxy" sub-sections.
type CodisStatsInfo struct {
Group CodisGroupInfo `json:"group"`
Proxy CodisProxyInfo `json:"proxy"`
}

// CodisTopomInfo is the outermost JSON document decoded here; only its "stats"
// key is mapped.
// NOTE(review): presumably the Codis dashboard topom response — confirm against the caller.
type CodisTopomInfo struct {
Stats CodisStatsInfo `json:"stats"`
}

// RedisInfo is the "redis" sub-section of ProxyOpsInfo; it maps the single
// JSON key "errors".
type RedisInfo struct {
Errors int `json:"errors"`
}

// CmdInfo is one element of ProxyOpsInfo.Cmd: the statistics reported for a
// single command. The keys "usecs_percall", "fails" and "max_delay" match the
// JSON tags on the proxy's OpStats struct.
type CmdInfo struct {
Opstr string `json:"opstr"`
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
// MaxDelay is the largest delay the proxy recorded for this command since its
// last periodic reset.
// NOTE(review): looks like microseconds on the proxy side — confirm.
MaxDelay int64 `json:"max_delay"`
}

// ProxyOpsInfo is the "ops" section of ProxyStats: aggregate counters ("total",
// "fails", "qps"), the nested "redis" section, and the per-command list "cmd".
type ProxyOpsInfo struct {
Total int `json:"total"`
Fails int `json:"fails"`
Redis RedisInfo `json:"redis"`
Qps int `json:"qps"`
Cmd []CmdInfo `json:"cmd"`
}

// RowInfo is decoded from the "raw" key of RusageInfo (field RusageInfo.Raw).
// NOTE(review): the type name looks like a typo for "RawInfo"; renaming would
// change the exported API, so it is left as is.
// NOTE(review): field names suggest getrusage(2)-style counters — confirm units
// against the proxy's stats output.
type RowInfo struct {
Utime int64 `json:"utime"`
Stime int64 `json:"stime"`
MaxRss int64 `json:"max_rss"`
IxRss int64 `json:"ix_rss"`
IdRss int64 `json:"id_rss"`
IsRss int64 `json:"is_rss"`
}

// RusageInfo is the "rusage" section of ProxyStats. It maps the keys "now",
// "cpu", "mem" and the nested "raw" counters.
type RusageInfo struct {
// Now is kept as the string received; it is not parsed into a time value here.
Now string `json:"now"`
Cpu float64 `json:"cpu"`
Mem float64 `json:"mem"`
Raw RowInfo `json:"raw"`
}

// GeneralInfo is the "general" sub-section of RunTimeInfo.
// NOTE(review): field names mirror Go runtime.MemStats counters (Alloc, Sys,
// Lookups, Mallocs, Frees) — confirm against the proxy's RuntimeStats.
type GeneralInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Lookups int64 `json:"lookups"`
Mallocs int64 `json:"mallocs"`
Frees int64 `json:"frees"`
}

// HeapInfo is the "heap" sub-section of RunTimeInfo.
// NOTE(review): field names mirror Go runtime.MemStats heap counters — confirm
// against the proxy's RuntimeStats.
type HeapInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Idle int64 `json:"idle"`
Inuse int64 `json:"inuse"`
Objects int64 `json:"objects"`
}

// RunTimeInfo is the "runtime" section of ProxyStats; it carries the "general"
// and "heap" sub-sections.
type RunTimeInfo struct {
General GeneralInfo `json:"general"`
Heap HeapInfo `json:"heap"`
}

// ProxyStats is the JSON document describing one proxy's statistics: the
// "online" flag, the "ops", "rusage" and "runtime" sections, and the
// cumulative slow-command counter.
type ProxyStats struct {
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
// SlowCmdCount uses the same "slow_cmd_count" key that the proxy's Stats
// struct emits for its cumulative slow-log count.
SlowCmdCount int64 `json:"slow_cmd_count"`
}
Loading

0 comments on commit cacb428

Please sign in to comment.