Skip to content

Commit

Permalink
*: lazy eval explain id and tracker label (pingcap#10139)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and zz-jason committed Apr 22, 2019
1 parent 92bdfb5 commit d6396da
Show file tree
Hide file tree
Showing 26 changed files with 192 additions and 95 deletions.
3 changes: 2 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package distsql

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -82,7 +83,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
if err != nil {
return sr, err
Expand Down
11 changes: 9 additions & 2 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package distsql

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

Expand Down Expand Up @@ -63,7 +65,12 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
if planIDs == nil {
response, err = Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
} else {
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDs)
var planIDFuncs []fmt.Stringer
for i := range planIDs {
idx := i
planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
}
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs)
}

c.Assert(err, IsNil)
Expand Down Expand Up @@ -115,7 +122,7 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
c.Fatal("invalid copPlanIDs")
}
for i := range planIDs {
if response.copPlanIDs[i] != planIDs[i] {
if response.copPlanIDs[i].String() != planIDs[i] {
c.Fatal("invalid copPlanIDs")
}
}
Expand Down
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package distsql

import (
"fmt"
"math"

"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -43,7 +44,7 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
}

// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label string) *RequestBuilder {
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
Expand Down
5 changes: 3 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package distsql

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -74,7 +75,7 @@ type selectResult struct {

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []string
copPlanIDs []fmt.Stringer

memTracker *memory.Tracker
}
Expand Down Expand Up @@ -207,7 +208,7 @@ func (r *selectResult) updateCopRuntimeStats(callee string) {
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, callee, detail)
RecordOneCopTask(planID.String(), callee, detail)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
)

var (
Expand Down Expand Up @@ -127,7 +128,7 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) err
}

func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, "")
baseExec := newBaseExecutor(opt.ctx, opt.schema, stringutil.StringerStr(""))
m := &mockDataSource{baseExec, opt, nil, nil, 0}
types := m.retTypes()
colData := make([][]interface{}, len(types))
Expand Down
11 changes: 9 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"bytes"
"context"
"fmt"
"math"
"sort"
"strings"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -666,9 +668,14 @@ func (b *executorBuilder) buildReplace(vals *InsertValues) Executor {
return replaceExec
}

var (
grantStmtLabel fmt.Stringer = stringutil.StringerStr("GrantStmt")
revokeStmtLabel fmt.Stringer = stringutil.StringerStr("RevokeStmt")
)

func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor {
e := &GrantExec{
baseExecutor: newBaseExecutor(b.ctx, nil, "GrantStmt"),
baseExecutor: newBaseExecutor(b.ctx, nil, grantStmtLabel),
Privs: grant.Privs,
ObjectType: grant.ObjectType,
Level: grant.Level,
Expand All @@ -681,7 +688,7 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor {

func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
e := &RevokeExec{
baseExecutor: newBaseExecutor(b.ctx, nil, "RevokeStmt"),
baseExecutor: newBaseExecutor(b.ctx, nil, revokeStmtLabel),
ctx: b.ctx,
Privs: revoke.Privs,
ObjectType: revoke.ObjectType,
Expand Down
28 changes: 18 additions & 10 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"fmt"
"math"
"runtime"
"sort"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -258,7 +260,7 @@ func (e *IndexReaderExecutor) Close() error {
err := e.result.Close()
e.result = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID())
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
e.ctx.StoreQueryFeedback(e.feedback)
Expand Down Expand Up @@ -299,6 +301,8 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
return e.open(ctx, kvRanges)
}

var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker")

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
Expand All @@ -320,7 +324,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexReaderDistSQLTracker").
SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel).
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -442,6 +446,8 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in
return nil
}

var indexLookupDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexLookupDistSQLTracker")

// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error {
if e.runtimeStats != nil {
Expand All @@ -456,7 +462,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexLookupDistSQLTracker").
SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel).
Build()
if err != nil {
return err
Expand Down Expand Up @@ -492,9 +498,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID())
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
copStats.SetRowNum(count)
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID())
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
copStats.SetRowNum(count)
}
e.ctx.StoreQueryFeedback(e.feedback)
Expand All @@ -505,6 +511,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
return nil
}

var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker")

// startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
Expand All @@ -517,7 +525,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
isCheckOp: e.isCheckOp,
memTracker: memory.NewTracker("tableWorker", -1),
memTracker: memory.NewTracker(tableWorkerLabel, -1),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
Expand All @@ -531,7 +539,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
baseExecutor: newBaseExecutor(e.ctx, e.schema, stringutil.MemoizeStr(func() string { return e.id.String() + "_tableReader" })),
table: e.table,
dagPB: e.tableRequest,
streaming: e.tableStreaming,
Expand Down Expand Up @@ -565,7 +573,7 @@ func (e *IndexLookUpExecutor) Close() error {
e.memTracker.Detach()
e.memTracker = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID())
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
return nil
Expand Down Expand Up @@ -852,8 +860,8 @@ func GetLackHandles(expectedHandles []int64, obtainedHandlesMap map[int64]struct
return diffHandles
}

func getPhysicalPlanIDs(plans []plannercore.PhysicalPlan) []string {
planIDs := make([]string, 0, len(plans))
func getPhysicalPlanIDs(plans []plannercore.PhysicalPlan) []fmt.Stringer {
planIDs := make([]fmt.Stringer, 0, len(plans))
for _, p := range plans {
planIDs = append(planIDs, p.ExplainID())
}
Expand Down
9 changes: 5 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ var (

type baseExecutor struct {
ctx sessionctx.Context
id string
id fmt.Stringer
schema *expression.Schema
initCap int
maxChunkSize int
Expand Down Expand Up @@ -130,7 +131,7 @@ func (e *baseExecutor) Next(ctx context.Context, req *chunk.RecordBatch) error {
return nil
}

func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id string, children ...Executor) baseExecutor {
func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id fmt.Stringer, children ...Executor) baseExecutor {
e := baseExecutor{
children: children,
ctx: ctx,
Expand All @@ -140,7 +141,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.id)
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.id.String())
}
if schema != nil {
cols := schema.Columns
Expand Down Expand Up @@ -1291,7 +1292,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars := ctx.GetSessionVars()
sc := &stmtctx.StatementContext{
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(s.Text(), vars.MemQuotaQuery),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
)

var _ = Suite(&testExecSuite{})
Expand Down Expand Up @@ -92,7 +93,7 @@ func (s *testExecSuite) TestShowProcessList(c *C) {

// Compose executor.
e := &ShowExec{
baseExecutor: newBaseExecutor(sctx, schema, ""),
baseExecutor: newBaseExecutor(sctx, schema, stringutil.StringerStr("")),
Tp: ast.ShowProcessList,
}

Expand Down
Loading

0 comments on commit d6396da

Please sign in to comment.