Skip to content

Commit

Permalink
*: update job state name (pingcap#4818)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and hanfei1991 committed Oct 19, 2017
1 parent f1c82ac commit 6a82257
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 102 deletions.
38 changes: 19 additions & 19 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,21 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
offset := 0
err = job.DecodeArgs(col, pos, &offset)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

columnInfo := findCol(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnExists.GenByArgs(col.Name)
}
} else {
columnInfo, offset, err = d.createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// Set offset arg to job.
Expand Down Expand Up @@ -156,7 +156,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
d.asyncNotifyEvent(&Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
default:
Expand All @@ -176,17 +176,17 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := findCol(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("column %s doesn't exist", colName)
}
if err = isDroppableColumn(tblInfo, colName); err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
d.asyncNotifyEvent(&Event{Tp: model.ActionDropColumn, TableInfo: tblInfo, ColumnInfo: colInfo})
default:
Expand Down Expand Up @@ -258,7 +258,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
if columnInfo.DefaultValue != nil {
colMeta.defaultVal, err = table.GetColDefaultValue(ctx, columnInfo)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
log.Errorf("[ddl] fatal: this case shouldn't happen, column %v err %v", columnInfo, err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (d *ddl) onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ erro
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -400,7 +400,7 @@ func (d *ddl) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error)
pos := &ast.ColumnPosition{}
err := job.DecodeArgs(newCol, oldColName, pos)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -416,7 +416,7 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo

oldCol := findCol(tblInfo.Columns, oldName.L)
if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(oldName, tblInfo.Name)
}

Expand All @@ -425,13 +425,13 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
if pos.Tp == ast.ColumnPositionAfter {
if oldName.L == pos.RelativeColumn.Name.L {
// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(oldName, tblInfo.Name)
}

relative := findCol(tblInfo.Columns, pos.RelativeColumn.Name.L)
if relative == nil || relative.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(pos.RelativeColumn, tblInfo.Name)
}

Expand Down Expand Up @@ -482,11 +482,11 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
}
Expand All @@ -498,7 +498,7 @@ func (d *ddl) updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInf
}
oldCol := findCol(tblInfo.Columns, oldColName.L)
if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(newCol.Name, tblInfo.Name)
}
*oldCol = *newCol
Expand All @@ -507,11 +507,11 @@ func (d *ddl) updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInf
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func checkEqualTable(c *C, t1, t2 *model.TableInfo) {
}

func checkHistoryJob(c *C, job *model.Job) {
c.Assert(job.State, Equals, model.JobSynced)
c.Assert(job.State, Equals, model.JobStateSynced)
}

func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobArgs) {
Expand Down
14 changes: 7 additions & 7 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) erro
job.BinlogInfo.Clean()
job.Error = toTError(err)
job.SchemaState = model.StateNone
job.State = model.JobCancelled
job.State = model.JobStateCancelled
err = d.finishDDLJob(t, job)
}
return errors.Trace(err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (d *ddl) handleDDLJobQueue() error {

if job.IsDone() {
binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query)
job.State = model.JobSynced
job.State = model.JobStateSynced
err = d.finishDDLJob(t, job)
return errors.Trace(err)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (d *ddl) handleDDLJobQueue() error {
// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
if job.State == model.JobStateRunning || job.State == model.JobStateDone {
d.waitSchemaChanged(nil, waitTime, schemaVer)
}
if job.IsSynced() {
Expand Down Expand Up @@ -262,15 +262,15 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
log.Infof("[ddl] run the cancelling DDL job %s", job)
asyncNotify(d.notifyCancelReorgJob)
} else {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
}

if !job.IsRollingback() && !job.IsCancelling() {
job.State = model.JobRunning
job.State = model.JobStateRunning
}

var err error
Expand Down Expand Up @@ -305,14 +305,14 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
ver, err = d.onSetDefaultValue(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
err = errInvalidDDLJob.Gen("invalid ddl job %v", job)
}

// Save errors in job, so that others can know errors happened.
if err != nil {
// If job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
if job.State != model.JobStateCancelled {
log.Errorf("[ddl] run DDL job err %v", errors.ErrorStack(err))
} else {
log.Infof("[ddl] the DDL job is normal to cancel because %v", errors.ErrorStack(err))
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
if job.SchemaState == model.StateNone && job.State != model.JobDone {
if job.SchemaState == model.StateNone && job.State != model.JobStateDone {
// If the schema state is none, we only test the job is finished.
} else {
errs, err := inspectkv.CancelJobs(txn, test.jobIDs)
Expand Down
10 changes: 5 additions & 5 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ err
var fkInfo model.FKInfo
err = job.DecodeArgs(&fkInfo)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
fkInfo.ID = allocateIndexID(tblInfo)
Expand All @@ -48,7 +48,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ err
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
default:
Expand All @@ -70,7 +70,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
)
err = job.DecodeArgs(&fkName)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -82,7 +82,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
}

if !found {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrForeignKeyNotExists.GenByArgs(fkName)
}

Expand All @@ -106,7 +106,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
default:
Expand Down
4 changes: 2 additions & 2 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
var hookErr error
tc := &TestDDLCallback{}
tc.onJobUpdated = func(job *model.Job) {
if job.State != model.JobDone {
if job.State != model.JobStateDone {
return
}
mu.Lock()
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
checkOK = false
mu.Unlock()
tc.onJobUpdated = func(job *model.Job) {
if job.State != model.JobDone {
if job.State != model.JobStateDone {
return
}
mu.Lock()
Expand Down
18 changes: 9 additions & 9 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,20 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
)
err = job.DecodeArgs(&unique, &indexName, &idxColNames, &indexOption)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errDupKeyName.Gen("index already exist %s", indexName)
}

if indexInfo == nil {
indexInfo, err = buildIndexInfo(tblInfo, indexName, idxColNames, model.StateNone)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if indexOption != nil {
Expand Down Expand Up @@ -304,7 +304,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
default:
err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
Expand All @@ -314,7 +314,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (ver int64, _ error) {
job.State = model.JobRollingback
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
Expand Down Expand Up @@ -344,13 +344,13 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("index %s doesn't exist", indexName)
}

Expand Down Expand Up @@ -391,9 +391,9 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

// Finish this job.
if job.IsRollingback() {
job.State = model.JobRollbackDone
job.State = model.JobStateRollbackDone
} else {
job.State = model.JobDone
job.State = model.JobStateDone
d.asyncNotifyEvent(&Event{Tp: model.ActionDropIndex, TableInfo: tblInfo, IndexInfo: indexInfo})
}
job.BinlogInfo.AddTableInfo(ver, tblInfo)
Expand Down
Loading

0 comments on commit 6a82257

Please sign in to comment.