Skip to content

Commit

Permalink
sessionctx/binloginfo: write DDL binlog in the same way as DML binlog. (
Browse files Browse the repository at this point in the history
pingcap#1898)

Provide correct startTS and commitTS, and write rollback ddl binlog if transaction rollback.
  • Loading branch information
coocood authored Oct 31, 2016
1 parent d327907 commit da8bc01
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 99 deletions.
60 changes: 1 addition & 59 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tipb/go-binlog"
)

func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
Expand Down Expand Up @@ -102,37 +101,6 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
}
}

func (d *ddl) writePreDDLBinlog(job *model.Job, startTS uint64) error {
if binloginfo.PumpClient == nil {
return nil
}
bin := &binlog.Binlog{
Tp: binlog.BinlogType_PreDDL,
DdlJobId: job.ID,
StartTs: int64(startTS),
DdlQuery: []byte(job.Query),
}
err := binloginfo.WriteBinlog(bin)
return errors.Trace(err)
}

func (d *ddl) writePostDDLBinlog(jobID int64, startTS, commitTS uint64) {
if binloginfo.PumpClient == nil {
return
}
bin := &binlog.Binlog{
Tp: binlog.BinlogType_PostDDL,
DdlJobId: jobID,
StartTs: int64(startTS),
CommitTs: int64(commitTS),
}
err := binloginfo.WriteBinlog(bin)
if err != nil {
log.Errorf("failed to write PostDDL binlog %v", err)
}
return
}

func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job

Expand Down Expand Up @@ -295,7 +263,6 @@ func (d *ddl) handleDDLJobQueue() error {
}

waitTime := 2 * d.lease
var binlogStartTS uint64
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand Down Expand Up @@ -339,10 +306,7 @@ func (d *ddl) handleDDLJobQueue() error {
d.runDDLJob(t, job)

if job.IsFinished() {
err = d.writePreDDLBinlogIfNeeded(txn, job, &binlogStartTS)
if err != nil {
return errors.Trace(err)
}
binloginfo.SetDDLBinlog(txn, job.ID, job.Query)
err = d.finishDDLJob(t, job)
} else {
err = d.updateDDLJob(t, job)
Expand All @@ -364,12 +328,6 @@ func (d *ddl) handleDDLJobQueue() error {
// no job now, return and retry get later.
return nil
}
if binlogStartTS != 0 {
commitTS, err1 := d.store.CurrentVersion()
if err1 == nil {
d.writePostDDLBinlog(job.ID, binlogStartTS, commitTS.Ver)
}
}

d.hookMu.Lock()
d.hook.OnJobUpdated(job)
Expand All @@ -389,22 +347,6 @@ func (d *ddl) handleDDLJobQueue() error {
}
}

// writePreDDLBinlog writes preDDL binlog if job is done and the binlog has not been write before.
func (d *ddl) writePreDDLBinlogIfNeeded(txn kv.Transaction, job *model.Job, binlogStartTS *uint64) error {
if job.IsDone() {
// Avoid write multiple times.
if *binlogStartTS == 0 {
startTS := txn.StartTS()
err := d.writePreDDLBinlog(job, startTS)
if err != nil {
return errors.Trace(err)
}
*binlogStartTS = startTS
}
}
return nil
}

func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
if n1 > 0 {
return n1
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
PresumeKeyNotExistsError
// RetryAttempts is the number of txn retry attempt.
RetryAttempts
// BinlogData is the serialized bytes of Binlog prewrite value.
// BinlogData is the binlog data to write.
BinlogData
)

Expand Down
13 changes: 9 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-binlog"
)

// Session context
Expand Down Expand Up @@ -198,13 +199,17 @@ func (s *session) finishTxn(rollback bool) error {
return s.txn.Rollback()
}
if binloginfo.PumpClient != nil {
bin := binloginfo.GetPrewriteValue(s, false)
if bin != nil {
binlogData, err := bin.Marshal()
prewriteValue := binloginfo.GetPrewriteValue(s, false)
if prewriteValue != nil {
prewriteData, err := prewriteValue.Marshal()
if err != nil {
return errors.Trace(err)
}
s.txn.SetOption(kv.BinlogData, binlogData)
bin := &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
}
s.txn.SetOption(kv.BinlogData, bin)
}
}
err := s.txn.Commit()
Expand Down
19 changes: 15 additions & 4 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tipb/go-binlog"
goctx "golang.org/x/net/context"
)
Expand Down Expand Up @@ -44,12 +45,12 @@ const (
binlogKey keyType = 1
)

// SetSchemaVersion sets schema version to a context.
// SetSchemaVersion sets schema version to the context.
func SetSchemaVersion(ctx context.Context, version int64) {
ctx.SetValue(schemaVersionKey, version)
}

// GetSchemaVersion gets schema version in a context.
// GetSchemaVersion gets schema version in the context.
func GetSchemaVersion(ctx context.Context) int64 {
v, ok := ctx.Value(schemaVersionKey).(int64)
if !ok {
Expand All @@ -58,7 +59,7 @@ func GetSchemaVersion(ctx context.Context) int64 {
return v
}

// GetPrewriteValue gets binlog prewrite value in a context.
// GetPrewriteValue gets binlog prewrite value in the context.
func GetPrewriteValue(ctx context.Context, createIfNotExists bool) *binlog.PrewriteValue {
v, ok := ctx.Value(binlogKey).(*binlog.PrewriteValue)
if !ok && createIfNotExists {
Expand All @@ -80,7 +81,17 @@ func WriteBinlog(bin *binlog.Binlog) error {
return errors.Trace(err)
}

// ClearBinlog clears binlog in a context.
// SetDDLBinlog sets DDL binlog in the kv.Transaction.
func SetDDLBinlog(txn kv.Transaction, jobID int64, ddlQuery string) {
bin := &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
DdlJobId: jobID,
DdlQuery: []byte(ddlQuery),
}
txn.SetOption(kv.BinlogData, bin)
}

// ClearBinlog clears binlog in the Context.
func ClearBinlog(ctx context.Context) {
ctx.ClearValue(binlogKey)
}
37 changes: 25 additions & 12 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
tk.MustExec("drop table if exists local_binlog")
ddlQuery := "create table local_binlog (id int primary key, name varchar(10))"
tk.MustExec(ddlQuery)
c.Assert(getLatestBinlogDDLQuery(c, pump), Equals, ddlQuery)
time.Sleep(time.Millisecond)
checkLatestBinlogDDL(c, pump, ddlQuery)

tk.MustExec("insert local_binlog values (1, 'abc'), (2, 'cde')")
prewriteVal := getLatestBinlogPrewriteValue(c, pump)
Expand Down Expand Up @@ -186,22 +187,32 @@ func getLatestBinlogPrewriteValue(c *C, pump *mockBinlogPump) *binlog.PrewriteVa
return preVal
}

func getLatestBinlogDDLQuery(c *C, pump *mockBinlogPump) string {
var bin *binlog.Binlog
func checkLatestBinlogDDL(c *C, pump *mockBinlogPump, ddlQuery string) {
var preDDL, commitDDL *binlog.Binlog
pump.mu.Lock()
for i := len(pump.mu.payloads) - 1; i >= 0; i-- {
payload := pump.mu.payloads[i]
bin = new(binlog.Binlog)
bin := new(binlog.Binlog)
bin.Unmarshal(payload)
if bin.Tp == binlog.BinlogType_PreDDL {
if bin.Tp == binlog.BinlogType_Commit && bin.DdlJobId > 0 {
commitDDL = bin
}
if bin.Tp == binlog.BinlogType_Prewrite && bin.DdlJobId != 0 {
preDDL = bin
}
if preDDL != nil && commitDDL != nil {
break
}
}
pump.mu.Unlock()
c.Assert(bin.DdlJobId, Greater, int64(0))
c.Assert(bin.StartTs, Greater, int64(0))
c.Assert(bin.CommitTs, Equals, int64(0))
return string(bin.DdlQuery)
c.Assert(preDDL.DdlJobId, Greater, int64(0))
c.Assert(preDDL.StartTs, Greater, int64(0))
c.Assert(preDDL.CommitTs, Equals, int64(0))
c.Assert(string(preDDL.DdlQuery), Equals, ddlQuery)
c.Assert(commitDDL.StartTs, Equals, preDDL.StartTs)
c.Assert(commitDDL.CommitTs, Greater, commitDDL.StartTs)
c.Assert(commitDDL.DdlJobId, Equals, preDDL.DdlJobId)
return
}

func checkBinlogCount(c *C, pump *mockBinlogPump) {
Expand All @@ -215,9 +226,11 @@ func checkBinlogCount(c *C, pump *mockBinlogPump) {
bin = new(binlog.Binlog)
bin.Unmarshal(payload)
if bin.Tp == binlog.BinlogType_Prewrite {
prewriteCount++
} else if bin.Tp == binlog.BinlogType_PreDDL {
ddlCount++
if bin.DdlJobId != 0 {
ddlCount++
} else {
prewriteCount++
}
}
}
pump.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func (txn *tikvTxn) Commit() error {
}
err = committer.Commit()
if err != nil {
committer.writeFinisheBinlog(binlog.BinlogType_Rollback, 0)
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
return errors.Trace(err)
}
committer.writeFinisheBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
txn.commitTS = committer.commitTS
log.Debugf("[kv] finish commit txn %d", txn.StartTS())
return nil
Expand Down
29 changes: 12 additions & 17 deletions store/tikv/txn_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,31 +390,26 @@ func (c *txnCommitter) prewriteBinlog() chan error {
}
ch := make(chan error, 1)
go func() {
prewriteValue := c.txn.us.GetOption(kv.BinlogData)
binPrewrite := &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
StartTs: int64(c.startTS),
PrewriteKey: c.keys[0],
PrewriteValue: prewriteValue.([]byte),
bin := c.txn.us.GetOption(kv.BinlogData).(*binlog.Binlog)
bin.StartTs = int64(c.startTS)
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = c.keys[0]
}
err := binloginfo.WriteBinlog(binPrewrite)
err := binloginfo.WriteBinlog(bin)
ch <- errors.Trace(err)
}()
return ch
}

func (c *txnCommitter) writeFinisheBinlog(tp binlog.BinlogType, commitTS int64) {
func (c *txnCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) {
if !c.shouldWriteBinlog() {
return
}
bin := c.txn.us.GetOption(kv.BinlogData).(*binlog.Binlog)
bin.Tp = tp
bin.CommitTs = commitTS
go func() {
binCommit := &binlog.Binlog{
Tp: tp,
StartTs: int64(c.startTS),
CommitTs: commitTS,
PrewriteKey: c.keys[0],
}
err := binloginfo.WriteBinlog(binCommit)
err := binloginfo.WriteBinlog(bin)
if err != nil {
log.Errorf("failed to write binlog: %v", err)
}
Expand All @@ -425,8 +420,8 @@ func (c *txnCommitter) shouldWriteBinlog() bool {
if binloginfo.PumpClient == nil {
return false
}
prewriteValue := c.txn.us.GetOption(kv.BinlogData)
return prewriteValue != nil
_, ok := c.txn.us.GetOption(kv.BinlogData).(*binlog.Binlog)
return ok
}

// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
Expand Down

0 comments on commit da8bc01

Please sign in to comment.