diff --git a/executor/builder.go b/executor/builder.go index 2fc08909e5d02..2659c4804d49e 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -133,7 +133,6 @@ func (b *executorBuilder) buildCheckTable(v *plan.CheckTable) Executor { return &CheckTableExec{ tables: v.Tables, ctx: b.ctx, - is: b.is, } } @@ -221,7 +220,7 @@ func (b *executorBuilder) buildSimple(v *plan.Simple) Executor { case *ast.GrantStmt: return b.buildGrant(s) } - return &SimpleExec{Statement: v.Statement, ctx: b.ctx, is: b.is} + return &SimpleExec{Statement: v.Statement, ctx: b.ctx} } func (b *executorBuilder) buildSet(v *plan.Set) Executor { @@ -306,7 +305,6 @@ func (b *executorBuilder) buildGrant(grant *ast.GrantStmt) Executor { ObjectType: grant.ObjectType, Level: grant.Level, Users: grant.Users, - is: b.is, } } diff --git a/executor/compiler.go b/executor/compiler.go index 44aef6ccfcddf..a68899fceb54a 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -20,6 +20,8 @@ import ( "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/binloginfo" ) // Compiler compiles an ast.StmtNode to a stmt.Statement. @@ -152,7 +154,15 @@ func getSelectStmtLabel(x *ast.SelectStmt) string { // then wrappped to an adapter *statement as stmt.Statement. func (c *Compiler) Compile(ctx context.Context, node ast.StmtNode) (ast.Statement, error) { stmtCount(node) - is := GetInfoSchema(ctx) + var is infoschema.InfoSchema + sessVar := ctx.GetSessionVars() + if snap := sessVar.SnapshotInfoschema; snap != nil { + is = snap.(infoschema.InfoSchema) + log.Infof("[%d] use snapshot schema %d", sessVar.ConnectionID, is.SchemaMetaVersion()) + } else { + is = sessionctx.GetDomain(ctx).InfoSchema() + binloginfo.SetSchemaVersion(ctx, is.SchemaMetaVersion()) + } if err := plan.Preprocess(node, is, ctx); err != nil { return nil, errors.Trace(err) } @@ -173,17 +183,3 @@ func (c *Compiler) Compile(ctx context.Context, node ast.StmtNode) (ast.Statemen } return sa, nil } - -// GetInfoSchema gets TxnCtx InfoSchema if snapshot schema is not set, -// Otherwise, snapshot schema is returned. -func GetInfoSchema(ctx context.Context) infoschema.InfoSchema { - sessVar := ctx.GetSessionVars() - var is infoschema.InfoSchema - if snap := sessVar.SnapshotInfoschema; snap != nil { - is = snap.(infoschema.InfoSchema) - log.Infof("[%d] use snapshot schema %d", sessVar.ConnectionID, is.SchemaMetaVersion()) - } else { - is = sessVar.TxnCtx.InfoSchema.(infoschema.InfoSchema) - } - return is -} diff --git a/executor/executor.go b/executor/executor.go index c28b9b9959fa5..1017188d63474 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -29,6 +29,8 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" @@ -188,7 +190,6 @@ type CheckTableExec struct { tables []*ast.TableName ctx context.Context done bool - is infoschema.InfoSchema } // Schema implements the Executor Schema interface. @@ -203,9 +204,10 @@ func (e *CheckTableExec) Next() (*Row, error) { } dbName := model.NewCIStr(e.ctx.GetSessionVars().CurrentDB) + is := sessionctx.GetDomain(e.ctx).InfoSchema() for _, t := range e.tables { - tb, err := e.is.TableByName(dbName, t.Name) + tb, err := is.TableByName(dbName, t.Name) if err != nil { return nil, errors.Trace(err) } @@ -258,7 +260,7 @@ func (e *SelectLockExec) Next() (*Row, error) { return nil, nil } if len(row.RowKeys) != 0 && e.Lock == ast.SelectLockForUpdate { - e.ctx.GetSessionVars().TxnCtx.ForUpdate = true + forupdate.SetForUpdate(e.ctx) txn, err := e.ctx.GetTxn(false) if err != nil { return nil, errors.Trace(err) diff --git a/executor/executor_simple.go b/executor/executor_simple.go index 9c68781137a17..9a3af72b5aa23 100644 --- a/executor/executor_simple.go +++ b/executor/executor_simple.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan/statistics" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util" @@ -44,7 +45,6 @@ type SimpleExec struct { Statement ast.StmtNode ctx context.Context done bool - is infoschema.InfoSchema } // Schema implements the Executor Schema interface. @@ -97,7 +97,7 @@ func (e *SimpleExec) Close() error { func (e *SimpleExec) executeUse(s *ast.UseStmt) error { dbname := model.NewCIStr(s.DBName) - dbinfo, exists := e.is.SchemaByName(dbname) + dbinfo, exists := sessionctx.GetDomain(e.ctx).InfoSchema().SchemaByName(dbname) if !exists { return infoschema.ErrDatabaseNotExists.GenByArgs(dbname) } diff --git a/executor/executor_test.go b/executor/executor_test.go index 1ffe47ab3da0c..e5fb14f67febb 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1400,7 +1400,6 @@ func (s *testSuite) TestAdapterStatement(c *C) { c.Check(err, IsNil) compiler := &executor.Compiler{} ctx := se.(context.Context) - tidb.PrepareTxnCtx(ctx) stmtNode, err := s.ParseOneStmt("select 1", "", "") c.Check(err, IsNil) diff --git a/executor/grant.go b/executor/grant.go index 4c077ba6b6b4a..b7bfaeb0140d6 100644 --- a/executor/grant.go +++ b/executor/grant.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/types" @@ -45,7 +45,6 @@ type GrantExec struct { Users []*ast.UserSpec ctx context.Context - is infoschema.InfoSchema done bool } @@ -495,7 +494,8 @@ func (e *GrantExec) getTargetSchema() (*model.DBInfo, error) { } //check if db exists schema := model.NewCIStr(dbName) - db, ok := e.is.SchemaByName(schema) + is := sessionctx.GetDomain(e.ctx).InfoSchema() + db, ok := is.SchemaByName(schema) if !ok { return nil, errors.Errorf("Unknown schema name: %s", dbName) } @@ -509,7 +509,8 @@ func (e *GrantExec) getTargetSchemaAndTable() (*model.DBInfo, table.Table, error return nil, nil, errors.Trace(err) } name := model.NewCIStr(e.Level.TableName) - tbl, err := e.is.TableByName(db.Name, name) + is := sessionctx.GetDomain(e.ctx).InfoSchema() + tbl, err := is.TableByName(db.Name, name) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/executor/prepared.go b/executor/prepared.go index 3d2caab04216b..cf06442f008c6 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/sqlexec" ) @@ -282,7 +283,7 @@ func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interfac execPlan.UsingVars[i] = &expression.Constant{Value: value.Datum, RetType: &value.Type} } sa := &statement{ - is: GetInfoSchema(ctx), + is: sessionctx.GetDomain(ctx).InfoSchema(), plan: execPlan, } return sa diff --git a/executor/union_scan.go b/executor/union_scan.go index 2c867ec725e35..c1c93b4d71060 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -72,12 +72,21 @@ type dirtyTable struct { truncated bool } +type dirtyDBKeyType int + +func (u dirtyDBKeyType) String() string { + return "dirtyDBKeyType" +} + +// DirtyDBKey is the key to *dirtyDB for a context. +const DirtyDBKey dirtyDBKeyType = 1 + func getDirtyDB(ctx context.Context) *dirtyDB { var udb *dirtyDB - x := ctx.GetSessionVars().TxnCtx.DirtyDB + x := ctx.Value(DirtyDBKey) if x == nil { udb = &dirtyDB{tables: make(map[int64]*dirtyTable)} - ctx.GetSessionVars().TxnCtx.DirtyDB = udb + ctx.SetValue(DirtyDBKey, udb) } else { udb = x.(*dirtyDB) } diff --git a/kv/kv.go b/kv/kv.go index ebbef1d10bf5f..9dc47ae8dc33e 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -27,6 +27,8 @@ const ( // PresumeKeyNotExistsError is the option key for error. // When PresumeKeyNotExists is set and condition is not match, should throw the error. PresumeKeyNotExistsError + // RetryAttempts is the number of txn retry attempt. + RetryAttempts // BinlogData is the binlog data to write. BinlogData ) diff --git a/session.go b/session.go index c41f8b86e7a56..db9933f49b767 100644 --- a/session.go +++ b/session.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessionctx/varsutil" "github.com/pingcap/tidb/store/localstore" @@ -117,8 +118,11 @@ type session struct { schemaVerInCurrTxn int64 values map[fmt.Stringer]interface{} store kv.Storage + history stmtHistory maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times. + debugInfos map[string]interface{} // Vars for debug and unit tests. + // For performance_schema only. stmtState *perfschema.StatementState parser *parser.Parser @@ -191,6 +195,12 @@ func (s *session) AffectedRows() uint64 { return s.sessionVars.StmtCtx.AffectedRows() } +func (s *session) resetHistory() { + s.ClearValue(executor.DirtyDBKey) + s.ClearValue(forupdate.ForUpdateKey) + s.history.reset() +} + func (s *session) SetClientCapability(capability uint32) { s.sessionVars.ClientCapability = capability } @@ -199,14 +209,22 @@ func (s *session) SetConnectionID(connectionID uint64) { s.sessionVars.ConnectionID = connectionID } -func (s *session) doCommit() error { +func (s *session) finishTxn(rollback bool) error { + // transaction has already been committed or rolled back if s.txn == nil { return nil } defer func() { s.txn = nil s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false) + binloginfo.ClearBinlog(s) }() + + if rollback { + s.resetHistory() + s.cleanRetryInfo() + return s.txn.Rollback() + } if binloginfo.PumpClient != nil { prewriteValue := binloginfo.GetPrewriteValue(s, false) if prewriteValue != nil { @@ -221,47 +239,45 @@ func (s *session) doCommit() error { s.txn.SetOption(kv.BinlogData, bin) } } + if err := s.checkSchemaValid(); err != nil { - err1 := s.txn.Rollback() - if err1 != nil { - log.Errorf("rollback txn failed, err:%v", err1) + if !s.sessionVars.RetryInfo.Retrying && s.isRetryableError(err) { + err = s.Retry() + } else { + err1 := s.txn.Rollback() + if err1 != nil { + // TODO: Handle this error. + log.Errorf("rollback txn failed, err:%v", errors.ErrorStack(err)) + } } + if err != nil { + log.Warnf("finished txn:%s, %v", s.txn, err) + } + s.resetHistory() + s.cleanRetryInfo() return errors.Trace(err) } if err := s.txn.Commit(); err != nil { - return errors.Trace(err) - } - return nil -} - -func (s *session) doCommitWithRetry() error { - err := s.doCommit() - if err != nil { - if s.isRetryableError(err) { + if !s.sessionVars.RetryInfo.Retrying && s.isRetryableError(err) { err = s.Retry() } + if err != nil { + log.Warnf("finished txn:%s, %v", s.txn, err) + return errors.Trace(err) + } } + + s.resetHistory() s.cleanRetryInfo() - if err != nil { - log.Warnf("finished txn:%s, %v", s.txn, err) - return errors.Trace(err) - } return nil } func (s *session) CommitTxn() error { - return s.doCommitWithRetry() + return s.finishTxn(false) } func (s *session) RollbackTxn() error { - if s.txn == nil { - return nil - } - s.cleanRetryInfo() - err := s.txn.Rollback() - s.txn = nil - s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false) - return errors.Trace(err) + return s.finishTxn(true) } func (s *session) GetClient() kv.Client { @@ -302,18 +318,33 @@ func (s *session) isRetryableError(err error) bool { } func (s *session) Retry() error { - if s.sessionVars.TxnCtx.ForUpdate { - return errors.Errorf("can not retry select for update statement") - } s.sessionVars.RetryInfo.Retrying = true + nh := s.history.clone() + // Debug infos. + if len(nh.history) == 0 { + s.debugInfos[retryEmptyHistoryList] = true + } else { + s.debugInfos[retryEmptyHistoryList] = false + } defer func() { + s.history.history = nh.history s.sessionVars.RetryInfo.Retrying = false }() - nh := getHistory(s) + + if forUpdate := s.Value(forupdate.ForUpdateKey); forUpdate != nil { + return errors.Errorf("can not retry select for update statement") + } var err error retryCnt := 0 for { - PrepareTxnCtx(s) + s.sessionVars.RetryInfo.Attempts = retryCnt + 1 + s.resetHistory() + log.Info("RollbackTxn for retry txn.") + err = s.RollbackTxn() + if err != nil { + // TODO: handle this error. + log.Errorf("rollback txn failed, err:%v", errors.ErrorStack(err)) + } success := true s.sessionVars.RetryInfo.ResetOffset() for _, sr := range nh.history { @@ -323,7 +354,7 @@ func (s *session) Retry() error { txt = txt[:sqlLogMaxLen] } log.Warnf("Retry %s (len:%d)", txt, len(st.OriginText())) - _, err = st.Exec(s) + _, err = runStmt(s, st) if err != nil { if s.isRetryableError(err) { success = false @@ -334,7 +365,7 @@ func (s *session) Retry() error { } } if success { - err = s.doCommit() + err = s.CommitTxn() if !s.isRetryableError(err) { break } @@ -499,9 +530,8 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields if err := s.checkSchemaValidOrRollback(); err != nil { return 0, 0, nil, errors.Trace(err) } - PrepareTxnCtx(s) prepareExec := &executor.PrepareExec{ - IS: executor.GetInfoSchema(s), + IS: sessionctx.GetDomain(s).InfoSchema(), Ctx: s, SQLText: sql, } @@ -564,7 +594,6 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (ast.R if err != nil { return nil, errors.Trace(err) } - PrepareTxnCtx(s) st := executor.CompileExecutePreparedStmt(s, stmtID, args...) r, err := runStmt(s, st) return r, errors.Trace(err) @@ -597,6 +626,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { if err != nil { return nil, errors.Trace(err) } + s.resetHistory() } else if forceNew { err = s.CommitTxn() if err != nil { @@ -613,6 +643,11 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } log.Infof("[%d] %s new txn:%s", s.sessionVars.ConnectionID, force, s.txn) + + retryInfo := s.sessionVars.RetryInfo + if retryInfo.Retrying { + s.txn.SetOption(kv.RetryAttempts, retryInfo.Attempts) + } return s.txn, nil } @@ -743,6 +778,7 @@ func createSession(store kv.Storage) (*session, error) { s := &session{ values: make(map[fmt.Stringer]interface{}), store: store, + debugInfos: make(map[string]interface{}), maxRetryCnt: 10, parser: parser.New(), sessionVars: variable.NewSessionVars(), diff --git a/session_test.go b/session_test.go index 7e8231fab6947..52697c80122d8 100644 --- a/session_test.go +++ b/session_test.go @@ -26,6 +26,8 @@ import ( "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessionctx/varsutil" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" @@ -2379,6 +2381,41 @@ func (s *testSessionSuite) TestSqlLogicTestCase(c *C) { mustExecMatch(c, se, sql, [][]interface{}{{"26"}}) } +func newSessionWithoutInit(c *C, store kv.Storage) *session { + s := &session{ + values: make(map[fmt.Stringer]interface{}), + store: store, + debugInfos: make(map[string]interface{}), + maxRetryCnt: 10, + sessionVars: variable.NewSessionVars(), + } + return s +} + +func (s *testSessionSuite) TestRetryAttempts(c *C) { + defer testleak.AfterTest(c)() + store := kv.NewMockStorage() + se := newSessionWithoutInit(c, store) + c.Assert(se, NotNil) + sv := se.sessionVars + // Prevent getting variable value from storage. + varsutil.SetSystemVar(se.sessionVars, "autocommit", types.NewDatum("ON")) + sv.CommonGlobalLoaded = true + + // Add retry info. + retryInfo := sv.RetryInfo + retryInfo.Retrying = true + retryInfo.Attempts = 10 + tx, err := se.GetTxn(true) + c.Assert(tx, NotNil) + c.Assert(err, IsNil) + mtx, ok := tx.(kv.MockTxn) + c.Assert(ok, IsTrue) + // Make sure RetryAttempts option is set. + cnt := mtx.GetOption(kv.RetryAttempts) + c.Assert(cnt.(int), Equals, retryInfo.Attempts) +} + func (s *testSessionSuite) TestXAggregateWithIndexScan(c *C) { initSQL := ` drop table IF EXISTS t; diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index b92a3f36729fe..f2b003e71b7f0 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -15,6 +15,7 @@ package binloginfo import ( "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/kv" "github.com/pingcap/tipb/go-binlog" @@ -30,14 +31,43 @@ func init() { // shared by all sessions. var PumpClient binlog.PumpClient +// keyType is a dummy type to avoid naming collision in context. +type keyType int + +// String defines a Stringer function for debugging and pretty printing. +func (k keyType) String() string { + if k == schemaVersionKey { + return "schema_version" + } + return "binlog" +} + +const ( + schemaVersionKey keyType = 0 + binlogKey keyType = 1 +) + +// SetSchemaVersion sets schema version to the context. +func SetSchemaVersion(ctx context.Context, version int64) { + ctx.SetValue(schemaVersionKey, version) +} + +// GetSchemaVersion gets schema version in the context. +func GetSchemaVersion(ctx context.Context) int64 { + v, ok := ctx.Value(schemaVersionKey).(int64) + if !ok { + log.Error("get schema version failed") + } + return v +} + // GetPrewriteValue gets binlog prewrite value in the context. func GetPrewriteValue(ctx context.Context, createIfNotExists bool) *binlog.PrewriteValue { - vars := ctx.GetSessionVars() - v, ok := vars.TxnCtx.Binlog.(*binlog.PrewriteValue) + v, ok := ctx.Value(binlogKey).(*binlog.PrewriteValue) if !ok && createIfNotExists { - schemaVer := ctx.GetSessionVars().TxnCtx.SchemaVersion + schemaVer := GetSchemaVersion(ctx) v = &binlog.PrewriteValue{SchemaVersion: schemaVer} - vars.TxnCtx.Binlog = v + ctx.SetValue(binlogKey, v) } return v } @@ -62,3 +92,8 @@ func SetDDLBinlog(txn kv.Transaction, jobID int64, ddlQuery string) { } txn.SetOption(kv.BinlogData, bin) } + +// ClearBinlog clears binlog in the Context. +func ClearBinlog(ctx context.Context) { + ctx.ClearValue(binlogKey) +} diff --git a/sessionctx/forupdate/for_update_ctx.go b/sessionctx/forupdate/for_update_ctx.go new file mode 100644 index 0000000000000..f413bd1c50f56 --- /dev/null +++ b/sessionctx/forupdate/for_update_ctx.go @@ -0,0 +1,33 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package forupdate record information for "select ... for update" statement +package forupdate + +import "github.com/pingcap/tidb/context" + +// A dummy type to avoid naming collision in context. +type forupdateKeyType int + +// String defines a Stringer function for debugging and pretty printing. +func (k forupdateKeyType) String() string { + return "for update" +} + +// ForUpdateKey is used to retrive "select for update" statement information +const ForUpdateKey forupdateKeyType = 0 + +// SetForUpdate set "select for update" flag. +func SetForUpdate(ctx context.Context) { + ctx.SetValue(ForUpdateKey, true) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 4dd768269c38e..71acd1eb23451 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -38,6 +38,8 @@ type RetryInfo struct { Retrying bool currRetryOff int autoIncrementIDs []int64 + // Attempts is the current number of retry attempts. + Attempts int } // Clean does some clean work. @@ -46,6 +48,7 @@ func (r *RetryInfo) Clean() { if len(r.autoIncrementIDs) > 0 { r.autoIncrementIDs = r.autoIncrementIDs[:0] } + r.Attempts = 0 } // AddAutoIncrementID adds id to AutoIncrementIDs. @@ -69,16 +72,6 @@ func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) { return id, nil } -// TransactionContext is used to store variables that has transaction scope. -type TransactionContext struct { - ForUpdate bool - DirtyDB interface{} - Binlog interface{} - InfoSchema interface{} - Histroy interface{} - SchemaVersion int64 -} - // SessionVars is to handle user-defined or global variables in current session. type SessionVars struct { // user-defined variables @@ -93,8 +86,6 @@ type SessionVars struct { // retry information RetryInfo *RetryInfo - // Should be reset on transaction finished. - TxnCtx *TransactionContext // following variables are special for current session Status uint16 @@ -149,7 +140,6 @@ func NewSessionVars() *SessionVars { Systems: make(map[string]string), PreparedStmts: make(map[uint32]interface{}), PreparedStmtNameToID: make(map[string]uint32), - TxnCtx: &TransactionContext{}, RetryInfo: &RetryInfo{}, StrictSQLMode: true, Status: mysql.ServerStatusAutocommit, diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index d3a3ca7f5c676..3048e033b626b 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -203,7 +203,6 @@ func (ts *testSuite) TestUniqueIndexMultipleNullEntries(c *C) { c.Assert(err, IsNil) _, err = tb.AddRecord(ctx, types.MakeDatums(2, nil)) c.Assert(err, IsNil) - ctx.RollbackTxn() _, err = ts.se.Execute("drop table test.t") c.Assert(err, IsNil) } diff --git a/tidb.go b/tidb.go index af5c144623f3d..c4539fcd425d9 100644 --- a/tidb.go +++ b/tidb.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/engine" @@ -143,7 +142,6 @@ func resetStmtCtx(ctx context.Context, s ast.StmtNode) { // Compile is safe for concurrent use by multiple goroutines. func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) { - PrepareTxnCtx(ctx) compiler := executor.Compiler{} st, err := compiler.Compile(ctx, rawStmt) if err != nil { @@ -152,18 +150,6 @@ func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) { return st, nil } -// PrepareTxnCtx resets transaction context if session is not in a transaction. -func PrepareTxnCtx(ctx context.Context) { - se := ctx.(*session) - if se.txn == nil { - is := sessionctx.GetDomain(ctx).InfoSchema() - se.sessionVars.TxnCtx = &variable.TransactionContext{ - InfoSchema: is, - SchemaVersion: is.SchemaMetaVersion(), - } - } -} - // runStmt executes the ast.Statement and commit or rollback the current transaction. func runStmt(ctx context.Context, s ast.Statement) (ast.RecordSet, error) { var err error @@ -177,7 +163,7 @@ func runStmt(ctx context.Context, s ast.Statement) (ast.RecordSet, error) { rs, err = s.Exec(ctx) // All the history should be added here. se := ctx.(*session) - getHistory(ctx).add(0, s) + se.history.add(0, s) // MySQL DDL should be auto-commit. if s.IsDDL() || se.sessionVars.ShouldAutocommit() { if err != nil { @@ -190,16 +176,6 @@ func runStmt(ctx context.Context, s ast.Statement) (ast.RecordSet, error) { return rs, errors.Trace(err) } -func getHistory(ctx context.Context) *stmtHistory { - hist, ok := ctx.GetSessionVars().TxnCtx.Histroy.(*stmtHistory) - if ok { - return hist - } - hist = new(stmtHistory) - ctx.GetSessionVars().TxnCtx.Histroy = hist - return hist -} - // GetRows gets all the rows from a RecordSet. func GetRows(rs ast.RecordSet) ([][]types.Datum, error) { if rs == nil {