Skip to content

Commit

Permalink
*: begin a transaction in PrepareTxnCtx, unify in transaction. (pingc…
Browse files Browse the repository at this point in the history
…ap#2290)

Transaction should has the same life cycle as TxnCtx, so create a
new transaction in PrepareTxnCtx if transaction is nil or invalid.
  • Loading branch information
coocood authored Dec 21, 2016
1 parent e49ca03 commit 0adabe5
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 86 deletions.
6 changes: 0 additions & 6 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,6 @@ type ResultSetNode interface {
// If the Exec method requires any Execution domain local data,
// they must be held out of the implementing instance.
type Statement interface {
// Explain gets the execution plans.
//Explain(ctx context.Context, w format.Formatter)

// IsDDL shows whether the statement is an DDL operation.
IsDDL() bool

// OriginText gets the origin SQL text.
OriginText() string

Expand Down
11 changes: 3 additions & 8 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ func (a *recordSet) Close() error {
// statement implements the ast.Statement interface, it builds a plan.Plan to an ast.Statement.
type statement struct {
// The InfoSchema cannot change during execution, so we hold a reference to it.
is infoschema.InfoSchema
plan plan.Plan
text string
isDDL bool
is infoschema.InfoSchema
plan plan.Plan
text string
}

func (a *statement) OriginText() string {
Expand All @@ -78,10 +77,6 @@ func (a *statement) SetText(text string) {
return
}

func (a *statement) IsDDL() bool {
return a.isDDL
}

// Exec implements the ast.Statement Exec interface.
// This function builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
Expand Down
26 changes: 24 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
Expand Down Expand Up @@ -123,10 +124,31 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
}

func (b *executorBuilder) buildShowDDL(v *plan.ShowDDL) Executor {
return &ShowDDLExec{
// We get DDLInfo here because for Executors that returns result set,
// next will be called after transaction has been committed.
// We need the transaction to get DDLInfo.
e := &ShowDDLExec{
ctx: b.ctx,
schema: v.GetSchema(),
}
txn, err := e.ctx.GetTxn(false)
if err != nil {
b.err = errors.Trace(err)
return nil
}
ddlInfo, err := inspectkv.GetDDLInfo(txn)
if err != nil {
b.err = errors.Trace(err)
return nil
}
bgInfo, err := inspectkv.GetBgDDLInfo(txn)
if err != nil {
b.err = errors.Trace(err)
return nil
}
e.ddlInfo = ddlInfo
e.bgInfo = bgInfo
return e
}

func (b *executorBuilder) buildCheckTable(v *plan.CheckTable) Executor {
Expand All @@ -146,7 +168,7 @@ func (b *executorBuilder) buildDeallocate(v *plan.Deallocate) Executor {

func (b *executorBuilder) buildSelectLock(v *plan.SelectLock) Executor {
src := b.build(v.GetChildByIndex(0))
if b.ctx.GetSessionVars().ShouldAutocommit() {
if !b.ctx.GetSessionVars().InTxn() {
// Locking of rows for update using SELECT FOR UPDATE only applies when autocommit
// is disabled (either by beginning transaction with START TRANSACTION or by setting
// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
Expand Down
8 changes: 3 additions & 5 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,10 @@ func (c *Compiler) Compile(ctx context.Context, node ast.StmtNode) (ast.Statemen
if err != nil {
return nil, errors.Trace(err)
}
_, isDDL := node.(ast.DDLNode)
sa := &statement{
is: is,
plan: p,
text: node.Text(),
isDDL: isDDL,
is: is,
plan: p,
text: node.Text(),
}
return sa, nil
}
Expand Down
42 changes: 15 additions & 27 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ type Executor interface {

// ShowDDLExec represents a show DDL executor.
type ShowDDLExec struct {
schema expression.Schema
ctx context.Context
done bool
schema expression.Schema
ctx context.Context
ddlInfo *inspectkv.DDLInfo
bgInfo *inspectkv.DDLInfo
done bool
}

// Schema implements the Executor Schema interface.
Expand All @@ -132,42 +134,28 @@ func (e *ShowDDLExec) Next() (*Row, error) {
return nil, nil
}

txn, err := e.ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}

ddlInfo, err := inspectkv.GetDDLInfo(txn)
if err != nil {
return nil, errors.Trace(err)
}
bgInfo, err := inspectkv.GetBgDDLInfo(txn)
if err != nil {
return nil, errors.Trace(err)
}

var ddlOwner, ddlJob string
if ddlInfo.Owner != nil {
ddlOwner = ddlInfo.Owner.String()
if e.ddlInfo.Owner != nil {
ddlOwner = e.ddlInfo.Owner.String()
}
if ddlInfo.Job != nil {
ddlJob = ddlInfo.Job.String()
if e.ddlInfo.Job != nil {
ddlJob = e.ddlInfo.Job.String()
}

var bgOwner, bgJob string
if bgInfo.Owner != nil {
bgOwner = bgInfo.Owner.String()
if e.bgInfo.Owner != nil {
bgOwner = e.bgInfo.Owner.String()
}
if bgInfo.Job != nil {
bgJob = bgInfo.Job.String()
if e.bgInfo.Job != nil {
bgJob = e.bgInfo.Job.String()
}

row := &Row{}
row.Data = types.MakeDatums(
ddlInfo.SchemaVer,
e.ddlInfo.SchemaVer,
ddlOwner,
ddlJob,
bgInfo.SchemaVer,
e.bgInfo.SchemaVer,
bgOwner,
bgJob,
)
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (e *DDLExec) Next() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
// DDL will force commit old transaction, after DDL, in transaction status should be false.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
e.done = true
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (s *testSuite) TestSetVar(c *C) {
vars := tk.Se.(context.Context).GetSessionVars()
tk.Se.CommitTxn()
tk.MustExec("set @@autocommit = 1")
c.Assert(vars.ShouldAutocommit(), IsTrue)
c.Assert(vars.IsAutocommit(), IsTrue)
tk.MustExec("set @@autocommit = 0")
c.Assert(vars.ShouldAutocommit(), IsFalse)
c.Assert(vars.IsAutocommit(), IsFalse)

tk.MustExec("set @@sql_mode = 'strict_trans_tables'")
c.Assert(vars.StrictSQLMode, IsTrue)
Expand Down
6 changes: 2 additions & 4 deletions executor/executor_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *SimpleExec) Next() (*Row, error) {
case *ast.BeginStmt:
err = e.executeBegin(x)
case *ast.CommitStmt:
err = e.executeCommit(x)
e.executeCommit(x)
case *ast.RollbackStmt:
err = e.executeRollback(x)
case *ast.CreateUserStmt:
Expand Down Expand Up @@ -123,10 +123,8 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeCommit(s *ast.CommitStmt) error {
err := e.ctx.CommitTxn()
func (e *SimpleExec) executeCommit(s *ast.CommitStmt) {
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
return errors.Trace(err)
}

func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error {
Expand Down
1 change: 0 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,6 @@ func (s *testSuite) TestAdapterStatement(c *C) {
stmt, err := compiler.Compile(ctx, stmtNode)
c.Check(err, IsNil)
c.Check(stmt.OriginText(), Equals, "select 1")
c.Check(stmt.IsDDL(), IsFalse)

stmtNode, err = s.ParseOneStmt("create table t (a int)", "", "")
c.Check(err, IsNil)
Expand Down
3 changes: 3 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type Transaction interface {
IsReadOnly() bool
// StartTS returns the transaction start timestamp.
StartTS() uint64
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
}

// Client is used to send request to KV layer.
Expand Down
11 changes: 9 additions & 2 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (

// mockTxn is a txn that returns a retryAble error when called Commit.
type mockTxn struct {
opts map[Option]interface{}
opts map[Option]interface{}
valid bool
}

// Always returns a retryable error.
Expand All @@ -28,6 +29,7 @@ func (t *mockTxn) Commit() error {
}

func (t *mockTxn) Rollback() error {
t.valid = false
return nil
}

Expand Down Expand Up @@ -79,13 +81,18 @@ func (t *mockTxn) Delete(k Key) error {
return nil
}

func (t *mockTxn) Valid() bool {
return t.valid
}

// mockStorage is used to start a must commit-failed txn.
type mockStorage struct {
}

func (s *mockStorage) Begin() (Transaction, error) {
tx := &mockTxn{
opts: make(map[Option]interface{}),
opts: make(map[Option]interface{}),
valid: true,
}
return tx, nil

Expand Down
20 changes: 10 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,6 @@ func (s *session) SetGlobalSysVar(name string, value string) error {
return errors.Trace(err)
}

// IsAutocommit checks if it is in the auto-commit mode.
func (s *session) isAutocommit(ctx context.Context) bool {
return s.sessionVars.GetStatusFlag(mysql.ServerStatusAutocommit)
}

func (s *session) ParseSQL(sql, charset, collation string) ([]ast.StmtNode, error) {
return s.parser.Parse(sql, charset, collation)
}
Expand Down Expand Up @@ -467,6 +462,7 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) {
st, err1 := Compile(s, rst)
if err1 != nil {
log.Warnf("[%d] compile error:\n%v\n%s", connID, err1, sql)
s.RollbackTxn()
return nil, errors.Trace(err1)
}
sessionExecuteCompileDuration.Observe(time.Since(startTS).Seconds())
Expand Down Expand Up @@ -499,7 +495,9 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
if err := s.checkSchemaValidOrRollback(); err != nil {
return 0, 0, nil, errors.Trace(err)
}
PrepareTxnCtx(s)
if err := PrepareTxnCtx(s); err != nil {
return 0, 0, nil, errors.Trace(err)
}
prepareExec := &executor.PrepareExec{
IS: executor.GetInfoSchema(s),
Ctx: s,
Expand Down Expand Up @@ -564,7 +562,10 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (ast.R
if err != nil {
return nil, errors.Trace(err)
}
PrepareTxnCtx(s)
err = PrepareTxnCtx(s)
if err != nil {
return nil, errors.Trace(err)
}
st := executor.CompileExecutePreparedStmt(s, stmtID, args...)
r, err := runStmt(s, st)
return r, errors.Trace(err)
Expand Down Expand Up @@ -608,7 +609,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) {
if err != nil {
return nil, errors.Trace(err)
}
ac := s.isAutocommit(s)
ac := s.sessionVars.IsAutocommit()
if !ac {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
Expand Down Expand Up @@ -813,8 +814,7 @@ const loadCommonGlobalVarsSQL = "select * from mysql.global_variables where vari
variable.DistSQLJoinConcurrencyVar + "', '" +
variable.DistSQLScanConcurrencyVar + "')"

// LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session
// right before creating a transaction for the first time.
// LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
vars := s.sessionVars
if vars.CommonGlobalLoaded {
Expand Down
15 changes: 8 additions & 7 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,14 @@ func (s *SessionVars) GetStatusFlag(flag uint16) bool {
return s.Status&flag > 0
}

// ShouldAutocommit checks if current session should autocommit.
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK.
func (s *SessionVars) ShouldAutocommit() bool {
isAutomcommit := s.GetStatusFlag(mysql.ServerStatusAutocommit)
inTransaction := s.GetStatusFlag(mysql.ServerStatusInTrans)
return isAutomcommit && !inTransaction
// InTxn returns if the session is in transaction.
func (s *SessionVars) InTxn() bool {
return s.GetStatusFlag(mysql.ServerStatusInTrans)
}

// IsAutocommit returns if the session is set to autocommit.
func (s *SessionVars) IsAutocommit() bool {
return s.GetStatusFlag(mysql.ServerStatusAutocommit)
}

// GetNextPreparedStmtID generates and returns the next session scope prepared statement id.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/varsutil/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func SetSystemVar(vars *variable.SessionVars, name string, value types.Datum) er
case variable.AutocommitVar:
isAutocommit := strings.EqualFold(sVal, "ON") || sVal == "1"
vars.SetStatusFlag(mysql.ServerStatusAutocommit, isAutocommit)
if isAutocommit {
vars.SetStatusFlag(mysql.ServerStatusInTrans, false)
}
case variable.TiDBSkipConstraintCheck:
vars.SkipConstraintCheck = (sVal == "1")
}
Expand Down
4 changes: 4 additions & 0 deletions store/localstore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,7 @@ func (txn *dbTxn) IsReadOnly() bool {
func (txn *dbTxn) StartTS() uint64 {
return txn.tid
}

func (txn *dbTxn) Valid() bool {
return txn.valid
}
4 changes: 4 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ func (txn *tikvTxn) IsReadOnly() bool {
func (txn *tikvTxn) StartTS() uint64 {
return txn.startTS
}

func (txn *tikvTxn) Valid() bool {
return txn.valid
}
2 changes: 1 addition & 1 deletion table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ts *testSuite) TestBasic(c *C) {
_, err = tb.AddRecord(ctx, types.MakeDatums(1, "abc"))
c.Assert(err, IsNil)
c.Assert(indexCnt(), Greater, 0)

c.Assert(ctx.CommitTxn(), IsNil)
_, err = ts.se.Execute("drop table test.t")
c.Assert(err, IsNil)
}
Expand Down
Loading

0 comments on commit 0adabe5

Please sign in to comment.