Skip to content

Commit

Permalink
ddl: fix a bug that cancel add index ddl job when workers are not run…
Browse files Browse the repository at this point in the history
…, which cause tidb ddl hang up (pingcap#8171)
  • Loading branch information
winkyao authored Nov 28, 2018
1 parent 3227954 commit d301c16
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 69 deletions.
17 changes: 15 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,16 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
return colInfo, position, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -254,7 +263,11 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
default:
err = ErrInvalidTableState.GenWithStack("invalid table state %v", tblInfo.State)
}
Expand Down
70 changes: 65 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,15 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
}

var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
// let hook.OnJobUpdatedExported has chance to cancel the job.
// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
ddl.ReorgWaitTimeout = 50 * time.Millisecond
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
var checkErr error
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, s, hook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)
Expand Down Expand Up @@ -459,6 +459,65 @@ LOOP:
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

// TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started.
func (s *testDBSuite) TestCancelAddIndex1(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")

for i := 0; i < 50; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}

var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}

if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

checkErr = hookCtx.Txn(true).Commit(context.Background())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
rs, err := s.tk.Exec("alter table t add index idx_c2(c2)")
if rs != nil {
rs.Close()
}
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
t := s.testGetTable(c, "t")
for _, idx := range t.Indices() {
c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse)
}
s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down Expand Up @@ -3377,7 +3436,7 @@ func (s *testDBSuite) TestPartitionCancelAddIndex(c *C) {
hook := &ddl.TestDDLCallback{}
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 10 * time.Millisecond
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, s, hook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create index c3_index on t1 (c3)", done)
Expand Down Expand Up @@ -3427,7 +3486,8 @@ LOOP:
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback, checkErr error) (func(*model.Job), *model.IndexInfo) {
func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback) (func(*model.Job), *model.IndexInfo, error) {
var checkErr error
first := true
ddl.ReorgWaitTimeout = 10 * time.Millisecond
c3IdxInfo := &model.IndexInfo{}
Expand Down Expand Up @@ -3480,7 +3540,7 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC
checkErr = errors.Trace(err)
}
}
return hook.OnJobUpdatedExported, c3IdxInfo
return hook.OnJobUpdatedExported, c3IdxInfo, checkErr
}

func (s *testDBSuite) TestColumnModifyingDefinition(c *C) {
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
return job
}

func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job {
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddColumn,
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
Expand Down
12 changes: 2 additions & 10 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
Expand Down Expand Up @@ -453,16 +454,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl-%s] run the cancelling DDL job %s", w, job)
w.reorgCtx.notifyReorgCancel()
} else {
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
return convertJob2RollbackJob(w, d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
119 changes: 94 additions & 25 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -285,7 +286,7 @@ func doDDLJobErrWithSchemaState(ctx sessionctx.Context, d *ddl, c *C, schemaID,
}
err := d.doDDLJob(ctx, job)
// TODO: Add the detail error check.
c.Assert(err, NotNil)
c.Assert(err, NotNil, Commentf("err:%v", err))
testCheckJobCancelled(c, d, job, state)

return job
Expand All @@ -311,7 +312,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
return checkErr
}
// It only tests cancel one DDL job.
if errs[0] != test.cancelRetErrs[0] {
if !terror.ErrorEqual(errs[0], test.cancelRetErrs[0]) {
checkErr = errors.Trace(errs[0])
return checkErr
}
Expand All @@ -333,22 +334,52 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: errs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},

{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},
// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
}

return tests
}

func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64, idxName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, idxInfo := range changedTable.Meta().Indices {
if idxInfo.Name.O == idxName {
found = true
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
found = true
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -378,47 +409,60 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
var checkErr error
var test *testCancelJob
tc.onJobUpdated = func(job *model.Job) {
if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling {
return
}
if checkErr != nil {
return
}

hookCtx := mock.NewContext()
hookCtx.Store = store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
var err1 error
err1 = hookCtx.NewTxn()
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
checkCancelState(hookCtx.Txn(true), job, test)
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
checkErr = checkCancelState(hookCtx.Txn(true), job, test)
if checkErr != nil {
return
}
err1 = hookCtx.Txn(true).Commit(context.Background())
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
}
d.SetHook(tc)

// for adding index
test = &tests[0]
validArgs := []interface{}{false, model.NewCIStr("idx"),
idxOrigName := "idx"
validArgs := []interface{}{false, model.NewCIStr(idxOrigName),
[]*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: -1,
}}, nil}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)

// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[1]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[2]
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
Expand All @@ -435,10 +479,35 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for creating table
// for add column
test = &tests[8]
tblInfo = testTableInfo(c, d, "t1", 3)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
addingColName := "colA"

newColumnDef := &ast.ColumnDef{
Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)},
Tp: &types.FieldType{Tp: mysql.TypeLonglong},
Options: []*ast.ColumnOption{},
}
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef, nil)
addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[9]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[10]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[11]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
Loading

0 comments on commit d301c16

Please sign in to comment.