Skip to content

Commit

Permalink
*: concurrently begin a transaction and compile (pingcap#2393)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jan 5, 2017
1 parent a3b5a48 commit 36e9570
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 55 deletions.
4 changes: 4 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type Context interface {
ClearValue(key fmt.Stringer)

GetSessionVars() *variable.SessionVars

// ActivePendingTxn receives the pending transaction from the transaction channel.
// It should be called right before we builds an executor.
ActivePendingTxn() error
}

type basicCtxType int
Expand Down
9 changes: 9 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ func (a *statement) SetText(text string) {
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
if _, ok := a.plan.(*plan.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
err := ctx.ActivePendingTxn()
if err != nil {
return nil, errors.Trace(err)
}
}

b := newExecutorBuilder(ctx, a.is)
e := b.build(a.plan)
if b.err != nil {
Expand Down
4 changes: 4 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ func init() {
// but the plan package cannot import the executor package because of the dependency cycle.
// So we assign a function implemented in the executor package to the plan package to avoid the dependency cycle.
plan.EvalSubquery = func(p plan.PhysicalPlan, is infoschema.InfoSchema, ctx context.Context) (d []types.Datum, err error) {
err = ctx.ActivePendingTxn()
if err != nil {
return d, errors.Trace(err)
}
e := &executorBuilder{is: is, ctx: ctx}
exec := e.build(p)
row, err := exec.Next()
Expand Down
3 changes: 1 addition & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,10 +1195,9 @@ func (s *testSuite) TestAdapterStatement(c *C) {
defer testleak.AfterTest(c)()
se, err := tidb.CreateSession(s.store)
c.Check(err, IsNil)
se.GetSessionVars().TxnCtx.InfoSchema = sessionctx.GetDomain(se).InfoSchema()
compiler := &executor.Compiler{}
ctx := se.(context.Context)
c.Check(tidb.PrepareTxnCtx(ctx), IsNil)

stmtNode, err := s.ParseOneStmt("select 1", "", "")
c.Check(err, IsNil)
stmt, err := compiler.Compile(ctx, stmtNode)
Expand Down
4 changes: 4 additions & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (e *ExecuteExec) Build() error {
if err != nil {
return errors.Trace(err)
}
err = e.Ctx.ActivePendingTxn()
if err != nil {
return errors.Trace(err)
}
b := newExecutorBuilder(e.Ctx, e.IS)
stmtExec := b.build(p)
if b.err != nil {
Expand Down
96 changes: 74 additions & 22 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const unlimitedRetryCnt = -1

type session struct {
txn kv.Transaction // current transaction
txnCh chan *txnWithErr
values map[fmt.Stringer]interface{}
store kv.Storage
maxRetryCnt int // Max retry times. If maxRetryCnt <=0, there is no limitation for retry times.
Expand Down Expand Up @@ -245,6 +246,7 @@ func (s *session) RollbackTxn() error {
}
s.cleanRetryInfo()
s.txn = nil
s.txnCh = nil
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
return errors.Trace(err)
}
Expand Down Expand Up @@ -298,20 +300,18 @@ func (s *session) Retry() error {
var err error
retryCnt := 0
for {
err = PrepareTxnCtx(s)
if err == nil {
s.sessionVars.RetryInfo.ResetOffset()
for _, sr := range nh.history {
st := sr.st
txt := st.OriginText()
if len(txt) > sqlLogMaxLen {
txt = txt[:sqlLogMaxLen]
}
log.Warnf("Retry %s (len:%d)", txt, len(st.OriginText()))
_, err = st.Exec(s)
if err != nil {
break
}
s.prepareTxnCtx()
s.sessionVars.RetryInfo.ResetOffset()
for _, sr := range nh.history {
st := sr.st
txt := st.OriginText()
if len(txt) > sqlLogMaxLen {
txt = txt[:sqlLogMaxLen]
}
log.Warnf("Retry %s (len:%d)", txt, len(st.OriginText()))
_, err = st.Exec(s)
if err != nil {
break
}
}
if err == nil {
Expand All @@ -338,6 +338,7 @@ func (s *session) Retry() error {
// Unlike normal Exec, it doesn't reset statement status, doesn't commit or rollback the current transaction
// and doesn't write binlog.
func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) (ast.RecordSet, error) {
s.prepareTxnCtx()
charset, collation := s.sessionVars.GetCharsetInfo()
rawStmts, err := s.ParseSQL(sql, charset, collation)
if err != nil {
Expand Down Expand Up @@ -366,7 +367,7 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) (ast.Record
// getExecRet executes restricted sql and the result is one column.
// It returns a string value.
func (s *session) getExecRet(ctx context.Context, sql string) (string, error) {
cleanTxn := s.txn == nil
cleanTxn := s.txn == nil && s.txnCh == nil
rs, err := s.ExecRestrictedSQL(ctx, sql)
if err != nil {
return "", errors.Trace(err)
Expand All @@ -387,6 +388,7 @@ func (s *session) getExecRet(ctx context.Context, sql string) (string, error) {
// This function has some side effect. Run select may create new txn.
// We should make environment unchanged.
s.txn = nil
s.txnCh = nil
}
return value, nil
}
Expand Down Expand Up @@ -422,6 +424,7 @@ func (s *session) ParseSQL(sql, charset, collation string) ([]ast.StmtNode, erro
}

func (s *session) Execute(sql string) ([]ast.RecordSet, error) {
s.prepareTxnCtx()
startTS := time.Now()
charset, collation := s.sessionVars.GetCharsetInfo()
connID := s.sessionVars.ConnectionID
Expand All @@ -435,6 +438,7 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) {
var rs []ast.RecordSet
ph := sessionctx.GetDomain(s).PerfSchema()
for i, rst := range rawStmts {
s.prepareTxnCtx()
startTS := time.Now()
// Some execution is done in compile stage, so we reset it before compile.
resetStmtCtx(s, rawStmts[0])
Expand Down Expand Up @@ -471,8 +475,9 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) {

// For execute prepare statement in binary protocol
func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) {
if err := PrepareTxnCtx(s); err != nil {
return 0, 0, nil, errors.Trace(err)
if s.sessionVars.TxnCtx.InfoSchema == nil {
// We don't need to create a transaction for prepare statement, just get information schema will do.
s.sessionVars.TxnCtx.InfoSchema = sessionctx.GetDomain(s).InfoSchema()
}
prepareExec := &executor.PrepareExec{
IS: executor.GetInfoSchema(s),
Expand Down Expand Up @@ -535,11 +540,7 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (ast.R
if err != nil {
return nil, errors.Trace(err)
}
err = PrepareTxnCtx(s)
if err != nil {
s.RollbackTxn()
return nil, errors.Trace(err)
}
s.prepareTxnCtx()
st := executor.CompileExecutePreparedStmt(s, stmtID, args...)
r, err := runStmt(s, st)
return r, errors.Trace(err)
Expand Down Expand Up @@ -840,3 +841,54 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error {
vars.CommonGlobalLoaded = true
return nil
}

type txnWithErr struct {
txn kv.Transaction
err error
}

// prepareTxnCtx starts a goroutine to begin a transaction if needed, and creates a new transaction context.
// It is called before we execute a sql query.
func (s *session) prepareTxnCtx() {
if s.txn != nil && s.txn.Valid() {
return
}
if s.txnCh != nil {
return
}
txnCh := make(chan *txnWithErr, 1)
go func() {
txn, err := s.store.Begin()
txnCh <- &txnWithErr{txn: txn, err: err}
}()
s.txnCh = txnCh
is := sessionctx.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
SchemaVersion: is.SchemaMetaVersion(),
}
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}

// ActivePendingTxn implements Session.ActivePendingTxn interface.
func (s *session) ActivePendingTxn() error {
if s.txn != nil && s.txn.Valid() {
return nil
}
if s.txnCh == nil {
return errors.New("transaction channel is not set")
}
txnWithErr := <-s.txnCh
s.txnCh = nil
if txnWithErr.err != nil {
return errors.Trace(txnWithErr.err)
}
s.txn = txnWithErr.txn
err := s.loadCommonGlobalVariablesIfNeeded()
if err != nil {
return errors.Trace(err)
}
return nil
}
31 changes: 0 additions & 31 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/engine"
Expand Down Expand Up @@ -144,10 +142,6 @@ func resetStmtCtx(ctx context.Context, s ast.StmtNode) {

// Compile is safe for concurrent use by multiple goroutines.
func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) {
err := PrepareTxnCtx(ctx)
if err != nil {
return nil, errors.Trace(err)
}
compiler := executor.Compiler{}
st, err := compiler.Compile(ctx, rawStmt)
if err != nil {
Expand All @@ -156,31 +150,6 @@ func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) {
return st, nil
}

// PrepareTxnCtx resets transaction context if session is not in a transaction.
func PrepareTxnCtx(ctx context.Context) error {
se := ctx.(*session)
if se.txn == nil || !se.txn.Valid() {
is := sessionctx.GetDomain(ctx).InfoSchema()
se.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
SchemaVersion: is.SchemaMetaVersion(),
}
var err error
se.txn, err = se.store.Begin()
if err != nil {
return errors.Trace(err)
}
err = se.loadCommonGlobalVariablesIfNeeded()
if err != nil {
return errors.Trace(err)
}
if !se.sessionVars.IsAutocommit() {
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}
return nil
}

// runStmt executes the ast.Statement and commit or rollback the current transaction.
func runStmt(ctx context.Context, s ast.Statement) (ast.RecordSet, error) {
var err error
Expand Down
15 changes: 15 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ func (c *Context) NewTxn() error {
return nil
}

// ActivePendingTxn implements the context.Context interface.
func (c *Context) ActivePendingTxn() error {
if c.txn != nil {
return nil
}
if c.Store != nil {
txn, err := c.Store.Begin()
if err != nil {
return errors.Trace(err)
}
c.txn = txn
}
return nil
}

// NewContext creates a new mocked context.Context.
func NewContext() *Context {
return &Context{
Expand Down

0 comments on commit 36e9570

Please sign in to comment.