Skip to content

Commit

Permalink
*: remove goCtx from session struct (pingcap#5174)
Browse files Browse the repository at this point in the history
1. go context should not be stored
2. change Executor interface to Open(goctx.Context)
3. many other changes forced by this refactor
  • Loading branch information
tiancaiamao authored Nov 22, 2017
1 parent 9921f41 commit 02f6bb2
Show file tree
Hide file tree
Showing 36 changed files with 223 additions and 229 deletions.
5 changes: 2 additions & 3 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package ast

import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
goctx "golang.org/x/net/context"
)

// Node is the basic element of the AST.
Expand Down Expand Up @@ -129,7 +129,6 @@ type ResultField struct {

// RecordSet is an abstract result set interface to help get data from Plan.
type RecordSet interface {

// Fields gets result fields.
Fields() []*ResultField

Expand Down Expand Up @@ -182,7 +181,7 @@ type Statement interface {
OriginText() string

// Exec executes SQL and gets a Recordset.
Exec(ctx context.Context) (RecordSet, error)
Exec(goCtx goctx.Context) (RecordSet, error)

// IsPrepared returns whether this statement is prepared statement.
IsPrepared() bool
Expand Down
5 changes: 1 addition & 4 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type Context interface {
// Txn returns the current transaction which is created before executing a statement.
Txn() kv.Transaction

// GoCtx returns the standard context.Context which is bound with current transaction.
GoCtx() goctx.Context

// GetClient gets a kv.Client.
GetClient() kv.Client

Expand All @@ -55,7 +52,7 @@ type Context interface {
// RefreshTxnCtx commits old transaction without retry,
// and creates a new transaction.
// now just for load data and batch insert.
RefreshTxnCtx() error
RefreshTxnCtx(goctx.Context) error

// ActivePendingTxn receives the pending transaction from the transaction channel.
// It should be called right before we builds an executor.
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx context.Context
checkErr = errors.Trace(err)
}
}
err = hookCtx.Txn().Commit(ctx.GoCtx())
err = hookCtx.Txn().Commit(goctx.TODO())
if err != nil {
checkErr = errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions ddl/ddl_db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,16 @@ func (t *testExecInfo) parseSQLs(p *parser.Parser) error {
}

func (t *testExecInfo) compileSQL(idx int) (err error) {
compiler := executor.Compiler{}
for _, info := range t.sqlInfos {
c := info.cases[idx]
compiler := executor.Compiler{c.session}
se := c.session
se.PrepareTxnCtx(se.GoCtx())
goCtx := goctx.TODO()
se.PrepareTxnCtx(goCtx)
ctx := se.(context.Context)
executor.ResetStmtCtx(ctx, c.rawStmt)

c.stmt, err = compiler.Compile(ctx, c.rawStmt)
c.stmt, err = compiler.Compile(goCtx, c.rawStmt)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -263,8 +264,7 @@ func (t *testExecInfo) compileSQL(idx int) (err error) {
func (t *testExecInfo) execSQL(idx int) error {
for _, sqlInfo := range t.sqlInfos {
c := sqlInfo.cases[idx]
ctx := c.session.(context.Context)
_, err := c.stmt.Exec(ctx)
_, err := c.stmt.Exec(goctx.TODO())
if c.expectedErr != nil {
if err == nil {
err = errors.Errorf("expected error %s but got nil", c.expectedErr)
Expand All @@ -275,7 +275,7 @@ func (t *testExecInfo) execSQL(idx int) error {
if err != nil {
return errors.Trace(err)
}
err = c.session.CommitTxn(c.session.GoCtx())
err = c.session.CommitTxn(goctx.TODO())
if err != nil {
return errors.Trace(err)
}
Expand Down
18 changes: 3 additions & 15 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ func (t DelRangeTask) Range() ([]byte, []byte) {
// LoadDeleteRanges loads delete range tasks from gc_delete_range table.
func LoadDeleteRanges(ctx context.Context, safePoint uint64) (ranges []DelRangeTask, _ error) {
sql := fmt.Sprintf(loadDeleteRangeSQL, safePoint)
goCtx := ctx.GoCtx()
if goCtx == nil {
goCtx = goctx.Background()
}
rss, err := ctx.(sqlexec.SQLExecutor).Execute(goCtx, sql)
rss, err := ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -84,11 +80,7 @@ func LoadDeleteRanges(ctx context.Context, safePoint uint64) (ranges []DelRangeT
// NOTE: This function WILL NOT start and run in a new transaction internally.
func CompleteDeleteRange(ctx context.Context, dr DelRangeTask) error {
sql := fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
goCtx := ctx.GoCtx()
if goCtx == nil {
goCtx = goctx.Background()
}
_, err := ctx.(sqlexec.SQLExecutor).Execute(goCtx, sql)
_, err := ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
return errors.Trace(err)
}

Expand All @@ -97,10 +89,6 @@ func UpdateDeleteRange(ctx context.Context, dr DelRangeTask, newStartKey, oldSta
newStartKeyHex := hex.EncodeToString(newStartKey)
oldStartKeyHex := hex.EncodeToString(oldStartKey)
sql := fmt.Sprintf(updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex)
goCtx := ctx.GoCtx()
if goCtx == nil {
goCtx = goctx.Background()
}
_, err := ctx.(sqlexec.SQLExecutor).Execute(goCtx, sql)
_, err := ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
return errors.Trace(err)
}
24 changes: 12 additions & 12 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
goctx "golang.org/x/net/context"
)

type processinfoSetter interface {
Expand All @@ -54,7 +55,7 @@ func (a *recordSet) Fields() []*ast.ResultField {
for _, col := range a.executor.Schema().Columns {
dbName := col.DBName.O
if dbName == "" && col.TblName.L != "" {
dbName = a.stmt.ctx.GetSessionVars().CurrentDB
dbName = a.stmt.Ctx.GetSessionVars().CurrentDB
}
rf := &ast.ResultField{
ColumnAsName: col.ColName,
Expand All @@ -80,13 +81,13 @@ func (a *recordSet) Next() (types.Row, error) {
}
if row == nil {
if a.stmt != nil {
a.stmt.ctx.GetSessionVars().LastFoundRows = a.stmt.ctx.GetSessionVars().StmtCtx.FoundRows()
a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
}
return nil, nil
}

if a.stmt != nil {
a.stmt.ctx.GetSessionVars().StmtCtx.AddFoundRows(1)
a.stmt.Ctx.GetSessionVars().StmtCtx.AddFoundRows(1)
}
return row, nil
}
Expand All @@ -100,12 +101,12 @@ func (a *recordSet) NextChunk(chk *chunk.Chunk) error {
numRows := chk.NumRows()
if numRows == 0 {
if a.stmt != nil {
a.stmt.ctx.GetSessionVars().LastFoundRows = a.stmt.ctx.GetSessionVars().StmtCtx.FoundRows()
a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
}
return nil
}
if a.stmt != nil {
a.stmt.ctx.GetSessionVars().StmtCtx.AddFoundRows(uint64(numRows))
a.stmt.Ctx.GetSessionVars().StmtCtx.AddFoundRows(uint64(numRows))
}
return nil
}
Expand Down Expand Up @@ -140,7 +141,7 @@ type ExecStmt struct {
// Text represents the origin query text.
Text string

ctx context.Context
Ctx context.Context
startTime time.Time
isPreparedStmt bool

Expand All @@ -167,10 +168,9 @@ func (a *ExecStmt) IsReadOnly() bool {
// This function builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
func (a *ExecStmt) Exec(goCtx goctx.Context) (ast.RecordSet, error) {
a.startTime = time.Now()
a.ctx = ctx

ctx := a.Ctx
if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
oriStats := ctx.GetSessionVars().Systems[variable.TiDBBuildStatsConcurrency]
oriScan := ctx.GetSessionVars().DistSQLScanConcurrency
Expand All @@ -193,7 +193,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
return nil, errors.Trace(err)
}

if err := e.Open(); err != nil {
if err := e.Open(goCtx); err != nil {
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -323,8 +323,8 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
if len(sql) > cfg.Log.QueryLogMaxLen {
sql = fmt.Sprintf("%.*q(len:%d)", cfg.Log.QueryLogMaxLen, sql, len(a.Text))
}
connID := a.ctx.GetSessionVars().ConnectionID
currentDB := a.ctx.GetSessionVars().CurrentDB
connID := a.Ctx.GetSessionVars().ConnectionID
currentDB := a.Ctx.GetSessionVars().CurrentDB
logEntry := log.NewEntry(logutil.SlowQueryLogger)
logEntry.Data = log.Fields{
"connectionId": connID,
Expand Down
9 changes: 5 additions & 4 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mvmap"
goctx "golang.org/x/net/context"
)

type aggCtxsMapper map[string][]*aggregation.AggEvaluateContext
Expand Down Expand Up @@ -52,12 +53,12 @@ func (e *HashAggExec) Close() error {
}

// Open implements the Executor Open interface.
func (e *HashAggExec) Open() error {
func (e *HashAggExec) Open(goCtx goctx.Context) error {
e.executed = false
e.groupMap = mvmap.NewMVMap()
e.groupIterator = e.groupMap.NewIterator()
e.aggCtxsMap = make(aggCtxsMapper, 0)
return errors.Trace(e.children[0].Open())
return errors.Trace(e.children[0].Open(goCtx))
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -172,11 +173,11 @@ type StreamAggExec struct {
}

// Open implements the Executor Open interface.
func (e *StreamAggExec) Open() error {
func (e *StreamAggExec) Open(goCtx goctx.Context) error {
e.executed = false
e.hasData = false
e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs))
return errors.Trace(e.children[0].Open())
return errors.Trace(e.children[0].Open(goCtx))
}

// Next implements the Executor Next interface.
Expand Down
13 changes: 8 additions & 5 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

var _ Executor = &AnalyzeExec{}
Expand All @@ -51,7 +52,7 @@ const (
)

// Open implements the Executor Open interface.
func (e *AnalyzeExec) Open() error {
func (e *AnalyzeExec) Open(goctx.Context) error {
return nil
}

Expand Down Expand Up @@ -193,11 +194,12 @@ func (e *AnalyzeIndexExec) open() error {
Build()
kvReq.Concurrency = e.concurrency
kvReq.IsolationLevel = kv.RC
e.result, err = distsql.Analyze(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq)
goCtx := goctx.TODO()
e.result, err = distsql.Analyze(goCtx, e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(e.ctx.GoCtx())
e.result.Fetch(goCtx)
return nil
}

Expand Down Expand Up @@ -286,11 +288,12 @@ func (e *AnalyzeColumnsExec) open() error {
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.Analyze(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq)
goCtx := goctx.TODO()
e.result, err = distsql.Analyze(goCtx, e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(e.ctx.GoCtx())
e.result.Fetch(goCtx)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForDatums(goCtx goctx.Context,
if err != nil {
return nil, errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq, e.schema.GetTypes(), builder.ctx.GetSessionVars().GetTimeZone())
e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq, e.schema.GetTypes(), builder.ctx.GetSessionVars().GetTimeZone())
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1203,6 +1203,6 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForDatums(goCtx goctx.Co
if err != nil {
return nil, errors.Trace(err)
}
err = e.open(kvRanges)
err = e.open(goCtx, kvRanges)
return e, errors.Trace(err)
}
21 changes: 11 additions & 10 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/plan"
goctx "golang.org/x/net/context"
)

// Compiler compiles an ast.StmtNode to a physical plan.
type Compiler struct {
Ctx context.Context
}

// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStmt, error) {
if ctx.GoCtx() != nil {
if span := opentracing.SpanFromContext(ctx.GoCtx()); span != nil {
span1 := opentracing.StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
func (c *Compiler) Compile(goCtx goctx.Context, stmtNode ast.StmtNode) (*ExecStmt, error) {
if span := opentracing.SpanFromContext(goCtx); span != nil {
span1 := opentracing.StartSpan("executor.Compile", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

infoSchema := GetInfoSchema(ctx)
if err := plan.Preprocess(ctx, stmtNode, infoSchema, false); err != nil {
infoSchema := GetInfoSchema(c.Ctx)
if err := plan.Preprocess(c.Ctx, stmtNode, infoSchema, false); err != nil {
return nil, errors.Trace(err)
}

finalPlan, err := plan.Optimize(ctx, stmtNode, infoSchema)
finalPlan, err := plan.Optimize(c.Ctx, stmtNode, infoSchema)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -54,10 +54,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
return &ExecStmt{
InfoSchema: infoSchema,
Plan: finalPlan,
Expensive: stmtCount(stmtNode, finalPlan, ctx.GetSessionVars().InRestrictedSQL),
Expensive: stmtCount(stmtNode, finalPlan, c.Ctx.GetSessionVars().InRestrictedSQL),
Cacheable: plan.Cacheable(stmtNode),
Text: stmtNode.Text(),
ReadOnly: ast.IsReadOnly(readOnlyCheckStmt),
Ctx: c.Ctx,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessionctx/varsutil"
"github.com/pingcap/tidb/types"
goctx "golang.org/x/net/context"
)

// DDLExec represents a DDL executor.
Expand Down Expand Up @@ -85,7 +86,7 @@ func (e *DDLExec) Close() error {
}

// Open implements the Executor Open interface.
func (e *DDLExec) Open() error {
func (e *DDLExec) Open(goCtx goctx.Context) error {
return nil
}

Expand Down
Loading

0 comments on commit 02f6bb2

Please sign in to comment.