Skip to content

Commit

Permalink
ddl: write DDL binlog when DDL is done. (pingcap#1850)
Browse files Browse the repository at this point in the history
Previous implementation write DDL binlog as early as when job is added to the queue,
but DDL may be canceled, we should not sync the ddl at that time, there would be a long
 time waiting for the DDL to be done.

After this commit, DDL binlog is written right before the DDL job is done, so we don't need to wait.

Also expose a 'GetAllHistoryJobs' method in meta which will be used later.
  • Loading branch information
coocood authored Oct 20, 2016
1 parent 6c3bae9 commit 662cb41
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
46 changes: 32 additions & 14 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
err = d.writePreDDLBinlog(ctx, job.ID, startTS)
if err != nil {
return errors.Trace(err)
}
ddlQuery, _ := ctx.Value(context.QueryString).(string)
job.Query = ddlQuery

// Create a new job and queue it.
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
Expand All @@ -57,11 +55,6 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
commitVer, err := d.store.CurrentVersion()
if err != nil {
return errors.Trace(err)
}
d.writePostDDLBinlog(job.ID, startTS, commitVer.Ver)

// notice worker that we push a new job and wait the job done.
asyncNotify(d.ddlJobCh)
Expand Down Expand Up @@ -109,16 +102,15 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
}
}

func (d *ddl) writePreDDLBinlog(ctx context.Context, jobID int64, startTS uint64) error {
func (d *ddl) writePreDDLBinlog(job *model.Job, startTS uint64) error {
if binloginfo.PumpClient == nil {
return nil
}
ddlQuery, _ := ctx.Value(context.QueryString).(string)
bin := &binlog.Binlog{
Tp: binlog.BinlogType_PreDDL,
DdlJobId: jobID,
DdlQuery: []byte(ddlQuery),
DdlJobId: job.ID,
StartTs: int64(startTS),
DdlQuery: []byte(job.Query),
}
err := binloginfo.WriteBinlog(bin)
return errors.Trace(err)
Expand Down Expand Up @@ -283,7 +275,7 @@ func (d *ddl) handleDDLJobQueue() error {
}

waitTime := 2 * d.lease

var binlogStartTS uint64
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand Down Expand Up @@ -327,6 +319,10 @@ func (d *ddl) handleDDLJobQueue() error {
d.runDDLJob(t, job)

if job.IsFinished() {
err = d.writePreDDLBinlogIfNeeded(txn, job, &binlogStartTS)
if err != nil {
return errors.Trace(err)
}
err = d.finishDDLJob(t, job)
} else {
err = d.updateDDLJob(t, job)
Expand All @@ -348,6 +344,12 @@ func (d *ddl) handleDDLJobQueue() error {
// no job now, return and retry get later.
return nil
}
if binlogStartTS != 0 {
commitTS, err1 := d.store.CurrentVersion()
if err1 == nil {
d.writePostDDLBinlog(job.ID, binlogStartTS, commitTS.Ver)
}
}

d.hookMu.Lock()
d.hook.OnJobUpdated(job)
Expand All @@ -367,6 +369,22 @@ func (d *ddl) handleDDLJobQueue() error {
}
}

// writePreDDLBinlog writes preDDL binlog if job is done and the binlog has not been write before.
func (d *ddl) writePreDDLBinlogIfNeeded(txn kv.Transaction, job *model.Job, binlogStartTS *uint64) error {
if job.IsDone() {
// Avoid write multiple times.
if *binlogStartTS == 0 {
startTS := txn.StartTS()
err := d.writePreDDLBinlog(job, startTS)
if err != nil {
return errors.Trace(err)
}
*binlogStartTS = startTS
}
}
return nil
}

func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
if n1 > 0 {
return n1
Expand Down
38 changes: 38 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -568,6 +569,43 @@ func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
return m.getHistoryDDLJob(mDDLJobHistoryKey, id)
}

// GetAllHistoryDDLJobs gets all history DDL jobs.
func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
pairs, err := m.txn.HGetAll(mDDLJobHistoryKey)
if err != nil {
return nil, errors.Trace(err)
}
var jobs []*model.Job
for _, pair := range pairs {
job := &model.Job{}
err = job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}
sorter := &jobsSorter{jobs: jobs}
sort.Sort(sorter)
return jobs, nil
}

// jobsSorter implements the sort.Interface interface.
type jobsSorter struct {
jobs []*model.Job
}

func (s *jobsSorter) Swap(i, j int) {
s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
}

func (s *jobsSorter) Len() int {
return len(s.jobs)
}

func (s *jobsSorter) Less(i, j int) bool {
return s.jobs[i].ID < s.jobs[j].ID
}

// GetBootstrapVersion returns the version of the server which boostrap the store.
// If the store is not bootstraped, the version will be zero.
func (m *Meta) GetBootstrapVersion() (int64, error) {
Expand Down
8 changes: 8 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, job)

all, err := t.GetAllHistoryDDLJobs()
c.Assert(err, IsNil)
var lastID int64
for _, job := range all {
c.Assert(job.ID, Greater, lastID)
lastID = job.ID
}

// DDL background job test
err = t.SetBgJobOwner(owner)
c.Assert(err, IsNil)
Expand Down
7 changes: 7 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Job struct {
// unix nano seconds
// TODO: use timestamp allocated by TSO.
LastUpdateTS int64 `json:"last_update_ts"`
// Query string of the ddl job.
Query string `json:"query"`
}

// SetRowCount sets the number of rows. Make sure it can pass `make race`.
Expand Down Expand Up @@ -153,6 +155,11 @@ func (job *Job) IsFinished() bool {
return job.State == JobDone || job.State == JobCancelled
}

// IsDone returns whether job is done.
func (job *Job) IsDone() bool {
return job.State == JobDone
}

// IsRunning returns whether job is still running or not.
func (job *Job) IsRunning() bool {
return job.State == JobRunning
Expand Down

0 comments on commit 662cb41

Please sign in to comment.