Skip to content

Commit

Permalink
*: don't record binlog when statement rollback (pingcap#5699)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jan 25, 2018
1 parent f763e87 commit 24333b3
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 53 deletions.
8 changes: 8 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
binlog "github.com/pingcap/tipb/go-binlog"
goctx "golang.org/x/net/context"
)

Expand Down Expand Up @@ -70,6 +71,13 @@ type Context interface {

// StoreQueryFeedback stores the query feedback.
StoreQueryFeedback(feedback interface{})

// StmtCommit flush all changes by the statement to the underlying transaction.
StmtCommit() error
// StmtRollback provides statement level rollback.
StmtRollback()
// StmtGetMutation gets the binlog mutation for current statement.
StmtGetMutation(int64) *binlog.TableMutation
}

type basicCtxType int
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
c.Assert(data, DeepEquals, tt.restData,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
}
terror.Log(ctx.Txn().StmtCommit())
terror.Log(ctx.StmtCommit())
err1 = ctx.Txn().Commit(goctx.Background())
c.Assert(err1, IsNil)
r := tk.MustQuery(selectSQL)
Expand Down
6 changes: 3 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (e *DeleteExec) deleteSingleTable(goCtx goctx.Context) error {
batchSize := e.ctx.GetSessionVars().DMLBatchSize
for {
if batchDelete && rowCount >= batchSize {
terror.Log(e.ctx.Txn().StmtCommit())
terror.Log(e.ctx.StmtCommit())
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.Gen("BatchDelete failed with error: %v", err)
Expand Down Expand Up @@ -348,7 +348,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(goCtx goctx.Context) error {

for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
if batchDelete && rowCount >= batchDMLSize {
terror.Log(e.ctx.Txn().StmtCommit())
terror.Log(e.ctx.StmtCommit())
if err = e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.Gen("BatchDelete failed with error: %v", err)
Expand Down Expand Up @@ -901,7 +901,7 @@ func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error
continue
}
if batchInsert && rowCount >= batchSize {
terror.Log(e.ctx.Txn().StmtCommit())
terror.Log(e.ctx.StmtCommit())
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
return nil, ErrBatchInsertFail.Gen("BatchInsert failed with error: %v", err)
Expand Down
10 changes: 6 additions & 4 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,11 +884,13 @@ func (s *testEvaluatorSuite) TestSysDate(c *C) {
defer testleak.AfterTest(c)()
fc := funcs[ast.Sysdate]

ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx.TimeZone = time.Local
timezones := []types.Datum{types.NewDatum(1234), types.NewDatum(0)}
for _, timezone := range timezones {
// sysdate() result is not affected by "timestamp" session variable.
variable.SetSessionSystemVar(s.ctx.GetSessionVars(), "timestamp", timezone)
f, err := fc.getFunction(s.ctx, s.datumsToConstants(nil))
variable.SetSessionSystemVar(ctx.GetSessionVars(), "timestamp", timezone)
f, err := fc.getFunction(ctx, s.datumsToConstants(nil))
c.Assert(err, IsNil)
v, err := evalBuiltinFunc(f, nil)
last := time.Now()
Expand All @@ -898,14 +900,14 @@ func (s *testEvaluatorSuite) TestSysDate(c *C) {
}

last := time.Now()
f, err := fc.getFunction(s.ctx, s.datumsToConstants(types.MakeDatums(6)))
f, err := fc.getFunction(ctx, s.datumsToConstants(types.MakeDatums(6)))
c.Assert(err, IsNil)
v, err := evalBuiltinFunc(f, nil)
c.Assert(err, IsNil)
n := v.GetMysqlTime()
c.Assert(n.String(), GreaterEqual, last.Format(types.TimeFormat))

f, err = fc.getFunction(s.ctx, s.datumsToConstants(types.MakeDatums(-2)))
f, err = fc.getFunction(ctx, s.datumsToConstants(types.MakeDatums(-2)))
c.Assert(err, IsNil)
_, err = evalBuiltinFunc(f, nil)
c.Assert(err, NotNil)
Expand Down
5 changes: 0 additions & 5 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ type Transaction interface {
GetMemBuffer() MemBuffer
// GetSnapshot returns the snapshot of this transaction.
GetSnapshot() Snapshot

// StmtCommit flush all changes by the statement to the underlying transaction.
StmtCommit() error
// StmtRollback provides statement level rollback.
StmtRollback()
}

// Client is used to send request to KV layer.
Expand Down
8 changes: 0 additions & 8 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ func (t *mockTxn) Rollback() error {
return nil
}

func (t *mockTxn) StmtCommit() error {
return nil
}

func (t *mockTxn) StmtRollback() {
return
}

func (t *mockTxn) String() string {
return ""
}
Expand Down
2 changes: 1 addition & 1 deletion plan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnionScan(c *C) {
c.Assert(err, IsNil)
// Make txn not read only.
se.Txn().Set(kv.Key("AAA"), []byte("BBB"))
se.Txn().StmtCommit()
se.StmtCommit()
p, err := plan.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(plan.ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ func (cc *clientConn) handleLoadData(goCtx goctx.Context, loadDataInfo *executor
}

txn := loadDataInfo.Ctx.Txn()
terror.Log(txn.StmtCommit())
terror.Log(loadDataInfo.Ctx.StmtCommit())
if err != nil {
if txn != nil && txn.Valid() {
if err1 := txn.Rollback(); err1 != nil {
Expand Down
70 changes: 59 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,10 @@ func (s *session) retry(goCtx goctx.Context, maxCnt int) error {
s.sessionVars.StmtCtx.ResetForRetry()
_, err = st.Exec(goCtx)
if err != nil {
s.Txn().StmtRollback()
s.StmtRollback()
break
}
terror.Log(s.Txn().StmtCommit())
terror.Log(s.StmtCommit())
}
if hook := ctx.Value("preCommitHook"); hook != nil {
// For testing purpose.
Expand Down Expand Up @@ -916,8 +916,8 @@ func (s *session) DropPreparedStmt(stmtID uint32) error {
type StmtTxn struct {
kv.Transaction

buf kv.MemBuffer
// TODO: Add binlog related data here.
buf kv.MemBuffer
mutations map[int64]*binlog.TableMutation
// TODO: DirtyDB need to be statement level too.
}

Expand Down Expand Up @@ -972,24 +972,70 @@ func (st *StmtTxn) SeekReverse(k kv.Key) (kv.Iterator, error) {
return kv.NewUnionIter(bufferIt, retrieverIt, true)
}

// StmtCommit implements the Transaction interface.
func (st *StmtTxn) StmtCommit() error {
err := kv.WalkMemBuffer(st.buf, func(k kv.Key, v []byte) error {
func (st *StmtTxn) cleanup() {
st.buf.Reset()
for key := range st.mutations {
delete(st.mutations, key)
}
}

// StmtCommit implements the context.Context interface.
func (s *session) StmtCommit() error {
defer s.StmtTxn.cleanup()
st := &s.StmtTxn
err := kv.WalkMemBuffer(s.buf, func(k kv.Key, v []byte) error {
if len(v) == 0 {
return errors.Trace(st.Transaction.Delete(k))
}
return errors.Trace(st.Transaction.Set(k, v))
})
st.buf.Reset()

// Need to flush binlog.
for tableID, delta := range st.mutations {
mutation := getBinlogMutation(s, tableID)
if err == nil {
mergeToMutation(mutation, delta)
}
}
return errors.Trace(err)
}

// StmtRollback implements the Transaction interface.
func (st *StmtTxn) StmtRollback() {
st.buf.Reset()
// StmtRollback implements the context.Context interface.
func (s *session) StmtRollback() {
s.StmtTxn.cleanup()
return
}

// StmtGetMutation implements the context.Context interface.
func (s *session) StmtGetMutation(tableID int64) *binlog.TableMutation {
st := &s.StmtTxn
if _, ok := st.mutations[tableID]; !ok {
st.mutations[tableID] = &binlog.TableMutation{TableId: tableID}
}
return st.mutations[tableID]
}

func getBinlogMutation(ctx context.Context, tableID int64) *binlog.TableMutation {
bin := binloginfo.GetPrewriteValue(ctx, true)
for i := range bin.Mutations {
if bin.Mutations[i].TableId == tableID {
return &bin.Mutations[i]
}
}
idx := len(bin.Mutations)
bin.Mutations = append(bin.Mutations, binlog.TableMutation{TableId: tableID})
return &bin.Mutations[idx]
}

func mergeToMutation(m1, m2 *binlog.TableMutation) {
m1.InsertedRows = append(m1.InsertedRows, m2.InsertedRows...)
m1.UpdatedRows = append(m1.UpdatedRows, m2.UpdatedRows...)
m1.DeletedIds = append(m1.DeletedIds, m2.DeletedIds...)
m1.DeletedPks = append(m1.DeletedPks, m2.DeletedPks...)
m1.DeletedRows = append(m1.DeletedRows, m2.DeletedRows...)
m1.Sequence = append(m1.Sequence, m2.Sequence...)
}

func (s *session) Txn() kv.Transaction {
if s.Transaction == nil {
return nil
Expand Down Expand Up @@ -1225,6 +1271,7 @@ func createSession(store kv.Storage) (*session, error) {
s.sessionVars.GlobalVarsAccessor = s
s.sessionVars.BinlogClient = binloginfo.GetPumpClient()
s.StmtTxn.buf = kv.NewMemDbBuffer(kv.DefaultTxnMembufCap)
s.StmtTxn.mutations = make(map[int64]*binlog.TableMutation)
return s, nil
}

Expand All @@ -1246,6 +1293,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
// session implements variable.GlobalVarAccessor. Bind it to ctx.
s.sessionVars.GlobalVarsAccessor = s
s.StmtTxn.buf = kv.NewMemDbBuffer(kv.DefaultTxnMembufCap)
s.StmtTxn.mutations = make(map[int64]*binlog.TableMutation)
return s, nil
}

Expand Down
13 changes: 13 additions & 0 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
binlog.MutationType_Update,
})

// Test statement rollback.
tk.MustExec("create table local_binlog5 (c1 int primary key")
tk.MustExec("begin")
tk.MustExec("insert into local_binlog5 value (1)")
// This statement execute fail and should not write binlog.
_, err := tk.Exec("insert into local_binlog5 value (4),(3),(1),(2)")
c.Assert(err, NotNil)
tk.MustExec("commit")
prewriteVal = getLatestBinlogPrewriteValue(c, pump)
c.Assert(prewriteVal.Mutations[0].Sequence, DeepEquals, []binlog.MutationType{
binlog.MutationType_Insert,
})

checkBinlogCount(c, pump)

pump.mu.Lock()
Expand Down
7 changes: 0 additions & 7 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
return ret, nil
}

func (txn *tikvTxn) StmtCommit() error {
return nil
}

func (txn *tikvTxn) StmtRollback() {
}

func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
txn.setCnt++

Expand Down
11 changes: 1 addition & 10 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -812,15 +811,7 @@ func shouldWriteBinlog(ctx context.Context) bool {
}

func (t *Table) getMutation(ctx context.Context) *binlog.TableMutation {
bin := binloginfo.GetPrewriteValue(ctx, true)
for i := range bin.Mutations {
if bin.Mutations[i].TableId == t.ID {
return &bin.Mutations[i]
}
}
idx := len(bin.Mutations)
bin.Mutations = append(bin.Mutations, binlog.TableMutation{TableId: t.ID})
return &bin.Mutations[idx]
return ctx.StmtGetMutation(t.ID)
}

// canSkip is for these cases, we can skip the columns in encoded row:
Expand Down
4 changes: 2 additions & 2 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ func runStmt(goCtx goctx.Context, ctx context.Context, s ast.Statement) (ast.Rec
GetHistory(ctx).Add(0, s, se.sessionVars.StmtCtx)
if txn := ctx.Txn(); txn != nil {
if err != nil {
txn.StmtRollback()
ctx.StmtRollback()
} else {
terror.Log(txn.StmtCommit())
terror.Log(ctx.StmtCommit())
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
binlog "github.com/pingcap/tipb/go-binlog"
goctx "golang.org/x/net/context"
)

Expand Down Expand Up @@ -188,6 +189,20 @@ func (c *Context) GoCtx() goctx.Context {
// StoreQueryFeedback stores the query feedback.
func (c *Context) StoreQueryFeedback(_ interface{}) {}

// StmtCommit implements the context.Context interface.
func (c *Context) StmtCommit() error {
return nil
}

// StmtRollback implements the context.Context interface.
func (c *Context) StmtRollback() {
}

// StmtGetMutation implements the context.Context interface.
func (c *Context) StmtGetMutation(tableID int64) *binlog.TableMutation {
return nil
}

// NewContext creates a new mocked context.Context.
func NewContext() *Context {
goCtx, cancel := goctx.WithCancel(goctx.Background())
Expand Down

0 comments on commit 24333b3

Please sign in to comment.