Skip to content

Commit

Permalink
add tracing (arana-db#194)
Browse files Browse the repository at this point in the history
* add tracing

Signed-off-by: jyz0309 <[email protected]>

* add all todo

Signed-off-by: jyz0309 <[email protected]>

* add some tracing

Signed-off-by: jyz0309 <[email protected]>

* format code

Signed-off-by: jyz0309 <[email protected]>

* remove useless line

Signed-off-by: jyz0309 <[email protected]>

* fix comment

Signed-off-by: jyz0309 <[email protected]>

* add license

Signed-off-by: jyz0309 <[email protected]>

Co-authored-by: Xin.Zh <[email protected]>
  • Loading branch information
jyz0309 and AlexStocks authored Jun 30, 2022
1 parent 58bdf66 commit 7ccab23
Show file tree
Hide file tree
Showing 21 changed files with 322 additions and 3 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/dop251/goja v0.0.0-20220422102209-3faab1d8f20e
github.com/dubbogo/gost v1.12.3
github.com/dubbogo/tools v1.0.9 // indirect
github.com/go-playground/validator/v10 v10.10.1
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.5.0
Expand All @@ -19,12 +20,14 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/spf13/cobra v1.2.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/testcontainers/testcontainers-go v0.12.0
github.com/tidwall/gjson v1.14.0
go.etcd.io/etcd/api/v3 v3.5.1
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
233 changes: 233 additions & 0 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pkg/executor/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"github.com/arana-db/parser/ast"

"github.com/pkg/errors"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

import (
Expand All @@ -44,6 +47,8 @@ import (
)

var (
Tracer = otel.Tracer("Executor")

errMissingTx = stdErrors.New("no transaction found")
errNoDatabaseSelected = mysqlErrors.NewSQLError(mConstants.ERNoDb, mConstants.SSNoDatabaseSelected, "No database selected")
)
Expand Down Expand Up @@ -136,6 +141,10 @@ func (executor *RedirectExecutor) ExecuteFieldList(ctx *proto.Context) ([]proto.
}

func (executor *RedirectExecutor) ExecutorComQuery(ctx *proto.Context) (proto.Result, uint16, error) {
var span trace.Span
ctx.Context, span = Tracer.Start(ctx.Context, "ExecutorComQuery")
defer span.End()

var (
schemaless bool // true if schema is not specified
err error
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/optimize/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/proto/rule"
"github.com/arana-db/arana/pkg/proto/schema_manager"
"github.com/arana-db/arana/pkg/runtime"
rast "github.com/arana-db/arana/pkg/runtime/ast"
"github.com/arana-db/arana/pkg/runtime/cmp"
rcontext "github.com/arana-db/arana/pkg/runtime/context"
Expand Down Expand Up @@ -87,7 +88,9 @@ func (o *optimizer) SchemaLoader() proto.SchemaLoader {
}

func (o optimizer) Optimize(ctx context.Context, conn proto.VConn, stmt ast.StmtNode, args ...interface{}) (plan proto.Plan, err error) {
ctx, span := runtime.Tracer.Start(ctx, "Optimize")
defer func() {
span.End()
if rec := recover(); rec != nil {
err = errors.Errorf("cannot analyze sql %s", rcontext.SQL(ctx))
log.Errorf("optimize panic: sql=%s, rec=%v", rcontext.SQL(ctx), rec)
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (a *AggregatePlan) Type() proto.PlanType {
}

func (a *AggregatePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "AggregatePlan.ExecIn")
defer span.End()
res, err := a.Plan.ExecIn(ctx, conn)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/plan/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (d *AlterTablePlan) Type() proto.PlanType {
}

func (at *AlterTablePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
// TODO: ADD trace in all plan ExecIn
if at.Shards == nil {
// non-sharding alter table
var sb strings.Builder
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/describle.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (d *DescribePlan) ExecIn(ctx context.Context, vConn proto.VConn) (proto.Res
res proto.Result
err error
)
ctx, span := Tracer.Start(ctx, "DescribePlan.ExecIn")
defer span.End()

if err = d.generate(&sb, &indexes); err != nil {
return nil, errors.Wrap(err, "failed to generate desc/describe sql")
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (d *DropTablePlan) Type() proto.PlanType {
}

func (d *DropTablePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "DropTablePlan.ExecIn")
defer span.End()
var (
sb strings.Builder
args []int
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/show_databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (s *ShowDatabasesPlan) Type() proto.PlanType {
}

func (s *ShowDatabasesPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "ShowDatabasesPlan.ExecIn")
defer span.End()
_ = conn
tenant, ok := security.DefaultTenantManager().GetTenantOfCluster(rcontext.Schema(ctx))
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/show_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (st *ShowTablesPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.R
res proto.Result
err error
)
ctx, span := Tracer.Start(ctx, "ShowTablesPlan.ExecIn")
defer span.End()

if err = st.Stmt.Restore(ast.RestoreDefault, &sb, &indexes); err != nil {
return nil, errors.WithStack(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/simple_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (s *SimpleDeletePlan) Type() proto.PlanType {
}

func (s *SimpleDeletePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "SimpleDeletePlan.ExecIn")
defer span.End()
if s.shards == nil || s.shards.IsEmpty() {
return resultx.New(), nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/simple_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (sp *SimpleInsertPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto
affects uint64
lastInsertId uint64
)
ctx, span := Tracer.Start(ctx, "SimpleInsertPlan.ExecIn")
defer span.End()
// TODO: consider wrap a transaction if insert into multiple databases
// TODO: insert in parallel
for db, inserts := range sp.batch {
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/plan/simple_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (s *SimpleJoinPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Re
err error
)

ctx, span := Tracer.Start(ctx, "SimpleJoinPlan.ExecIn")
defer span.End()

if err := s.generateSelect(&sb, &indexes); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/plan/simple_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (s *SimpleQueryPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.R
err error
)

ctx, span := Tracer.Start(ctx, "SimpleQueryPlan.ExecIn")
defer span.End()

discard := s.filter()

if err = s.generate(&sb, &indexes); err != nil {
Expand Down
24 changes: 24 additions & 0 deletions pkg/runtime/plan/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package plan

import (
"go.opentelemetry.io/otel"
)

var Tracer = otel.Tracer("ExecPlan")
2 changes: 2 additions & 0 deletions pkg/runtime/plan/transparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func (tp *TransparentPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.
args []int
err error
)
ctx, span := Tracer.Start(ctx, "TransparentPlan.ExecIn")
defer span.End()

if err = tp.stmt.Restore(rast.RestoreDefault, &sb, &args); err != nil {
return nil, errors.WithStack(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (s *TruncatePlan) Type() proto.PlanType {
}

func (s *TruncatePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "TruncatePlan.ExecIn")
defer span.End()
if s.shards == nil || s.shards.IsEmpty() {
return resultx.New(), nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (u UnionPlan) Type() proto.PlanType {
}

func (u UnionPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "UnionPlan.ExecIn")
defer span.End()
switch u.Plans[0].Type() {
case proto.PlanTypeQuery:
return u.query(ctx, conn)
Expand Down
2 changes: 2 additions & 0 deletions pkg/runtime/plan/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (up *UpdatePlan) Type() proto.PlanType {
}

func (up *UpdatePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
ctx, span := Tracer.Start(ctx, "UpdatePlan.ExecIn")
defer span.End()
if up.shards == nil {
var sb strings.Builder
if err := up.stmt.Restore(ast.RestoreDefault, &sb, nil); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/runtime/plan/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ func (s *ShowVariablesPlan) Type() proto.PlanType {
}

func (s *ShowVariablesPlan) ExecIn(ctx context.Context, vConn proto.VConn) (proto.Result, error) {

var (
sb strings.Builder
args []int
)
ctx, span := Tracer.Start(ctx, "ShowVariablesPlan.ExecIn")
defer span.End()

if err := s.stmt.Restore(ast.RestoreDefault, &sb, &args); err != nil {
return nil, errors.Wrap(err, "failed to execute DELETE statement")
Expand Down
19 changes: 18 additions & 1 deletion pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (

"github.com/pkg/errors"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"go.uber.org/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -55,6 +58,8 @@ var (
_ Runtime = (*defaultRuntime)(nil)
_ proto.VConn = (*defaultRuntime)(nil)
_ proto.VConn = (*compositeTx)(nil)

Tracer = otel.Tracer("Runtime")
)

var (
Expand Down Expand Up @@ -176,8 +181,11 @@ func (tx *compositeTx) String() string {
}

func (tx *compositeTx) Execute(ctx *proto.Context) (res proto.Result, warn uint16, err error) {
var span trace.Span
ctx.Context, span = Tracer.Start(ctx.Context, "compositeTx.Execute")
execStart := time.Now()
defer func() {
span.End()
metrics.ExecuteDuration.Observe(time.Since(execStart).Seconds())
}()
if tx.closed.Load() {
Expand Down Expand Up @@ -235,10 +243,11 @@ func (tx *compositeTx) Commit(ctx context.Context) (proto.Result, uint16, error)
if !tx.closed.CAS(false, true) {
return nil, 0, errTxClosed
}

ctx, span := Tracer.Start(ctx, "compositeTx.Commit")
defer func() { // cleanup
tx.rt = nil
tx.txs = nil
span.End()
}()

var g errgroup.Group
Expand All @@ -264,6 +273,8 @@ func (tx *compositeTx) Commit(ctx context.Context) (proto.Result, uint16, error)
}

func (tx *compositeTx) Rollback(ctx context.Context) (proto.Result, uint16, error) {
ctx, span := Tracer.Start(ctx, "compositeTx.Rollback")
defer span.End()
if !tx.closed.CAS(false, true) {
return nil, 0, errTxClosed
}
Expand Down Expand Up @@ -560,6 +571,9 @@ type defaultRuntime struct {
}

func (pi *defaultRuntime) Begin(ctx *proto.Context) (proto.Tx, error) {
var span trace.Span
ctx.Context, span = Tracer.Start(ctx, "defaultRuntime.Begin")
defer span.End()
tx := &compositeTx{
id: nextTxID(),
rt: pi,
Expand Down Expand Up @@ -594,8 +608,11 @@ func (pi *defaultRuntime) Exec(ctx context.Context, db string, query string, arg
}

func (pi *defaultRuntime) Execute(ctx *proto.Context) (res proto.Result, warn uint16, err error) {
var span trace.Span
ctx.Context, span = Tracer.Start(ctx.Context, "defaultRuntime.Execute")
execStart := time.Now()
defer func() {
span.End()
metrics.ExecuteDuration.Observe(time.Since(execStart).Seconds())
}()
args := pi.extractArgs(ctx)
Expand Down

0 comments on commit 7ccab23

Please sign in to comment.