executor: add runtime information for point-get executor (pingcap#18666)
crazycs520 authored Jul 28, 2020
1 parent 901d531 commit 8b19d67
Showing 9 changed files with 208 additions and 7 deletions.
4 changes: 0 additions & 4 deletions executor/builder.go
@@ -29,7 +29,6 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
@@ -98,9 +97,6 @@ type MockPhysicalPlan interface {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
if config.GetGlobalConfig().EnableCollectExecutionInfo && b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}
switch v := p.(type) {
case nil:
return nil
9 changes: 7 additions & 2 deletions executor/executor.go
@@ -1533,10 +1533,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
TaskID: stmtctx.AllocateTaskID(),
}
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
globalConfig := config.GetGlobalConfig()
if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
}
switch config.GetGlobalConfig().OOMAction {
switch globalConfig.OOMAction {
case config.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
@@ -1653,6 +1654,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
if globalConfig.EnableCollectExecutionInfo {
sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
38 changes: 38 additions & 0 deletions executor/point_get.go
@@ -24,12 +24,14 @@ import (
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/rowcodec"
)

@@ -83,6 +85,8 @@ type PointGetExecutor struct {

// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
@@ -133,6 +137,15 @@ func (e *PointGetExecutor) Open(context.Context) error {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
@@ -142,6 +155,9 @@ func (e *PointGetExecutor) Open(context.Context) error {

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
if e.runtimeStats != nil {
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
return nil
}

@@ -435,3 +451,25 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

type pointGetRuntimeStats struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

func (e *pointGetRuntimeStats) String() string {
var basic, rpcStatsStr string
if e.BasicRuntimeStats != nil {
basic = e.BasicRuntimeStats.String()
}
if e.SnapshotRuntimeStats != nil {
rpcStatsStr = e.SnapshotRuntimeStats.String()
}
if rpcStatsStr == "" {
return basic
}
if basic == "" {
return rpcStatsStr
}
return basic + ", " + rpcStatsStr
}
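
Not part of the diff: a minimal, self-contained Go sketch of the composition rule in pointGetRuntimeStats.String above, using stand-in stats types. The "time:…, loops:…" format of BasicRuntimeStats is an assumption for illustration, not taken from this commit.

package main

import (
	"fmt"
	"time"
)

// basicStats stands in for execdetails.BasicRuntimeStats; snapStats stands in
// for tikv.SnapshotRuntimeStats. Only their String output matters here.
type basicStats struct {
	loops int
	total time.Duration
}

func (b *basicStats) String() string {
	return fmt.Sprintf("time:%v, loops:%d", b.total, b.loops) // assumed format
}

type snapStats struct {
	numRPC  int64
	rpcTime time.Duration
}

func (s *snapStats) String() string {
	return fmt.Sprintf("Get:{num_rpc:%d, total_time:%v}", s.numRPC, s.rpcTime)
}

// combine mirrors pointGetRuntimeStats.String: fall back to whichever part is
// non-empty, otherwise join both with ", ".
func combine(basic, rpc string) string {
	if rpc == "" {
		return basic
	}
	if basic == "" {
		return rpc
	}
	return basic + ", " + rpc
}

func main() {
	b := &basicStats{loops: 1, total: 780 * time.Microsecond}
	s := &snapStats{numRPC: 1, rpcTime: 650 * time.Microsecond}
	// Prints: time:780µs, loops:1, Get:{num_rpc:1, total_time:650µs}
	fmt.Println(combine(b.String(), s.String()))
}

This combined string is what lands in the execution-info column of EXPLAIN ANALYZE for a point get, which is what the integration test further down asserts on.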
2 changes: 2 additions & 0 deletions kv/kv.go
@@ -50,6 +50,8 @@ const (
TaskID
// InfoSchema is schema version used by txn startTS.
InfoSchema
// CollectRuntimeStats is used to enable collect runtime stats.
CollectRuntimeStats
)

// Priority value for transaction priority.
19 changes: 19 additions & 0 deletions planner/core/integration_test.go
@@ -14,7 +14,9 @@
package core_test

import (
"bytes"
"fmt"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
@@ -1420,3 +1422,20 @@ func (s *testIntegrationSuite) TestIndexJoinOnClusteredIndex(c *C) {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...))
}
}

func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b varchar(20))")
tk.MustExec("insert into t values (1,1)")

res := tk.MustQuery("explain analyze select * from t where a=1;")
resBuff := bytes.NewBufferString("")
for _, row := range res.Rows() {
fmt.Fprintf(resBuff, "%s\n", row)
}
explain := resBuff.String()
c.Assert(strings.Contains(explain, "Get:{num_rpc:"), IsTrue, Commentf("%s", explain))
c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
7 changes: 7 additions & 0 deletions store/tikv/coprocessor.go
@@ -839,13 +839,19 @@ type clientHelper struct {
*minCommitTSPushed
Client
resolveLite bool
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// ResolveLocks wraps the ResolveLocks function and store the resolved result.
func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
var err error
var resolvedLocks []uint64
var msBeforeTxnExpired int64
if ch.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start))
}(time.Now())
}
if ch.resolveLite {
msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.resolveLocksLite(bo, callerStartTS, locks)
} else {
@@ -867,6 +873,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID
if len(directStoreAddr) > 0 {
sender.storeAddr = directStoreAddr
}
sender.stats = ch.stats
req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
return resp, ctx, sender.storeAddr, err
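
Not part of the diff: the timing added to ResolveLocks above relies on a small Go idiom worth spelling out. The deferred closure takes the start time as an argument, so time.Now() is evaluated when the defer statement runs and time.Since(start) runs when the function returns, covering the whole remaining body. A minimal sketch, with recordDuration as a hypothetical stand-in for recordRegionRequestRuntimeStats:

package main

import (
	"fmt"
	"time"
)

// recordDuration stands in for recordRegionRequestRuntimeStats.
func recordDuration(cmd string, d time.Duration) {
	fmt.Printf("%s took %v\n", cmd, d)
}

func resolveLocks() {
	// time.Now() is evaluated here; time.Since(start) runs on return,
	// so the sleep below is included in the measured duration.
	defer func(start time.Time) {
		recordDuration("ResolveLock", time.Since(start))
	}(time.Now())

	time.Sleep(5 * time.Millisecond) // stand-in for the real lock-resolving RPC
}

func main() {
	resolveLocks()
}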
33 changes: 33 additions & 0 deletions store/tikv/region_request.go
@@ -62,6 +62,14 @@ type RegionRequestSender struct {
storeAddr string
rpcError error
failStoreIDs map[uint64]struct{}
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// RegionRequestRuntimeStats records the runtime stats of send region requests.
type RegionRequestRuntimeStats struct {
count int64
// Send region request consume time.
consume int64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
@@ -85,6 +93,11 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
}
if ss.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start))
}(time.Now())
}
resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout)
if err != nil {
cancel()
@@ -101,6 +114,19 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co
return
}

func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) {
stat, ok := stats[cmd]
if !ok {
stats[cmd] = &RegionRequestRuntimeStats{
count: 1,
consume: int64(d),
}
return
}
stat.count++
stat.consume += int64(d)
}

func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
@@ -328,6 +354,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
}
defer s.releaseStoreToken(rpcCtx.Store)
}

if s.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start))
}(time.Now())
}

ctx := bo.ctx
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
var cancel context.CancelFunc
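
Not part of the diff: a minimal sketch of the accumulation performed by recordRegionRequestRuntimeStats, with a plain string key standing in for tikvrpc.CmdType. The rendered line in main matches the num_rpc/total_time format that SnapshotRuntimeStats.String produces in snapshot.go below.

package main

import (
	"fmt"
	"time"
)

// reqStats mirrors RegionRequestRuntimeStats: a request count plus the total
// time spent, stored in nanoseconds.
type reqStats struct {
	count   int64
	consume int64
}

func record(stats map[string]*reqStats, cmd string, d time.Duration) {
	stat, ok := stats[cmd]
	if !ok {
		stats[cmd] = &reqStats{count: 1, consume: int64(d)}
		return
	}
	stat.count++
	stat.consume += int64(d)
}

func main() {
	stats := make(map[string]*reqStats)
	record(stats, "Get", 650*time.Microsecond)
	record(stats, "Get", 350*time.Microsecond)
	for cmd, s := range stats {
		// Prints: Get:{num_rpc:2, total_time:1ms}
		fmt.Printf("%s:{num_rpc:%d, total_time:%v}\n", cmd, s.count, time.Duration(s.consume))
	}
}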
85 changes: 84 additions & 1 deletion store/tikv/snapshot.go
@@ -75,6 +75,7 @@ type tikvSnapshot struct {
hitCnt int64
cached map[string][]byte
}
stats *SnapshotRuntimeStats
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
@@ -143,6 +144,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
m[string(k)] = v
mu.Unlock()
})
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
}
@@ -236,6 +238,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}
if s.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
}()
}

pending := batch.keys
for {
@@ -317,7 +325,9 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
}(time.Now())

ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
val, err := s.get(NewBackofferWithVars(ctx, getMaxBackoff, s.vars), k)
bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
val, err := s.get(bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
}
@@ -357,6 +367,12 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
Client: s.store.client,
resolveLite: true,
}
if s.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
}()
}

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
&pb.GetRequest{
@@ -435,6 +451,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
s.priority = kvPriorityToCommandPri(val.(int))
case kv.TaskID:
s.taskID = val.(uint64)
case kv.CollectRuntimeStats:
s.stats = val.(*SnapshotRuntimeStats)
}
}

@@ -443,6 +461,8 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
switch opt {
case kv.ReplicaRead:
s.replicaRead = kv.ReplicaReadLeader
case kv.CollectRuntimeStats:
s.stats = nil
}
}

@@ -557,3 +577,66 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
logutil.BgLogger().Error("error", zap.Error(err4))
}
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
if s.stats == nil || bo.totalSleep == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.backoffSleepMS == nil {
s.stats.backoffSleepMS = bo.backoffSleepMS
s.stats.backoffTimes = bo.backoffTimes
return
}
for k, v := range bo.backoffSleepMS {
s.stats.backoffSleepMS[k] += v
}
for k, v := range bo.backoffTimes {
s.stats.backoffTimes[k] += v
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.rpcStats == nil {
s.stats.rpcStats = stats
return
}
for k, v := range stats {
stat, ok := s.stats.rpcStats[k]
if !ok {
s.stats.rpcStats[k] = v
continue
}
stat.count += v.count
stat.consume += v.consume
}
}

// SnapshotRuntimeStats records the runtime stats of snapshot.
type SnapshotRuntimeStats struct {
rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
backoffSleepMS map[backoffType]int
backoffTimes map[backoffType]int
}

// String implements fmt.Stringer interface.
func (rs *SnapshotRuntimeStats) String() string {
var buf bytes.Buffer
for k, v := range rs.rpcStats {
if buf.Len() > 0 {
buf.WriteByte(',')
}
buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume)))
}
for k, v := range rs.backoffTimes {
if buf.Len() > 0 {
buf.WriteByte(',')
}
ms := rs.backoffSleepMS[k]
buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%d ms}", k.String(), v, ms))
}
return buf.String()
}
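
Not part of the diff: when both RPC stats and backoff stats are present, String renders the RPC entries first and then the backoff entries, comma-separated, producing an execution-info fragment roughly like the following (the values and the regionMiss backoff name are illustrative, not taken from this commit):

Get:{num_rpc:2, total_time:1ms},regionMiss_backoff:{num:1, total_time:2 ms}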