metrics: add metrics for top sql (pingcap#25805)
crazycs520 authored Jul 5, 2021
1 parent 420e27e commit fee39d3
Showing 4 changed files with 90 additions and 4 deletions.
3 changes: 3 additions & 0 deletions metrics/metrics.go
@@ -153,6 +153,9 @@ func RegisterMetrics() {
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
prometheus.MustRegister(TopSQLIgnoredCounter)
prometheus.MustRegister(TopSQLReportDurationHistogram)
prometheus.MustRegister(TopSQLReportDataHistogram)

tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()
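The three new collectors are registered once at startup; prometheus.MustRegister panics if a collector with the same fully-qualified name is already registered. As a minimal sketch (not part of this commit), prometheus.Register can be used instead when duplicate registration should be tolerated rather than fatal:

	// Sketch only: tolerate duplicate registration instead of panicking.
	if err := prometheus.Register(TopSQLIgnoredCounter); err != nil {
		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
			_ = are.ExistingCollector // already registered elsewhere; reuse it
		} else {
			panic(err)
		}
	}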
32 changes: 32 additions & 0 deletions metrics/topsql.go
@@ -0,0 +1,32 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"

// Top SQL metrics.
var (
	TopSQLIgnoredCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "tidb",
			Subsystem: "topsql",
			Name:      "ignored_total",
			Help:      "Counter of ignored top-sql metrics (register-sql, register-plan, collect-data and report-data), normally it should be 0.",
		}, []string{LblType})

	TopSQLReportDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "tidb",
			Subsystem: "topsql",
			Name:      "report_duration_seconds",
			Help:      "Bucket histogram of reporting time (s) to the top-sql agent",
			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 24), // 1ms ~ 2.3h
		}, []string{LblType, LblResult})

	TopSQLReportDataHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "tidb",
			Subsystem: "topsql",
			Name:      "report_data_total",
			Help:      "Bucket histogram of reporting records/sql/plan count to the top-sql agent.",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 20), // 1 ~ 524288
		}, []string{LblType})
)
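The bucket ranges noted in the comments above follow from prometheus.ExponentialBuckets(start, factor, count), which produces count buckets start, start*factor, ..., start*factor^(count-1). A quick check of those ranges (a standalone sketch, not part of this commit):

	package main

	import "fmt"

	func main() {
		// Largest duration bucket: 0.001s * 2^23 = 8388.608s, roughly 2.3 hours.
		fmt.Println(0.001 * float64(uint64(1)<<23))
		// Largest report-data bucket: 1 * 2^19 = 524288 records/sql/plan entries.
		fmt.Println(1 << 19)
	}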
29 changes: 27 additions & 2 deletions util/topsql/reporter/client.go
@@ -103,6 +103,7 @@ func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records [
if len(records) == 0 {
return nil
}
start := time.Now()
client := tipb.NewTopSQLAgentClient(r.conn)
stream, err := client.ReportCPUTimeRecords(ctx)
if err != nil {
@@ -119,19 +120,28 @@ func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records [
return err
}
}
topSQLReportRecordCounterHistogram.Observe(float64(len(records)))
// See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream
_, err = stream.CloseAndRecv()
return err
if err != nil {
reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
return nil
}

// sendBatchSQLMeta sends a batch of SQL metas by stream.
func (r *GRPCReportClient) sendBatchSQLMeta(ctx context.Context, sqlMap *sync.Map) error {
start := time.Now()
client := tipb.NewTopSQLAgentClient(r.conn)
stream, err := client.ReportSQLMeta(ctx)
if err != nil {
return err
}
cnt := 0
sqlMap.Range(func(key, value interface{}) bool {
cnt++
sqlMeta := &tipb.SQLMeta{
SqlDigest: []byte(key.(string)),
NormalizedSql: value.(string),
@@ -145,23 +155,32 @@ func (r *GRPCReportClient) sendBatchSQLMeta(ctx context.Context, sqlMap *sync.Ma
if err != nil {
return err
}
topSQLReportSQLCountHistogram.Observe(float64(cnt))
_, err = stream.CloseAndRecv()
return err
if err != nil {
reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
return nil
}

// sendBatchPlanMeta sends a batch of plan metas by stream.
func (r *GRPCReportClient) sendBatchPlanMeta(ctx context.Context, planMap *sync.Map) error {
start := time.Now()
client := tipb.NewTopSQLAgentClient(r.conn)
stream, err := client.ReportPlanMeta(ctx)
if err != nil {
return err
}
cnt := 0
planMap.Range(func(key, value interface{}) bool {
planDecoded, errDecode := r.decodePlan(value.(string))
if errDecode != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
return true
}
cnt++
planMeta := &tipb.PlanMeta{
PlanDigest: []byte(key.(string)),
NormalizedPlan: planDecoded,
@@ -175,7 +194,13 @@ func (r *GRPCReportClient) sendBatchPlanMeta(ctx context.Context, planMap *sync.
if err != nil {
return err
}
topSQLReportPlanCountHistogram.Observe(float64(cnt))
_, err = stream.CloseAndRecv()
if err != nil {
reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
return err
}

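All three senders now follow the same measure-then-observe pattern: capture a start time, send the batch, then observe the elapsed seconds into the success or the failure histogram depending on the stream result. A hypothetical helper (not part of this commit, assuming the package's existing time and prometheus imports) that would factor out that repetition:

	// observeReportDuration records the elapsed seconds into the success or the
	// failure histogram, depending on whether the send returned an error.
	func observeReportDuration(start time.Time, err error, succ, failed prometheus.Observer) {
		if err != nil {
			failed.Observe(time.Since(start).Seconds())
			return
		}
		succ.Observe(time.Since(start).Seconds())
	}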
30 changes: 28 additions & 2 deletions util/topsql/reporter/reporter.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
@@ -133,6 +134,24 @@ func NewRemoteTopSQLReporter(client ReportClient) *RemoteTopSQLReporter {
return tsr
}

var (
ignoreExceedSQLCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_sql")
ignoreExceedPlanCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_exceed_plan")
ignoreCollectChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_collect_channel_full")
ignoreReportChannelFullCounter = metrics.TopSQLIgnoredCounter.WithLabelValues("ignore_report_channel_full")
reportAllDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblOK)
reportAllDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("all", metrics.LblError)
reportRecordDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblOK)
reportRecordDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("record", metrics.LblError)
reportSQLDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblOK)
reportSQLDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("sql", metrics.LblError)
reportPlanDurationSuccHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblOK)
reportPlanDurationFailedHistogram = metrics.TopSQLReportDurationHistogram.WithLabelValues("plan", metrics.LblError)
topSQLReportRecordCounterHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("record")
topSQLReportSQLCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("sql")
topSQLReportPlanCountHistogram = metrics.TopSQLReportDataHistogram.WithLabelValues("plan")
)

// RegisterSQL registers a normalized SQL string to a SQL digest.
// This function is thread-safe and efficient.
//
@@ -141,6 +160,7 @@ func NewRemoteTopSQLReporter(client ReportClient) *RemoteTopSQLReporter {
// It should also return immediately, and do any CPU-intensive job asynchronously.
func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL string) {
if tsr.sqlMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() {
ignoreExceedSQLCounter.Inc()
return
}
m := tsr.normalizedSQLMap.Load().(*sync.Map)
@@ -155,6 +175,7 @@ func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL str
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedBinaryPlan string) {
if tsr.planMapLength.Load() >= variable.TopSQLVariable.MaxCollect.Load() {
ignoreExceedPlanCounter.Inc()
return
}
m := tsr.normalizedPlanMap.Load().(*sync.Map)
@@ -178,6 +199,7 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
}:
default:
// ignore if chan blocked
ignoreCollectChannelFullCounter.Inc()
}
}

@@ -336,6 +358,8 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m
select {
case tsr.reportDataChan <- data:
default:
// ignore if chan blocked
ignoreReportChannelFullCounter.Inc()
}
}

@@ -391,7 +415,6 @@ func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
}

agentAddr := variable.TopSQLVariable.AgentAddress.Load()

timeout := reportTimeout
failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) {
if val.(bool) {
@@ -402,10 +425,13 @@ func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
}
})
ctx, cancel := context.WithTimeout(tsr.ctx, timeout)

start := time.Now()
err := tsr.client.Send(ctx, agentAddr, data)
if err != nil {
logutil.BgLogger().Warn("[top-sql] client failed to send data", zap.Error(err))
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
cancel()
}
