Skip to content

Commit

Permalink
distsql, metrics: refine distsql metrics. (pingcap#5774)
Browse files Browse the repository at this point in the history
Refine distsql metrics && Add scan keys metrics.
  • Loading branch information
coocood authored and shenli committed Feb 2, 2018
1 parent 4db4b45 commit 23bff27
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 86 deletions.
56 changes: 25 additions & 31 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
Expand Down Expand Up @@ -53,8 +54,8 @@ type SelectResult interface {
// Fetch fetches partial results from client.
// The caller should call SetFields() before call Fetch().
Fetch(goctx.Context)
// ScanCount gets the total scan row count.
ScanCount() int64
// ScanKeys gets the total scan row count.
ScanKeys() int64
}

// PartialResult is the result from a single region server.
Expand All @@ -81,7 +82,8 @@ type selectResult struct {
selectResp *tipb.SelectResponse
respChkIdx int

scanCount int64
scanKeys int64 // number of keys scanned by TiKV.
partialCount int64 // number of partial results.
}

type newResultWithErr struct {
Expand All @@ -100,7 +102,7 @@ func (r *selectResult) fetch(goCtx goctx.Context) {
defer func() {
close(r.results)
duration := time.Since(startTime)
queryHistgram.WithLabelValues(r.label).Observe(duration.Seconds())
metrics.DistSQLQueryHistgram.WithLabelValues(r.label).Observe(duration.Seconds())
}()
for {
resultSubset, err := r.resp.Next(goCtx)
Expand Down Expand Up @@ -136,16 +138,21 @@ func (r *selectResult) Next(goCtx goctx.Context) (PartialResult, error) {
pr.rowLen = r.rowLen
err := pr.unmarshal(re.result)
if len(pr.resp.OutputCounts) > 0 {
r.scanCount += pr.resp.OutputCounts[0]
scanKeysPartial := pr.resp.OutputCounts[0]
metrics.DistSQLScanKeysPartialHistogram.Observe(float64(scanKeysPartial))
r.scanKeys += scanKeysPartial
} else {
r.scanCount = -1
r.scanKeys = -1
}
r.partialCount++
return pr, errors.Trace(err)
}

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(goCtx goctx.Context) ([]byte, error) {
re := <-r.results
r.partialCount++
r.scanKeys = -1
return re.result, errors.Trace(re.err)
}

Expand Down Expand Up @@ -187,19 +194,22 @@ func (r *selectResult) getSelectResp() error {
return errors.Trace(err)
}
if len(r.selectResp.OutputCounts) > 0 {
r.scanCount += r.selectResp.OutputCounts[0]
scanCountPartial := r.selectResp.OutputCounts[0]
metrics.DistSQLScanKeysPartialHistogram.Observe(float64(scanCountPartial))
r.scanKeys += scanCountPartial
} else {
r.scanCount = -1
r.scanKeys = -1
}
r.partialCount++
if len(r.selectResp.Chunks) == 0 {
continue
}
return nil
}
}

func (r *selectResult) ScanCount() int64 {
return r.scanCount
func (r *selectResult) ScanKeys() int64 {
return r.scanKeys
}

func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
Expand All @@ -221,6 +231,10 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.scanKeys >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.scanKeys))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
close(r.closed)
return r.resp.Close()
}
Expand Down Expand Up @@ -289,19 +303,9 @@ func (pr *partialResult) Close() error {
// SelectDAG sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func SelectDAG(goCtx goctx.Context, ctx context.Context, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) {
var err error
defer func() {
// Add metrics.
if err != nil {
queryCounter.WithLabelValues(queryFailed).Inc()
} else {
queryCounter.WithLabelValues(querySucc).Inc()
}
}()

resp := ctx.GetClient().Send(goCtx, kvReq)
if resp == nil {
err = errors.New("client returns nil response")
err := errors.New("client returns nil response")
return nil, errors.Trace(err)
}

Expand All @@ -327,16 +331,6 @@ func SelectDAG(goCtx goctx.Context, ctx context.Context, kvReq *kv.Request, fiel

// Analyze do a analyze request.
func Analyze(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (SelectResult, error) {
var err error
defer func() {
// Add metrics.
if err != nil {
queryCounter.WithLabelValues(queryFailed).Inc()
} else {
queryCounter.WithLabelValues(querySucc).Inc()
}
}()

resp := client.Send(ctx, kvReq)
if resp == nil {
return nil, errors.New("client returns nil response")
Expand Down
46 changes: 0 additions & 46 deletions distsql/metrics.go

This file was deleted.

21 changes: 16 additions & 5 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
Expand All @@ -32,12 +33,13 @@ type streamResult struct {
ctx context.Context

// NOTE: curr == nil means stream finish, while len(curr.RowsData) == 0 doesn't.
curr *tipb.Chunk
scanCount int64
curr *tipb.Chunk
scanKeys int64
partialCount int64
}

func (r *streamResult) ScanCount() int64 {
return r.scanCount
func (r *streamResult) ScanKeys() int64 {
return r.scanKeys
}

func (r *streamResult) Fetch(goctx.Context) {}
Expand Down Expand Up @@ -98,8 +100,11 @@ func (r *streamResult) readDataFromResponse(goCtx goctx.Context, resp kv.Respons
return false, errors.Trace(err)
}
if len(stream.OutputCounts) > 0 {
r.scanCount += stream.OutputCounts[0]
partialScanKeys := stream.OutputCounts[0]
metrics.DistSQLScanKeysPartialHistogram.Observe(float64(partialScanKeys))
r.scanKeys += partialScanKeys
}
r.partialCount++
return false, nil
}

Expand Down Expand Up @@ -143,10 +148,16 @@ func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
}

func (r *streamResult) NextRaw(goCtx goctx.Context) ([]byte, error) {
r.partialCount++
r.scanKeys = -1
return r.resp.Next(goCtx)
}

func (r *streamResult) Close() error {
if r.scanKeys >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.scanKeys))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ type IndexReaderExecutor struct {

// Close implements the Executor Close interface.
func (e *IndexReaderExecutor) Close() error {
e.feedback.SetIndexRanges(e.ranges).SetActual(e.result.ScanCount())
e.feedback.SetIndexRanges(e.ranges).SetActual(e.result.ScanKeys())
e.ctx.StoreQueryFeedback(e.feedback)
err := closeAll(e.result, e.partialResult)
e.result = nil
Expand Down Expand Up @@ -654,7 +654,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(goCtx goctx.Context, kvRanges []k
go func() {
goCtx1, cancel := goctx.WithCancel(goCtx)
err := worker.fetchHandles(goCtx1, result)
scanCount := result.ScanCount()
scanCount := result.ScanKeys()
if err != nil {
scanCount = -1
}
Expand Down Expand Up @@ -1094,8 +1094,8 @@ func (tr *tableResultHandler) Close() error {

func (tr *tableResultHandler) totalCount() (count int64) {
if tr.optionalResult != nil {
count += tr.optionalResult.ScanCount()
count += tr.optionalResult.ScanKeys()
}
count += tr.result.ScanCount()
count += tr.result.ScanKeys()
return count
}
62 changes: 62 additions & 0 deletions metrics/distsql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
)

// distsql metrics.
var (
DistSQLQueryHistgram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "distsql",
Name: "handle_query_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handled queries.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{LblType})

DistSQLScanKeysPartialHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "distsql",
Name: "scan_keys_partial",
Help: "number of scanned keys for each partial result.",
},
)
DistSQLScanKeysHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "distsql",
Name: "scan_keys",
Help: "number of scanned keys for each query.",
},
)
DistSQLPartialCountHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "distsql",
Name: "partial_count",
Help: "number of partial results for each query.",
},
)
)

func init() {
prometheus.MustRegister(DistSQLQueryHistgram)
prometheus.MustRegister(DistSQLScanKeysPartialHistogram)
prometheus.MustRegister(DistSQLScanKeysHistogram)
prometheus.MustRegister(DistSQLPartialCountHistogram)
}

0 comments on commit 23bff27

Please sign in to comment.