Skip to content

Commit

Permalink
*: redesign trace statement with json output (pingcap#8357)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and jackysp committed Nov 21, 2018
1 parent c99b1c8 commit 6fb260f
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 21 deletions.
11 changes: 11 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -94,6 +95,11 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

err := a.executor.Next(ctx, chk)
if err != nil {
a.lastErr = err
Expand Down Expand Up @@ -253,6 +259,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand Down
9 changes: 9 additions & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/executor/aggfuncs"
Expand Down Expand Up @@ -516,6 +517,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("hashagg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -778,6 +783,10 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("streamAgg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
stmtNode: v.StmtNode,
builder: b,
format: v.Format,
}
}

Expand Down
4 changes: 2 additions & 2 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type Compiler struct {

// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStmt, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span1 := opentracing.StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

Expand Down
5 changes: 5 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"
"unsafe"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -244,6 +245,10 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tableReader.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
25 changes: 25 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -663,6 +664,10 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -782,6 +787,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tableDual.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -837,6 +846,10 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("selection.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -916,6 +929,10 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tableScan.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -1020,6 +1037,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("maxOneRow.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -1166,6 +1187,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("union.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
6 changes: 6 additions & 0 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -143,6 +144,11 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("projection.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
9 changes: 9 additions & 0 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -74,6 +75,10 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("sort.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -301,6 +306,10 @@ func (e *TopNExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("topN.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
5 changes: 5 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -100,6 +101,10 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tableReader.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down
67 changes: 66 additions & 1 deletion executor/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
package executor

import (
"encoding/json"
"time"

"github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
"sourcegraph.com/sourcegraph/appdash"
traceImpl "sourcegraph.com/sourcegraph/appdash/opentracing"
)

// TraceExec represents a root executor of trace query.
Expand All @@ -41,6 +46,7 @@ type TraceExec struct {
rootTrace opentracing.Span

builder *executorBuilder
format string
}

// Next executes real query and collects span later.
Expand All @@ -50,6 +56,43 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

if e.format == "json" {
if se, ok := e.ctx.(sqlexec.SQLExecutor); ok {
store := appdash.NewMemoryStore()
tracer := traceImpl.NewTracer(store)
span := tracer.StartSpan("trace")
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
recordSets, err := se.Execute(ctx, e.stmtNode.Text())
if err != nil {
return errors.Trace(err)
}

for _, rs := range recordSets {
_, err = drainRecordSet(ctx, e.ctx, rs)
if err != nil {
return errors.Trace(err)
}
if err = rs.Close(); err != nil {
return errors.Trace(err)
}
}

traces, err := store.Traces(appdash.TracesOpts{})
if err != nil {
return errors.Trace(err)
}
data, err := json.Marshal(traces)
if err != nil {
return errors.Trace(err)
}
chk.AppendString(0, string(data))
}
e.exhausted = true
return nil
}

// TODO: If the following code is never used, remove it later.
// record how much time was spent for optimizeing plan
optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context()))
stmtPlan, err := planner.Optimize(e.builder.ctx, e.stmtNode, e.builder.is)
Expand Down Expand Up @@ -106,6 +149,28 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
var rows []chunk.Row
chk := rs.NewChunk()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

for {
err := rs.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return rows, errors.Trace(err)
}
iter := chunk.NewIterator4Chunk(chk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
chk = chunk.Renew(chk, sctx.GetSessionVars().MaxChunkSize)
}
}

func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) {
suffix := ""
spans := tree[span.Context.SpanID]
Expand Down
10 changes: 3 additions & 7 deletions executor/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@ import (
"github.com/pingcap/tidb/util/testkit"
)

type testTraceExec struct{}

func (s *testTraceExec) SetupSuite(c *C) {
}

func (s *testSuite) TestTraceExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
testSQL := `create table trace (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1);`
tk.MustExec(testSQL)
// TODO: check result later in another PR.
tk.MustExec("trace select * from trace where id = 0;")
tk.MustExec("trace insert into trace (c1, c2, c3) values (1, 2, 3)")
rows := tk.MustQuery("trace select * from trace where id = 0;").Rows()
c.Assert(rows, HasLen, 1)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap/errors v0.11.0
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26
github.com/pingcap/parser v0.0.0-20181113072426-4a9a1b13b591
github.com/pingcap/parser v0.0.0-20181120072820-10951bcfca73
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v0.0.0-20181112132202-4860a0d5de03
github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323
Expand All @@ -47,4 +47,5 @@ require (
golang.org/x/text v0.3.0
google.golang.org/grpc v1.16.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4
)
Loading

0 comments on commit 6fb260f

Please sign in to comment.