Skip to content

Commit

Permalink
ddl: Cancel DDL job when the key is exist (pingcap#1844)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Oct 22, 2016
1 parent 9f615fc commit 037dc11
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 6 deletions.
82 changes: 80 additions & 2 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,93 @@ func (s *testDBSuite) TestIndex(c *C) {
defer testleak.AfterTest(c)()
s.testAddIndex(c)
s.testDropIndex(c)
s.testAddUniqueIndexRollback(c)
}

func (s *testDBSuite) testGetTable(c *C, name string) table.Table {
ctx := s.s.(context.Context)
tbl, err := sessionctx.GetDomain(ctx).InfoSchema().TableByName(model.NewCIStr(s.schemaName), model.NewCIStr(name))
domain := sessionctx.GetDomain(ctx)
// Make sure the table schema is the new schema.
err := domain.MustReload()
c.Assert(err, IsNil)
tbl, err := domain.InfoSchema().TableByName(model.NewCIStr(s.schemaName), model.NewCIStr(name))
c.Assert(err, IsNil)
return tbl
}

func backgroundExec(s kv.Storage, sql string, done chan error) {
se, err := tidb.CreateSession(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute("use test_db")
if err != nil {
done <- errors.Trace(err)
return
}
_, err = se.Execute(sql)
done <- errors.Trace(err)
}

func (s *testDBSuite) testAddUniqueIndexRollback(c *C) {
// t1 (c1 int, c2 int, c3 int, primary key(c1))
s.mustExec(c, "delete from t1")
// defaultBatchSize is equal to ddl.defaultBatchSize
defaultBatchSize := 1024
base := defaultBatchSize * 2
count := base
// add some rows
for i := 0; i < count; i++ {
s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
}
// add some duplicate rows
for i := count - 10; i < count; i++ {
s.mustExec(c, "insert into t1 values (?, ?, ?)", i+10, i, i)
}

done := make(chan error, 1)
go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)

times := 0
ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
for {
select {
case err := <-done:
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[kv:1062]Duplicate for key c3_index", Commentf("err:%v", err))
break LOOP
case <-ticker.C:
if times >= 10 {
break
}
step := 10
// delete some rows, and add some data
for i := count; i < count+step; i++ {
n := rand.Intn(count)
s.mustExec(c, "delete from t1 where c1 = ?", n)
s.mustExec(c, "insert into t1 values (?, ?, ?)", i+10, i, i)
}
count += step
times++
}
}

t := s.testGetTable(c, "t1")
for _, tidx := range t.Indices() {
c.Assert(strings.EqualFold(tidx.Meta().Name.L, "c3_index"), IsFalse)
}

// delete duplicate rows, then add index
for i := base - 10; i < base; i++ {
s.mustExec(c, "delete from t1 where c1 = ?", i+10)
}
sessionExec(c, s.store, "create index c3_index on t1 (c3)")
}

func (s *testDBSuite) testAddIndex(c *C) {
done := make(chan struct{}, 1)

Expand Down Expand Up @@ -186,7 +264,7 @@ LOOP:
break
}
}
// Make sure there is index with name c3_index
// Make sure there is index with name c3_index.
c.Assert(nidx, NotNil)
c.Assert(nidx.Meta().ID, Greater, int64(0))
txn, err := ctx.GetTxn(true)
Expand Down
4 changes: 3 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,9 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
return
}

job.State = model.JobRunning
if job.State != model.JobRollback {
job.State = model.JobRunning
}

var err error
switch job.Type {
Expand Down
40 changes: 38 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ func dropIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) {
}

func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
// rollback job
if job.State == model.JobRollback {
err := d.onDropIndex(t, job)
if err != nil {
return errors.Trace(err)
}
return nil
}

// normal job
schemaID := job.SchemaID
tblInfo, err := d.getTableInfo(t, job)
if err != nil {
Expand All @@ -123,7 +133,6 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
indexID int64
idxColNames []*ast.IndexColName
)

err = job.DecodeArgs(&unique, &indexName, &indexID, &idxColNames)
if err != nil {
job.State = model.JobCancelled
Expand All @@ -140,6 +149,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
}

indexInfo = idx
break
}
}

Expand Down Expand Up @@ -202,6 +212,10 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
return nil
}
if err != nil {
if terror.ErrorEqual(err, kv.ErrKeyExists) {
log.Warnf("[ddl] run DDL job %v err %v, convert job to rollback job", job, err)
err = d.convert2RollbackJob(t, job, tblInfo, indexInfo)
}
return errors.Trace(err)
}

Expand All @@ -221,6 +235,23 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
}
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo) error {
job.State = model.JobRollback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
// The write reorganization state in add index job that likes write only state in drop index job.
// So the next state is delete only state.
indexInfo.State = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
err := t.UpdateTable(job.SchemaID, tblInfo)
if err != nil {
return errors.Trace(err)
}
err = kv.ErrKeyExists.Gen("Duplicate for key %s", indexInfo.Name.O)
return errors.Trace(err)
}

func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
schemaID := job.SchemaID
tblInfo, err := d.getTableInfo(t, job)
Expand All @@ -238,6 +269,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
for _, idx := range tblInfo.Indices {
if idx.Name.L == indexName.L {
indexInfo = idx
break
}
}

Expand Down Expand Up @@ -305,7 +337,11 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {

// finish this job
job.SchemaState = model.StateNone
job.State = model.JobDone
if job.State == model.JobRollback {
job.State = model.JobRollbackDone
} else {
job.State = model.JobDone
}
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
Expand Down
11 changes: 10 additions & 1 deletion model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (job *Job) String() string {
// IsFinished returns whether job is finished or not.
// If the job state is Done or Cancelled, it is finished.
func (job *Job) IsFinished() bool {
return job.State == JobDone || job.State == JobCancelled
return job.State == JobDone || job.State == JobRollbackDone || job.State == JobCancelled
}

// IsDone returns whether job is done.
Expand All @@ -172,6 +172,11 @@ type JobState byte
const (
JobNone JobState = iota
JobRunning
// When DDL encouterred an unrecoverable error at reorganization state,
// some keys has been added already, we need to remove them.
// JobRollback is the state to do rollback work.
JobRollback
JobRollbackDone
JobDone
JobCancelled
)
Expand All @@ -181,6 +186,10 @@ func (s JobState) String() string {
switch s {
case JobRunning:
return "running"
case JobRollback:
return "rollback"
case JobRollbackDone:
return "rollback done"
case JobDone:
return "done"
case JobCancelled:
Expand Down

0 comments on commit 037dc11

Please sign in to comment.