Skip to content

Commit

Permalink
*: use int instead of fmt.Stringer as executor id (pingcap#19207)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Aug 19, 2020
1 parent cd8654f commit a2e2ce6
Show file tree
Hide file tree
Showing 48 changed files with 342 additions and 359 deletions.
3 changes: 1 addition & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"
"unsafe"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -92,7 +91,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []int, rootPlanID int) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
if err == nil {
if selectResult, ok := sr.(*selectResult); ok {
Expand Down
19 changes: 6 additions & 13 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -32,17 +31,16 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []string) (*selectResult, []*types.FieldType) {
func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []int) (*selectResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), -1)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()
c.Assert(err, IsNil)

Expand All @@ -66,12 +64,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
if planIDs == nil {
response, err = Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
} else {
var planIDFuncs []fmt.Stringer
for i := range planIDs {
idx := i
planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
}
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0"))
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDs, 1)
}

c.Assert(err, IsNil)
Expand Down Expand Up @@ -134,13 +127,13 @@ func (s *testSuite) TestSelectNormalChunkSize(c *C) {
}

func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
planIDs := []string{"1", "2", "3"}
planIDs := []int{1, 2, 3}
response, colTypes := s.createSelectNormal(1, 2, c, planIDs)
if len(response.copPlanIDs) != len(planIDs) {
c.Fatal("invalid copPlanIDs")
}
for i := range planIDs {
if response.copPlanIDs[i].String() != planIDs[i] {
if response.copPlanIDs[i] != planIDs[i] {
c.Fatal("invalid copPlanIDs")
}
}
Expand Down Expand Up @@ -440,7 +433,7 @@ func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectRe
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), -1)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()

/// 4 int64 types.
Expand Down
5 changes: 2 additions & 3 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -57,8 +56,8 @@ type testSuite struct {
func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), -1),
DiskTracker: disk.NewTracker(stringutil.StringerStr("testSuite"), -1),
MemTracker: memory.NewTracker(-1, -1),
DiskTracker: disk.NewTracker(-1, -1),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
Expand Down
16 changes: 6 additions & 10 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type selectResult struct {

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []fmt.Stringer
rootPlanID fmt.Stringer
copPlanIDs []int
rootPlanID int

fetchDuration time.Duration
durationReported bool
Expand Down Expand Up @@ -248,7 +248,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID == nil || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
Expand All @@ -260,7 +260,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
}
if r.stats == nil {
stmtCtx := r.ctx.GetSessionVars().StmtCtx
id := r.rootPlanID.String()
id := r.rootPlanID
originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
r.stats = &selectResultRuntimeStats{
RuntimeStats: originRuntimeStats,
Expand All @@ -274,12 +274,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := ""
if detail.GetExecutorId() != "" {
planID = detail.GetExecutorId()
} else {
planID = r.copPlanIDs[i].String()
}
// Fixme: Use detail.GetExecutorId() if exist.
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, callee, detail)
}
Expand Down
15 changes: 4 additions & 11 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand All @@ -30,7 +29,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = copPlan{}
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
Expand All @@ -42,17 +41,11 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []fmt.Stringer{copPlan{}}
sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1")
}

type copPlan struct{}

func (p copPlan) String() string {
return "callee"
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1")
}
2 changes: 1 addition & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (e *HashAggExec) Close() error {
finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.baseExecutor.Close()
}
Expand Down
3 changes: 1 addition & 2 deletions executor/apply_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)

// applyCache is used in the apply executor. When we get the same value of the outer row.
Expand Down Expand Up @@ -47,7 +46,7 @@ func newApplyCache(ctx sessionctx.Context) (*applyCache, error) {
c := applyCache{
cache: cache,
memCapacity: ctx.GetSessionVars().NestedLoopJoinCacheCapacity,
memTracker: memory.NewTracker(stringutil.StringerStr("applyCache"), -1),
memTracker: memory.NewTracker(memory.LabelForApplyCache, -1),
}
return &c, nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
Expand Down
30 changes: 15 additions & 15 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
}

func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
baseExec := newBaseExecutor(opt.ctx, opt.schema, 0)
m := &mockDataSource{baseExec, opt, nil, nil, 0}
rTypes := retTypes(m)
colData := make([][]interface{}, len(rTypes))
Expand Down Expand Up @@ -742,8 +742,8 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
ctx.GetSessionVars().SetIndexLookupJoinConcurrency(4)
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}, rawData: wideString}
tc.cols = cols
Expand Down Expand Up @@ -785,7 +785,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
probeKeys = append(probeKeys, cols1[keyIdx])
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
concurrency: uint(testCase.concurrency),
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
Expand All @@ -809,9 +809,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t := memory.NewTracker(-1, memLimit)
t.SetActionOnExceed(nil)
t2 := disk.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), -1)
t2 := disk.NewTracker(-1, -1)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2
return e
Expand Down Expand Up @@ -1131,8 +1131,8 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
tc := &indexJoinTestCase{
outerRows: 100000,
innerRows: variable.DefMaxChunkSize * 100,
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
keyOff2IdxOff[i] = i
}
e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexInnerHashJoin"), outerDS),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 1, outerDS),
outerCtx: outerCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}
e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexMergeJoin"), outerDS),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 2, outerDS),
outerMergeCtx: outerMergeCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, leftExec, rightExec *mockDataSourc
// only benchmark inner join
e := &MergeJoinExec{
stmtCtx: tc.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("MergeJoin"), leftExec, rightExec),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 3, leftExec, rightExec),
compareFuncs: compareFuncs,
isOuterJoin: false,
}
Expand Down Expand Up @@ -1446,8 +1446,8 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)

numInnerRows := numOuterRows*numInnerDup + numInnerRedundant
itc := &indexJoinTestCase{
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func defaultSortTestCase() *sortCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
tc := &sortCase{rows: 300000, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
return tc
}
Expand All @@ -1621,7 +1621,7 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) {
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, 4, dataSource),
ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
Expand Down
6 changes: 1 addition & 5 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package executor

import (
"context"
"fmt"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -44,7 +43,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stringutil"
)

// brieTaskProgress tracks a task's current progress.
Expand Down Expand Up @@ -169,11 +167,9 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
return variable.GoTimeToTS(t1), nil
}

var brieStmtLabel fmt.Stringer = stringutil.StringerStr("BRIEStmt")

func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Executor {
e := &BRIEExec{
baseExecutor: newBaseExecutor(b.ctx, schema, brieStmtLabel),
baseExecutor: newBaseExecutor(b.ctx, schema, 0),
info: &brieTaskInfo{
kind: s.Kind,
},
Expand Down
Loading

0 comments on commit a2e2ce6

Please sign in to comment.