Skip to content

Commit

Permalink
*: add telemetry support for CTE (pingcap#25022)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Jun 3, 2021
1 parent 3e3f977 commit 0f151af
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 15 deletions.
11 changes: 9 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, true)
}

// TelemetryInfo records some telemetry information during execution.
// The executor builder receives a pointer to it and sets the flags below
// while building CTE executors.
type TelemetryInfo struct {
// UseNonRecursive is set to true by executorBuilder.buildCTE as soon as it
// starts building any CTE, including CTEs that also have a recursive part.
UseNonRecursive bool
// UseRecursive is set to true by executorBuilder.buildCTE when the CTE being
// built has a non-nil RecurPlan.
UseRecursive bool
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down Expand Up @@ -204,6 +210,7 @@ type ExecStmt struct {
// OutputNames will be set if using cached plan
OutputNames []*types.FieldName
PsStmt *plannercore.CachedPrepareStmt
Ti *TelemetryInfo
}

// PointGet short path for point exec directly from plan, keep only necessary steps
Expand Down Expand Up @@ -236,7 +243,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is)
b := newExecutorBuilder(a.Ctx, is, a.Ti)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand Down Expand Up @@ -782,7 +789,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
b.snapshotTS = a.SnapshotTS
e := b.build(a.Plan)
if b.err != nil {
Expand Down
10 changes: 5 additions & 5 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
Expand Down Expand Up @@ -342,7 +342,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
return b.build(plan)
}

Expand Down Expand Up @@ -575,7 +575,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
exec := b.build(plan)
return exec
}
Expand Down Expand Up @@ -1290,7 +1290,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
hashCols: tc.outerHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
Expand Down Expand Up @@ -1356,7 +1356,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
Expand Down
10 changes: 9 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type executorBuilder struct {
snapshotTSCached bool
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand All @@ -92,10 +93,11 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
// newExecutorBuilder returns an executorBuilder with only ctx, is and Ti
// populated; every other field keeps its zero value.
// ti may be nil (the test helpers and the coprocessor DAG handler pass nil);
// buildCTE checks for nil before recording CTE usage into it.
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
}
}

Expand Down Expand Up @@ -4086,6 +4088,9 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *

func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
// 1. Build seedPlan.
if b.Ti != nil {
b.Ti.UseNonRecursive = true
}
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -4127,6 +4132,9 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
}

// 3. Build recursive part.
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
StmtNode: stmtNode,
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is)
b := newExecutorBuilder(h.sctx, is, nil)
return b.build(plan), nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
}

b := newExecutorBuilder(ctx, nil)
b := newExecutorBuilder(ctx, nil, nil)
return b.build(j)
}

Expand Down
1 change: 1 addition & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[ID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
Expand Down
1 change: 1 addition & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,5 @@ const (
LblInTxn = "in_txn"
LblVersion = "version"
LblHash = "hash"
LblCTEType = "cte_type"
)
68 changes: 68 additions & 0 deletions metrics/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// Metrics
var (
// TelemetrySQLCTECnt is a counter vector partitioned by the LblCTEType label.
// The label values used with it in this change are "nonRecurCTE", "recurCTE"
// and "notCTE".
// NOTE(review): the metric Name is "non_recursive_cte_usage" even though the
// vector also counts recursive-CTE and non-CTE statements. Renaming it would
// change the exported metric, so it is left as is; confirm whether a more
// general name was intended.
TelemetrySQLCTECnt = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "telemetry",
Name: "non_recursive_cte_usage",
Help: "Counter of usage of CTE",
}, []string{LblCTEType})
)

// readCounter extracts the current value of a prometheus.Counter as an int64.
// The counter's float value is truncated by the integer conversion.
// It returns -1 if the metric cannot be written into its protobuf form.
func readCounter(m prometheus.Counter) int64 {
	// Reading a prometheus metric's value directly is discouraged upstream:
	// https://github.com/prometheus/client_golang/issues/486#issuecomment-433345239
	var metric dto.Metric
	err := m.Write(&metric)
	if err != nil {
		// Not expected to happen in practice; -1 tells the caller the read failed.
		return -1
	}
	value := metric.GetCounter().GetValue()
	return int64(value)
}

// CTEUsageCounter records the usages of CTE.
// GetCTECounter fills each field from one label value of TelemetrySQLCTECnt.
type CTEUsageCounter struct {
	// NonRecursiveCTEUsed is the value of the "nonRecurCTE" counter.
	NonRecursiveCTEUsed int64 `json:"nonRecursiveCTEUsed"`
	// RecursiveUsed is the value of the "recurCTE" counter.
	RecursiveUsed int64 `json:"recursiveUsed"`
	// NonCTEUsed is the value of the "notCTE" counter.
	NonCTEUsed int64 `json:"nonCTEUsed"`
}

// Sub returns the field-wise difference c - rhs.
// Neither operand is modified; both are passed by value.
func (c CTEUsageCounter) Sub(rhs CTEUsageCounter) CTEUsageCounter {
	// Build the result with a composite literal instead of a local named
	// `new`, which would shadow the predeclared builtin of the same name.
	return CTEUsageCounter{
		NonRecursiveCTEUsed: c.NonRecursiveCTEUsed - rhs.NonRecursiveCTEUsed,
		RecursiveUsed:       c.RecursiveUsed - rhs.RecursiveUsed,
		NonCTEUsed:          c.NonCTEUsed - rhs.NonCTEUsed,
	}
}

// GetCTECounter reads the current values of TelemetrySQLCTECnt for the
// "nonRecurCTE", "recurCTE" and "notCTE" label values and returns them as a
// CTEUsageCounter. A field is -1 when readCounter fails to read its counter.
func GetCTECounter() CTEUsageCounter {
	// read fetches the counter for one cte_type label value.
	read := func(cteType string) int64 {
		return readCounter(TelemetrySQLCTECnt.With(prometheus.Labels{LblCTEType: cteType}))
	}
	return CTEUsageCounter{
		NonRecursiveCTEUsed: read("nonRecurCTE"),
		RecursiveUsed:       read("recurCTE"),
		NonCTEUsed:          read("notCTE"),
	}
}
30 changes: 26 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ var (
sessionExecuteParseDurationInternal = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblInternal)
sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral)

telemetryCTEUsage = metrics.TelemetrySQLCTECnt

tiKVGCAutoConcurrency = "tikv_gc_auto_concurrency"
)

Expand Down Expand Up @@ -609,7 +611,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
// Finally t1 will have more data than t2, with no errors return to user!
if s.isTxnRetryableError(err) && !s.sessionVars.BatchInsert && commitRetryLimit > 0 && !isPessimistic {
logutil.Logger(ctx).Warn("sql",
zap.String("label", s.getSQLLabel()),
zap.String("label", s.GetSQLLabel()),
zap.Error(err),
zap.String("txn", s.txn.GoString()))
// Transactions will retry 2 ~ commitRetryLimit times.
Expand All @@ -619,7 +621,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
err = s.retry(ctx, uint(maxRetryCount))
} else if !errIsNoisy(err) {
logutil.Logger(ctx).Warn("can not retry txn",
zap.String("label", s.getSQLLabel()),
zap.String("label", s.GetSQLLabel()),
zap.Error(err),
zap.Bool("IsBatchInsert", s.sessionVars.BatchInsert),
zap.Bool("IsPessimistic", isPessimistic),
Expand Down Expand Up @@ -734,7 +736,7 @@ const sqlLogMaxLen = 1024
// SchemaChangedWithoutRetry is used for testing.
var SchemaChangedWithoutRetry uint32

func (s *session) getSQLLabel() string {
func (s *session) GetSQLLabel() string {
if s.sessionVars.InRestrictedSQL {
return metrics.LblInternal
}
Expand Down Expand Up @@ -794,7 +796,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
var schemaVersion int64
sessVars := s.GetSessionVars()
orgStartTS := sessVars.TxnCtx.StartTS
label := s.getSQLLabel()
label := s.GetSQLLabel()
for {
s.PrepareTxnCtx(ctx)
s.sessionVars.RetryInfo.ResetOffset()
Expand Down Expand Up @@ -1649,6 +1651,7 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
return nil, err
}
rs, err = s.Exec(ctx)
se.updateTelemetryMetric(s.(*executor.ExecStmt))
sessVars.TxnCtx.StatementCount++
if rs != nil {
return &execStmtResult{
Expand Down Expand Up @@ -1816,6 +1819,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
Ctx: s,
OutputNames: execPlan.OutputNames(),
PsStmt: prepareStmt,
Ti: &executor.TelemetryInfo{},
}
compileDuration := time.Since(s.sessionVars.StartTime)
sessionExecuteCompileDurationGeneral.Observe(compileDuration.Seconds())
Expand Down Expand Up @@ -2926,3 +2930,21 @@ func (s *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
}
return domain.GetDomain(s).InfoSchema()
}

// updateTelemetryMetric increments exactly one series of the CTE usage counter
// for the executed statement es, chosen from the flags in es.Ti:
// "recurCTE" when UseRecursive is set, otherwise "nonRecurCTE" when
// UseNonRecursive is set, otherwise "notCTE".
// Nothing is recorded when es.Ti is nil or when s.isInternal() reports true.
func (s *session) updateTelemetryMetric(es *executor.ExecStmt) {
	if es.Ti == nil || s.isInternal() {
		return
	}

	// UseRecursive is checked first, so a statement with both flags set is
	// counted once, as recursive.
	cteType := "notCTE"
	switch {
	case es.Ti.UseRecursive:
		cteType = "recurCTE"
	case es.Ti.UseNonRecursive:
		cteType = "nonRecurCTE"
	}
	telemetryCTEUsage.WithLabelValues(cteType).Inc()
}
1 change: 1 addition & 0 deletions telemetry/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ func generateTelemetryData(ctx sessionctx.Context, trackingID string) telemetryD

// postReportTelemetryData runs the post-report hook of each telemetry section
// in order: transaction usage, CTE usage, then slow query stats.
// NOTE(review): presumably called once a telemetry report has been produced,
// so the next report starts from a fresh baseline; confirm with the caller.
func postReportTelemetryData() {
postReportTxnUsage()
postReportCTEUsage()
postReportSlowQueryStats()
}
19 changes: 18 additions & 1 deletion telemetry/data_feature_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
m "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand All @@ -33,6 +34,7 @@ type featureUsage struct {
// key is the first 6 characters of sha2(TABLE_NAME, 256)
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
}

func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
Expand All @@ -49,7 +51,9 @@ func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
// Avoid the circle dependency.
temporaryTable := ctx.(TemporaryTableFeatureChecker).TemporaryTableExists()

return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable}, nil
cteUsage := GetCTEUsageInfo(ctx)

return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable, cteUsage}, nil
}

// ClusterIndexUsage records the usage info of all the tables, no more than 10k tables
Expand Down Expand Up @@ -149,6 +153,7 @@ type TxnUsage struct {
}

var initialTxnCommitCounter metrics.TxnCommitCounter
var initialCTECounter m.CTEUsageCounter

// GetTxnUsageInfo gets the usage info of transaction related features. It's exported for tests.
func GetTxnUsageInfo(ctx sessionctx.Context) *TxnUsage {
Expand All @@ -168,3 +173,15 @@ func GetTxnUsageInfo(ctx sessionctx.Context) *TxnUsage {
func postReportTxnUsage() {
initialTxnCommitCounter = metrics.GetTxnCommitCounter()
}

// postReportCTEUsage stores the current CTE counters in initialCTECounter, so
// that GetCTEUsageInfo subsequently reports only the usage accumulated after
// this call.
func postReportCTEUsage() {
initialCTECounter = m.GetCTECounter()
}

// GetCTEUsageInfo returns the CTE usage accumulated since the baseline held in
// initialCTECounter, computed as the current counters minus that baseline.
// The ctx argument is not used.
func GetCTEUsageInfo(ctx sessionctx.Context) *m.CTEUsageCounter {
	usage := m.GetCTECounter().Sub(initialCTECounter)
	return &usage
}
Loading

0 comments on commit 0f151af

Please sign in to comment.