Skip to content

Commit

Permalink
ddl, model: clean up (pingcap#5583)
Browse files Browse the repository at this point in the history
* ddl: rename updateTableInfo to updateVerAndTblInfo and add finish function
  • Loading branch information
zimulala authored Jan 15, 2018
1 parent d568bf7 commit bd53094
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 79 deletions.
39 changes: 15 additions & 24 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,29 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// none -> delete only
job.SchemaState = model.StateDeleteOnly
columnInfo.State = model.StateDeleteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
columnInfo.State = model.StateWriteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
case model.StateWriteOnly:
// write only -> reorganization
job.SchemaState = model.StateWriteReorganization
columnInfo.State = model.StateWriteReorganization
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
case model.StateWriteReorganization:
// reorganization -> public
// Adjust column offset.
d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, offset, true)
columnInfo.State = model.StatePublic
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
d.asyncNotifyEvent(&util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
default:
err = ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State)
Expand Down Expand Up @@ -204,17 +202,17 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
colInfo.State = model.StateWriteOnly
// Set this column's offset to the last and reset all following columns' offsets.
d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, colInfo.Offset, false)
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
colInfo.State = model.StateDeleteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
case model.StateDeleteOnly:
// delete only -> reorganization
job.SchemaState = model.StateDeleteReorganization
colInfo.State = model.StateDeleteReorganization
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
Expand All @@ -225,15 +223,14 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
}
tblInfo.Columns = newColumns
job.SchemaState = model.StateNone
ver, err = updateTableInfo(t, job, tblInfo, originalState)
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
Expand Down Expand Up @@ -489,16 +486,13 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
}
}

originalState := job.SchemaState
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

Expand All @@ -514,16 +508,13 @@ func (d *ddl) updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInf
}
*oldCol = *newCol

originalState := job.SchemaState
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

Expand Down
12 changes: 4 additions & 8 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ err
case model.StateNone:
// We just support record the foreign key, so we just make it public.
// none -> public
job.SchemaState = model.StatePublic
fkInfo.State = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != fkInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
default:
return ver, ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State)
Expand Down Expand Up @@ -99,15 +97,13 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
case model.StatePublic:
// We just support record the foreign key, so we just make it none.
// public -> none
job.SchemaState = model.StateNone
fkInfo.State = model.StateNone
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != fkInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, nil
default:
return ver, ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State)
Expand Down
31 changes: 13 additions & 18 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,19 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
// none -> delete only
job.SchemaState = model.StateDeleteOnly
indexInfo.State = model.StateDeleteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
indexInfo.State = model.StateWriteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateWriteOnly:
// write only -> reorganization
job.SchemaState = model.StateWriteReorganization
indexInfo.State = model.StateWriteReorganization
// Initialize SnapshotVer to 0 for later reorganization check.
job.SnapshotVer = 0
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateWriteReorganization:
// reorganization -> public
var tbl table.Table
Expand Down Expand Up @@ -297,23 +297,20 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)

job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
default:
err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
}

return ver, errors.Trace(err)
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (ver int64, _ error) {
func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
Expand All @@ -323,7 +320,7 @@ func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T
indexInfo.State = model.StateDeleteOnly
originalState := indexInfo.State
job.SchemaState = model.StateDeleteOnly
_, err1 := updateTableInfo(t, job, tblInfo, originalState)
ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err1 != nil {
return ver, errors.Trace(err1)
}
Expand Down Expand Up @@ -360,17 +357,17 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// public -> write only
job.SchemaState = model.StateWriteOnly
indexInfo.State = model.StateWriteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
indexInfo.State = model.StateDeleteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateDeleteOnly:
// delete only -> reorganization
job.SchemaState = model.StateDeleteReorganization
indexInfo.State = model.StateDeleteReorganization
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
case model.StateDeleteReorganization:
// reorganization -> absent
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
Expand All @@ -383,19 +380,17 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)

job.SchemaState = model.StateNone
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != model.StateNone)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
if job.IsRollingback() {
job.State = model.JobStateRollbackDone
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.State = model.JobStateDone
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.Args = append(job.Args, indexInfo.ID)
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
Expand Down
8 changes: 2 additions & 6 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error)
switch dbInfo.State {
case model.StateNone:
// none -> public
job.SchemaState = model.StatePublic
dbInfo.State = model.StatePublic
err = t.CreateDatabase(dbInfo)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddDBInfo(ver, dbInfo)
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo)
return ver, nil
default:
// We can't enter here.
Expand Down Expand Up @@ -115,12 +113,10 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.BinlogInfo.AddDBInfo(ver, dbInfo)
if len(tables) > 0 {
job.Args = append(job.Args, getIDs(tables))
}
job.State = model.JobStateDone
job.SchemaState = model.StateNone
job.FinishDBJob(model.JobStateDone, model.StateNone, ver, dbInfo)
default:
// We can't enter here.
err = errors.Errorf("invalid db state %v", dbInfo.State)
Expand Down
36 changes: 14 additions & 22 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
switch tbInfo.State {
case model.StateNone:
// none -> public
job.SchemaState = model.StatePublic
tbInfo.State = model.StatePublic
err = t.CreateTable(schemaID, tbInfo)
if err != nil {
Expand All @@ -65,8 +64,7 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tbInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
d.asyncNotifyEvent(&util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo})
return ver, nil
default:
Expand Down Expand Up @@ -105,25 +103,23 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// public -> write only
job.SchemaState = model.StateWriteOnly
tblInfo.State = model.StateWriteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
tblInfo.State = model.StateDeleteOnly
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State)
case model.StateDeleteOnly:
tblInfo.State = model.StateNone
job.SchemaState = model.StateNone
ver, err = updateTableInfo(t, job, tblInfo, originalState)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTable(job.SchemaID, tableID, true); err != nil {
break
}
// Finish this job.
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(tableID)
job.Args = append(job.Args, startKey)
default:
Expand Down Expand Up @@ -214,8 +210,7 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) (ver int64, _ error)
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(tableID)
job.Args = []interface{}{startKey}
return ver, nil
Expand Down Expand Up @@ -248,13 +243,12 @@ func (d *ddl) onRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, _ error)
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

Expand All @@ -271,13 +265,12 @@ func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}
tblInfo.ShardRowIDBits = shardRowIDBits
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

Expand Down Expand Up @@ -337,9 +330,7 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.AddTableInfo(ver, tblInfo)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

Expand All @@ -366,9 +357,10 @@ func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName
return nil
}

func updateTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, originalState model.SchemaState) (
// updateVersionAndTableInfo updates the schema version and the table information.
func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
ver int64, err error) {
if originalState != job.SchemaState {
if shouldUpdateVer {
ver, err = updateSchemaVersion(t, job)
if err != nil {
return 0, errors.Trace(err)
Expand Down
Loading

0 comments on commit bd53094

Please sign in to comment.