Skip to content

Commit

Permalink
[no-release-notes] tracing infra (dolthub#2477)
Browse files Browse the repository at this point in the history
* add context to handler interfaces

* fix ctx ref

* trace execBuilder

* vitess bump

* more regions

* [ga-format-pr] Run ./format_repo.sh to fix formatting

* fix build

---------

Co-authored-by: max-hoffman <[email protected]>
  • Loading branch information
max-hoffman and max-hoffman authored Jun 26, 2024
1 parent 80f4e40 commit f32860f
Show file tree
Hide file tree
Showing 18 changed files with 201 additions and 165 deletions.
8 changes: 4 additions & 4 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (e *Engine) PrepareQuery(
query string,
) (sql.Node, error) {
query = sql.RemoveSpaceAndDelimiter(query, ';')
stmt, _, err := e.Parser.ParseOneWithOptions(query, sql.LoadSqlMode(ctx).ParserOptions())
stmt, _, err := e.Parser.ParseOneWithOptions(ctx, query, sql.LoadSqlMode(ctx).ParserOptions())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -534,15 +534,15 @@ func (e *Engine) analyzeNode(ctx *sql.Context, query string, bound sql.Node) (sq
// todo(max): improve name resolution so we can cache post name-binding.
// this involves expression memoization, which currently screws up aggregation
// and order by aliases
prepStmt, _, err := e.Parser.ParseOneWithOptions(query, sqlMode.ParserOptions())
prepStmt, _, err := e.Parser.ParseOneWithOptions(ctx, query, sqlMode.ParserOptions())
if err != nil {
return nil, err
}
prepare, ok := prepStmt.(*sqlparser.Prepare)
if !ok {
return nil, fmt.Errorf("expected *sqlparser.Prepare, found %T", prepStmt)
}
cacheStmt, _, err := e.Parser.ParseOneWithOptions(prepare.Expr, sqlMode.ParserOptions())
cacheStmt, _, err := e.Parser.ParseOneWithOptions(ctx, prepare.Expr, sqlMode.ParserOptions())
if err != nil && strings.HasPrefix(prepare.Expr, "@") {
val, err := expression.NewUserVar(strings.TrimPrefix(prepare.Expr, "@")).Eval(ctx, nil)
if err != nil {
Expand All @@ -552,7 +552,7 @@ func (e *Engine) analyzeNode(ctx *sql.Context, query string, bound sql.Node) (sq
if !ok {
return nil, fmt.Errorf("expected string, found %T", val)
}
cacheStmt, _, err = e.Parser.ParseOneWithOptions(valStr, sqlMode.ParserOptions())
cacheStmt, _, err = e.Parser.ParseOneWithOptions(ctx, valStr, sqlMode.ParserOptions())
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions enginetest/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func injectBindVarsAndPrepare(
q string,
) (string, map[string]*querypb.BindVariable, error) {
sqlMode := sql.LoadSqlMode(ctx)
stmt, err := sqlparser.ParseWithOptions(q, sqlMode.ParserOptions())
parsed, err := sqlparser.ParseWithOptions(ctx, q, sqlMode.ParserOptions())
if err != nil {
// cannot prepare empty statement, can query
if err.Error() == "empty statement" {
Expand All @@ -479,7 +479,7 @@ func injectBindVarsAndPrepare(
return q, nil, sql.ErrSyntaxError.New(err)
}

switch p := stmt.(type) {
switch p := parsed.(type) {
case *sqlparser.Load, *sqlparser.Prepare, *sqlparser.Execute:
// LOAD DATA query cannot be used as PREPARED STATEMENT
return q, nil, nil
Expand All @@ -494,7 +494,7 @@ func injectBindVarsAndPrepare(

b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, sql.NewMysqlParser())
b.SetParserOptions(sql.LoadSqlMode(ctx).ParserOptions())
resPlan, err := b.BindOnly(stmt, q)
resPlan, err := b.BindOnly(parsed, q)
if err != nil {
return q, nil, err
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func injectBindVarsAndPrepare(
default:
}
return true, nil
}, stmt)
}, parsed)
if err != nil {
return "", nil, err
}
Expand All @@ -552,8 +552,8 @@ func injectBindVarsAndPrepare(
}

buf := sqlparser.NewTrackedBuffer(nil)
stmt.Format(buf)
e.EnginePreparedDataCache().CacheStmt(ctx.Session.ID(), buf.String(), stmt)
parsed.Format(buf)
e.EnginePreparedDataCache().CacheStmt(ctx.Session.ID(), buf.String(), parsed)

_, isDatabaser := resPlan.(sql.Databaser)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c
github.com/go-kit/kit v0.10.0
github.com/go-sql-driver/mysql v1.7.2-0.20231213112541-0004702b931d
github.com/gocraft/dbr/v2 v2.7.2
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTE
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71/go.mod h1:2/2zjLQ/JOOSbbSboojeg+cAwcRV0fDLzIiWch/lhqI=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20240614201043-78bdf1b2871a h1:EtDV1H9IuIAjqjx3/yorUX29OAhWqJ6EeuyUzI19vu4=
github.com/dolthub/vitess v0.0.0-20240614201043-78bdf1b2871a/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8 h1:d+dOTwI8dkwNYmcweXNjei2ot3GHJB3HqLWUeNvAkC0=
github.com/dolthub/vitess v0.0.0-20240617225939-55a46c5dcfc8/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c h1:Y3M0hPCUvT+5RTNbJLKywGc9aHIRCIlg+0NOhC91GYE=
github.com/dolthub/vitess v0.0.0-20240626174323-4083c07f5e9c/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
Expand Down
7 changes: 3 additions & 4 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (s *SessionManager) session(conn *mysql.Conn) sql.Session {
}

// NewContext creates a new context for the session at the given conn.
func (s *SessionManager) NewContext(conn *mysql.Conn) (*sql.Context, error) {
return s.NewContextWithQuery(conn, "")
func (s *SessionManager) NewContext(ctx context.Context, conn *mysql.Conn, query string) (*sql.Context, error) {
return s.NewContextWithQuery(ctx, conn, query)
}

func (s *SessionManager) getOrCreateSession(ctx context.Context, conn *mysql.Conn) (sql.Session, error) {
Expand All @@ -203,8 +203,7 @@ func (s *SessionManager) getOrCreateSession(ctx context.Context, conn *mysql.Con
}

// NewContextWithQuery creates a new context for the session at the given conn.
func (s *SessionManager) NewContextWithQuery(conn *mysql.Conn, query string) (*sql.Context, error) {
ctx := context.Background()
func (s *SessionManager) NewContextWithQuery(ctx context.Context, conn *mysql.Conn, query string) (*sql.Context, error) {
sess, err := s.getOrCreateSession(ctx, conn)

if err != nil {
Expand Down
49 changes: 25 additions & 24 deletions server/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"context"
"sort"

"github.com/dolthub/vitess/go/mysql"
Expand Down Expand Up @@ -62,7 +63,7 @@ type Interceptor interface {
// Note the contents of the query slice may change after
// the first call to callback. So the Handler should not
// hang on to the byte slice.
Query(chain Chain, c *mysql.Conn, query string, callback func(res *sqltypes.Result, more bool) error) error
Query(ctx context.Context, chain Chain, c *mysql.Conn, query string, callback func(res *sqltypes.Result, more bool) error) error

// ParsedQuery is called when a connection receives a
// query that has already been parsed. Note the contents
Expand All @@ -75,15 +76,15 @@ type Interceptor interface {
// client supports MULTI_STATEMENT. It should process the first
// statement in |query| and return the remainder. It will be called
// multiple times until the remainder is |""|.
MultiQuery(chain Chain, c *mysql.Conn, query string, callback func(res *sqltypes.Result, more bool) error) (string, error)
MultiQuery(ctx context.Context, chain Chain, c *mysql.Conn, query string, callback func(res *sqltypes.Result, more bool) error) (string, error)

// Prepare is called when a connection receives a prepared
// statement query.
Prepare(chain Chain, c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error)
Prepare(ctx context.Context, chain Chain, c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error)

// StmtExecute is called when a connection receives a statement
// execute query.
StmtExecute(chain Chain, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error
StmtExecute(ctx context.Context, chain Chain, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error
}

type Chain interface {
Expand All @@ -92,42 +93,42 @@ type Chain interface {
// Note the contents of the query slice may change after
// the first call to callback. So the Handler should not
// hang on to the byte slice.
ComQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error
ComQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error

// ComMultiQuery is called when a connection receives a query and the
// client supports MULTI_STATEMENT. It should process the first
// statement in |query| and return the remainder. It will be called
// multiple times until the remainder is |""|.
ComMultiQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error)
ComMultiQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error)

// ComPrepare is called when a connection receives a prepared
// statement query.
ComPrepare(c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error)
ComPrepare(ctx context.Context, c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error)

// ComStmtExecute is called when a connection receives a statement
// execute query.
ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error
ComStmtExecute(ctx context.Context, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error
}

type chainInterceptor struct {
i Interceptor
c Chain
}

func (ci *chainInterceptor) ComQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error {
return ci.i.Query(ci.c, c, query, callback)
func (ci *chainInterceptor) ComQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error {
return ci.i.Query(ctx, ci.c, c, query, callback)
}

func (ci *chainInterceptor) ComMultiQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error) {
return ci.i.MultiQuery(ci.c, c, query, callback)
func (ci *chainInterceptor) ComMultiQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error) {
return ci.i.MultiQuery(ctx, ci.c, c, query, callback)
}

func (ci *chainInterceptor) ComPrepare(c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error) {
return ci.i.Prepare(ci.c, c, query, prepare)
func (ci *chainInterceptor) ComPrepare(ctx context.Context, c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error) {
return ci.i.Prepare(ctx, ci.c, c, query, prepare)
}

func (ci *chainInterceptor) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return ci.i.StmtExecute(ci.c, c, prepare, callback)
func (ci *chainInterceptor) ComStmtExecute(ctx context.Context, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return ci.i.StmtExecute(ctx, ci.c, c, prepare, callback)
}

type interceptorHandler struct {
Expand All @@ -153,20 +154,20 @@ func (ih *interceptorHandler) ComInitDB(c *mysql.Conn, schemaName string) error
return ih.h.ComInitDB(c, schemaName)
}

func (ih *interceptorHandler) ComQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error {
return ih.c.ComQuery(c, query, callback)
func (ih *interceptorHandler) ComQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) error {
return ih.c.ComQuery(ctx, c, query, callback)
}

func (ih *interceptorHandler) ComMultiQuery(c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error) {
return ih.c.ComMultiQuery(c, query, callback)
func (ih *interceptorHandler) ComMultiQuery(ctx context.Context, c *mysql.Conn, query string, callback mysql.ResultSpoolFn) (string, error) {
return ih.c.ComMultiQuery(ctx, c, query, callback)
}

func (ih *interceptorHandler) ComPrepare(c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error) {
return ih.c.ComPrepare(c, query, prepare)
func (ih *interceptorHandler) ComPrepare(ctx context.Context, c *mysql.Conn, query string, prepare *mysql.PrepareData) ([]*querypb.Field, error) {
return ih.c.ComPrepare(ctx, c, query, prepare)
}

func (ih *interceptorHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return ih.c.ComStmtExecute(c, prepare, callback)
func (ih *interceptorHandler) ComStmtExecute(ctx context.Context, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return ih.c.ComStmtExecute(ctx, c, prepare, callback)
}

func (ih *interceptorHandler) WarningCount(c *mysql.Conn) uint16 {
Expand Down
31 changes: 18 additions & 13 deletions server/golden/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package golden

import (
"context"
dsql "database/sql"
"fmt"
"math"
Expand Down Expand Up @@ -139,12 +140,12 @@ func (h MySqlProxy) ComInitDB(c *mysql.Conn, schemaName string) error {
}

// ComPrepare implements mysql.Handler.
func (h MySqlProxy) ComPrepare(_ *mysql.Conn, _ string, _ *mysql.PrepareData) ([]*querypb.Field, error) {
func (h MySqlProxy) ComPrepare(ctx context.Context, _ *mysql.Conn, _ string, _ *mysql.PrepareData) ([]*querypb.Field, error) {
return nil, fmt.Errorf("ComPrepare unsupported")
}

// ComStmtExecute implements mysql.Handler.
func (h MySqlProxy) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
func (h MySqlProxy) ComStmtExecute(ctx context.Context, c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
return fmt.Errorf("ComStmtExecute unsupported")
}

Expand Down Expand Up @@ -173,6 +174,7 @@ func (h MySqlProxy) ConnectionAborted(c *mysql.Conn, reason string) error {

// ComMultiQuery implements mysql.Handler.
func (h MySqlProxy) ComMultiQuery(
ctx context.Context,
c *mysql.Conn,
query string,
callback mysql.ResultSpoolFn,
Expand All @@ -183,7 +185,7 @@ func (h MySqlProxy) ComMultiQuery(
}
conn.Entry = conn.Entry.WithField("query", query)

remainder, err := h.processQuery(c, conn, query, true, callback)
remainder, err := h.processQuery(ctx, c, conn, query, true, callback)
if err != nil {
conn.Errorf("Failed to process MySQL results: %s", err)
}
Expand All @@ -192,6 +194,7 @@ func (h MySqlProxy) ComMultiQuery(

// ComQuery implements mysql.Handler.
func (h MySqlProxy) ComQuery(
ctx context.Context,
c *mysql.Conn,
query string,
callback mysql.ResultSpoolFn,
Expand All @@ -202,7 +205,7 @@ func (h MySqlProxy) ComQuery(
}
conn.Entry = conn.Entry.WithField("query", query)

_, err = h.processQuery(c, conn, query, false, callback)
_, err = h.processQuery(ctx, c, conn, query, false, callback)
if err != nil {
conn.Errorf("Failed to process MySQL results: %s", err)
}
Expand All @@ -211,25 +214,27 @@ func (h MySqlProxy) ComQuery(

// ComParsedQuery implements mysql.Handler.
func (h MySqlProxy) ComParsedQuery(
ctx context.Context,
c *mysql.Conn,
query string,
parsed sqlparser.Statement,
callback func(*sqltypes.Result, bool) error,
) error {
return h.ComQuery(c, query, callback)
return h.ComQuery(ctx, c, query, callback)
}

func (h MySqlProxy) processQuery(
ctx context.Context,
c *mysql.Conn,
proxy proxyConn,
query string,
isMultiStatement bool,
callback func(*sqltypes.Result, bool) error,
) (string, error) {
ctx := sql.NewContext(h.ctx)
sqlCtx := sql.NewContext(ctx)
var remainder string
if isMultiStatement {
_, ri, err := sqlparser.ParseOne(query)
_, ri, err := sqlparser.ParseOne(ctx, query)
if err != nil {
return "", err
}
Expand All @@ -240,7 +245,7 @@ func (h MySqlProxy) processQuery(
}
}

ctx = ctx.WithQuery(query)
sqlCtx = sqlCtx.WithQuery(query)
more := remainder != ""

proxy.Debugf("Sending query to MySQL")
Expand All @@ -258,7 +263,7 @@ func (h MySqlProxy) processQuery(
res := &sqltypes.Result{}
ok := true
for ok {
if res, ok, err = fetchMySqlRows(ctx, rows, 128); err != nil {
if res, ok, err = fetchMySqlRows(sqlCtx, rows, 128); err != nil {
return "", err
}
if err := callback(res, more); err != nil {
Expand All @@ -267,19 +272,19 @@ func (h MySqlProxy) processQuery(
processedAtLeastOneBatch = true
}

if err := setConnStatusFlags(ctx, c); err != nil {
if err := setConnStatusFlags(sqlCtx, c); err != nil {
return remainder, err
}

switch len(res.Rows) {
case 0:
if len(res.Info) > 0 {
ctx.GetLogger().Tracef("returning result %s", res.Info)
sqlCtx.GetLogger().Tracef("returning result %s", res.Info)
} else {
ctx.GetLogger().Tracef("returning empty result")
sqlCtx.GetLogger().Tracef("returning empty result")
}
case 1:
ctx.GetLogger().Tracef("returning result %v", res)
sqlCtx.GetLogger().Tracef("returning result %v", res)
}

// processedAtLeastOneBatch means we already called resultsCB() at least
Expand Down
Loading

0 comments on commit f32860f

Please sign in to comment.