Skip to content

Commit

Permalink
*: Add row count field in a job, fix a drop schema bug and pass race (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Sep 28, 2016
1 parent 4d3844a commit 5df5085
Show file tree
Hide file tree
Showing 22 changed files with 318 additions and 142 deletions.
5 changes: 4 additions & 1 deletion ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ func (d *ddl) handleBgJobQueue() error {

return errors.Trace(err)
})

if err != nil {
return errors.Trace(err)
}

d.hookMu.Lock()
d.hook.OnBgJobUpdated(job)
d.hookMu.Unlock()

return nil
}

Expand Down
3 changes: 0 additions & 3 deletions ddl/bg_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) {
job := &model.Job{
SchemaID: 1,
Type: model.ActionDropSchema,
Args: []interface{}{&model.DBInfo{
Name: model.CIStr{O: "test"},
}},
}
err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
Expand Down
7 changes: 7 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Callback interface {
OnJobRunBefore(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
// OnBgJobUpdated is called after the running background job is updated.
OnBgJobUpdated(job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
Expand All @@ -43,3 +45,8 @@ func (c *BaseCallback) OnJobRunBefore(job *model.Job) {
func (c *BaseCallback) OnJobUpdated(job *model.Job) {
// Nothing to do.
}

// OnBgJobUpdated implements Callback.OnBgJobUpdated interface.
func (c *BaseCallback) OnBgJobUpdated(job *model.Job) {
// Nothing to do.
}
11 changes: 11 additions & 0 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type testDDLCallback struct {

onJobRunBefore func(*model.Job)
onJobUpdated func(*model.Job)
onBgJobUpdated func(*model.Job)
}

func (tc *testDDLCallback) OnJobRunBefore(job *model.Job) {
Expand All @@ -44,10 +45,20 @@ func (tc *testDDLCallback) OnJobUpdated(job *model.Job) {
tc.BaseCallback.OnJobUpdated(job)
}

func (tc *testDDLCallback) OnBgJobUpdated(job *model.Job) {
if tc.onBgJobUpdated != nil {
tc.onBgJobUpdated(job)
return
}

tc.BaseCallback.OnBgJobUpdated(job)
}

func (s *testDDLSuite) TestCallback(c *C) {
defer testleak.AfterTest(c)()
cb := &BaseCallback{}
c.Assert(cb.OnChanged(nil), IsNil)
cb.OnJobRunBefore(nil)
cb.OnJobUpdated(nil)
cb.OnBgJobUpdated(nil)
}
9 changes: 5 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
}
if columnInfo.DefaultValue != nil || mysql.HasNotNullFlag(columnInfo.Flag) {
err = d.runReorgJob(func() error {
return d.backfillColumn(tbl, columnInfo, reorgInfo)
return d.backfillColumn(tbl, columnInfo, reorgInfo, job)
})
if terror.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
Expand Down Expand Up @@ -303,10 +303,10 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
// 3. For one row, if the row has been already deleted, skip to next row.
// 4. If not deleted, check whether column data has existed, if existed, skip to next row.
// 5. If column data doesn't exist, backfill the column with default value and then continue to handle next row.
func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo, job *model.Job) error {
seekHandle := reorgInfo.Handle
version := reorgInfo.SnapshotVer
count := 0
count := job.GetRowCount()

for {
startTS := time.Now()
Expand All @@ -317,7 +317,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
return nil
}

count += len(handles)
count += int64(len(handles))
seekHandle = handles[len(handles)-1] + 1
sub := time.Since(startTS).Seconds()
err = d.backfillColumnData(t, columnInfo, handles, reorgInfo)
Expand All @@ -326,6 +326,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
return errors.Trace(err)
}

job.SetRowCount(count)
batchHandleDataHistogram.WithLabelValues(batchAddCol).Observe(sub)
log.Infof("[ddl] added column for %v rows, take time %v", count, sub)
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
mu.Unlock()
}
}
d.hook = tc
d.setHook(tc)
defaultValue := int64(3)
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, defaultValue)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx context.Context
}
}
}
d.hook = tc
d.setHook(tc)
d.start()
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, nil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *testColumnChangeSuite) testColumnDrop(c *C, ctx context.Context, d *ddl
}
}
}
d.hook = tc
d.setHook(tc)
d.start()
c.Assert(errors.ErrorStack(checkErr), Equals, "")
testDropColumn(c, ctx, d, s.dbInfo, tbl.Meta(), dropCol.Name.L, false)
Expand Down
5 changes: 2 additions & 3 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
c.Assert(err, IsNil)

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)

t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

oldRow := types.MakeDatums(int64(1), int64(2), int64(3))
Expand Down Expand Up @@ -774,7 +773,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
}
}

d.hook = tc
d.setHook(tc)

// Use local ddl for callback test.
s.d.close()
Expand Down Expand Up @@ -839,7 +838,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
}
}

d.hook = tc
d.setHook(tc)

// Use local ddl for callback test.
s.d.close()
Expand Down
45 changes: 29 additions & 16 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type ddl struct {

infoHandle *infoschema.Handle
hook Callback
hookMu sync.RWMutex
store kv.Storage
// schema lease seconds.
lease time.Duration
Expand Down Expand Up @@ -322,7 +323,7 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr, charsetInfo
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand All @@ -339,7 +340,7 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -889,7 +890,7 @@ func (d *ddl) CreateTable(ctx context.Context, ident ast.Ident, colDefs []*ast.C
d.handleAutoIncID(tbInfo, schema.ID)
}
}
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1023,7 +1024,7 @@ func (d *ddl) AddColumn(ctx context.Context, ti ast.Ident, spec *ast.AlterTableS
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1054,7 +1055,7 @@ func (d *ddl) DropColumn(ctx context.Context, ti ast.Ident, colName model.CIStr)
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand All @@ -1078,7 +1079,7 @@ func (d *ddl) DropTable(ctx context.Context, ti ast.Ident) (err error) {
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1106,7 +1107,7 @@ func (d *ddl) CreateIndex(ctx context.Context, ti ast.Ident, unique bool, indexN
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1163,7 +1164,7 @@ func (d *ddl) CreateForeignKey(ctx context.Context, ti ast.Ident, fkName model.C
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)

}
Expand All @@ -1188,7 +1189,7 @@ func (d *ddl) DropForeignKey(ctx context.Context, ti ast.Ident, fkName model.CIS
}

err = d.doDDLJob(ctx, job)
err = d.hook.OnChanged(err)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

Expand All @@ -1212,10 +1213,23 @@ func (d *ddl) DropIndex(ctx context.Context, ti ast.Ident, indexName model.CIStr
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) callHookOnChanged(err error) error {
d.hookMu.Lock()
err = d.hook.OnChanged(err)
d.hookMu.Unlock()
return errors.Trace(err)
}

func (d *ddl) setHook(h Callback) {
d.hookMu.Lock()
d.hook = h
d.hookMu.Unlock()
}

// findCol finds column in cols by name.
func findCol(cols []*model.ColumnInfo, name string) *model.ColumnInfo {
name = strings.ToLower(name)
Expand Down Expand Up @@ -1248,15 +1262,14 @@ const (
codeCantDropColWithIndex = 201
codeUnsupportedAddColumn = 202

codeBadNull = 1048
codeCantRemoveAllFields = 1090
codeCantDropFieldOrKey = 1091
codeInvalidOnUpdate = 1294
codeTooLongIdent = 1059

codeBadNull = 1048
codeTooLongIdent = 1059
codeTooLongKey = 1071
codeBlobKeyWithoutLength = 1170
codeIncorrectPrefixKey = 1089
codeCantRemoveAllFields = 1090
codeCantDropFieldOrKey = 1091
codeBlobKeyWithoutLength = 1170
codeInvalidOnUpdate = 1294
)

func init() {
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ func (d *ddl) handleDDLJobQueue() error {

log.Warnf("[ddl] run DDL job %v", job)

d.m.RLock()
d.hookMu.Lock()
d.hook.OnJobRunBefore(job)
d.m.RUnlock()
d.hookMu.Unlock()

// if run job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
Expand Down Expand Up @@ -291,9 +291,9 @@ func (d *ddl) handleDDLJobQueue() error {
return nil
}

d.m.RLock()
d.hookMu.Lock()
d.hook.OnJobUpdated(job)
d.m.RUnlock()
d.hookMu.Unlock()

// here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
Expand Down
2 changes: 1 addition & 1 deletion ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
checkOK = true
}

d.hook = tc
d.setHook(tc)

d.close()
d.start()
Expand Down
Loading

0 comments on commit 5df5085

Please sign in to comment.