Skip to content

Commit

Permalink
*: address comments and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Jan 21, 2016
1 parent 500509b commit 7a3daf5
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 91 deletions.
70 changes: 37 additions & 33 deletions ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (d *ddl) handleBgJobQueue() error {
return nil
}

task := &model.Job{}
job := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, bgJobFlag)
Expand All @@ -42,19 +42,19 @@ func (d *ddl) handleBgJobQueue() error {
}

// get the first background job and run
task, err = d.getFirstBgJob(t)
job, err = d.getFirstBgJob(t)
if err != nil {
return errors.Trace(err)
}
if task == nil {
if job == nil {
return nil
}

d.runBgJob(t, task)
if task.IsFinished() {
err = d.finishBgJob(t, task)
d.runBgJob(t, job)
if job.IsFinished() {
err = d.finishBgJob(t, job)
} else {
err = d.updateBgJob(t, task)
err = d.updateBgJob(t, job)
}
if err != nil {
return errors.Trace(err)
Expand All @@ -74,43 +74,45 @@ func (d *ddl) handleBgJobQueue() error {
}

// runBgJob runs background job.
func (d *ddl) runBgJob(t *meta.Meta, task *model.Job) {
task.State = model.JobRunning
func (d *ddl) runBgJob(t *meta.Meta, job *model.Job) {
job.State = model.JobRunning

var err error
switch task.Type {
switch job.Type {
case model.ActionDropSchema:
err = d.delReorgSchema(t, task)
err = d.delReorgSchema(t, job)
case model.ActionDropTable:
err = d.delReorgTable(t, task)
err = d.delReorgTable(t, job)
default:
task.State = model.JobCancelled
err = errors.Errorf("invalid background job %v", task)

job.State = model.JobCancelled
err = errors.Errorf("invalid background job %v", job)

}

if err != nil {
if task.State != model.JobCancelled {
if job.State != model.JobCancelled {
log.Errorf("run background job err %v", errors.ErrorStack(err))
}

task.Error = err.Error()
task.ErrorCount++
job.Error = err.Error()
job.ErrorCount++
}
}

// prepareBgJob prepares background job.
func (d *ddl) prepareBgJob(job *model.Job) error {
task := &model.Job{
ID: job.ID,
SchemaID: job.SchemaID,
TableID: job.TableID,
Type: job.Type,
Args: job.Args,
func (d *ddl) prepareBgJob(ddlJob *model.Job) error {
job := &model.Job{
ID: ddlJob.ID,
SchemaID: ddlJob.SchemaID,
TableID: ddlJob.TableID,
Type: ddlJob.Type,
Args: ddlJob.Args,
}

err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err1 := t.EnQueueBgJob(task)
err1 := t.EnQueueBgJob(job)

return errors.Trace(err1)
})
Expand All @@ -128,32 +130,34 @@ func (d *ddl) startBgJob(tp model.ActionType) {

// getFirstBgJob gets the first background job.
func (d *ddl) getFirstBgJob(t *meta.Meta) (*model.Job, error) {
task, err := t.GetBgJob(0)
return task, errors.Trace(err)
job, err := t.GetBgJob(0)
return job, errors.Trace(err)
}

// updateBgJob updates background job.
func (d *ddl) updateBgJob(t *meta.Meta, task *model.Job) error {
err := t.UpdateBgJob(0, task)
func (d *ddl) updateBgJob(t *meta.Meta, job *model.Job) error {
err := t.UpdateBgJob(0, job)
return errors.Trace(err)
}

// finishBgJob finishs background job.
func (d *ddl) finishBgJob(t *meta.Meta, task *model.Job) error {
log.Warnf("[ddl] finish background job %v", task)
func (d *ddl) finishBgJob(t *meta.Meta, job *model.Job) error {
log.Warnf("[ddl] finish background job %v", job)
if _, err := t.DeQueueBgJob(); err != nil {
return errors.Trace(err)
}

err := t.AddHistoryBgJob(task)
err := t.AddHistoryBgJob(job)

return errors.Trace(err)
}

func (d *ddl) onBackgroundWorker() {
defer d.wait.Done()

// ensure that have ddl job convert to background job.
// for a ddl drop job from start to end, the state of it will be pubilc -> write only -> delete only -> none.
// for every state changes, we will wait as lease 2 * lease time.
// so here the ticker check is 8 * lease to ensure that ddl job have converted to background job.
checkTime := chooseLeaseTime(8*d.lease, 10*time.Second)

ticker := time.NewTicker(checkTime)
Expand Down
51 changes: 40 additions & 11 deletions ddl/bg_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/mock"
)

func (s *testDDLSuite) TestDropSchemaError(c *C) {
Expand All @@ -30,47 +31,75 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) {
d := newDDL(store, nil, nil, lease)
defer d.close()

task := &model.Job{
job := &model.Job{
ID: 1,
SchemaID: 1,
Type: model.ActionDropSchema,
Args: []interface{}{&model.DBInfo{
Name: model.CIStr{O: "test"},
}},
}
d.prepareBgJob(task)
d.startBgJob(task.Type)
d.prepareBgJob(job)
d.startBgJob(job.Type)

time.Sleep(lease)
testCheckBgJobCancelled(c, d, task)
testCheckBgJobState(c, d, job, model.JobDone)
}

func testCheckBgJobCancelled(c *C, d *ddl, task *model.Job) {
func testCheckBgJobState(c *C, d *ddl, job *model.Job, state model.JobState) {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
historyBgJob, err := t.GetHistoryBgJob(task.ID)
historyBgJob, err := t.GetHistoryBgJob(job.ID)
c.Assert(err, IsNil)
c.Assert(historyBgJob.State, Equals, model.JobCancelled)
c.Assert(historyBgJob.State, Equals, state)

return nil
})
}

func (s *testDDLSuite) TestDropTableError(c *C) {
store := testCreateStore(c, "test_drop_table")
defer store.Close()

lease := 10 * time.Millisecond
d := newDDL(store, nil, nil, lease)
defer d.close()

dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, mock.NewContext(), d, dbInfo)

job := &model.Job{
ID: 1,
SchemaID: dbInfo.ID,
Type: model.ActionDropTable,
Args: []interface{}{&model.TableInfo{
ID: 1,
Name: model.CIStr{O: "t"},
}},
}
d.prepareBgJob(job)
d.startBgJob(job.Type)

time.Sleep(lease)
testCheckBgJobState(c, d, job, model.JobDone)
}

func (s *testDDLSuite) TestInvalidBgJobType(c *C) {
store := testCreateStore(c, "test_invalid_task_type")
store := testCreateStore(c, "test_invalid_bg_job_type")
defer store.Close()

lease := 10 * time.Millisecond
d := newDDL(store, nil, nil, lease)
defer d.close()

task := &model.Job{
job := &model.Job{
SchemaID: 1,
TableID: 1,
Type: model.ActionCreateTable,
}
d.prepareBgJob(task)
d.prepareBgJob(job)
d.startBgJob(model.ActionDropTable)

time.Sleep(lease)
testCheckBgJobCancelled(c, d, task)
testCheckBgJobState(c, d, job, model.JobCancelled)
}
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (d *ddl) Stop() error {
return nil
}

// job's owner is me, clean it so other servers can compete for it quickly.
// ddl job's owner is me, clean it so other servers can compete for it quickly.
return t.SetDDLJobOwner(&model.Owner{})
})
if err != nil {
Expand All @@ -148,7 +148,7 @@ func (d *ddl) Stop() error {
return nil
}

// task's owner is me, clean it so other servers can compete for it quickly.
// background job's owner is me, clean it so other servers can compete for it quickly.
return t.SetBgJobOwner(&model.Owner{})
})

Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (d *ddl) startDDLJob(ctx context.Context, job *model.Job) error {
var historyJob *model.Job

// for a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// for every state change, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
// for every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
defer ticker.Stop()
for {
Expand Down Expand Up @@ -133,7 +133,7 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) {
// 4 * lease to check its timeout.
maxTimeout := int64(4 * d.lease)
if flag == bgJobFlag {
// backgroun job doesn't need to guarantee other servers update the schema.
// background job doesn't need to guarantee other servers update the schema.
maxTimeout = int64(2 * d.lease)
}
if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout {
Expand Down
24 changes: 15 additions & 9 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/terror"
)

func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
Expand Down Expand Up @@ -73,15 +74,19 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
}
}

func (d *ddl) delReorgSchema(t *meta.Meta, task *model.Job) error {
if len(task.Args) != 1 || task.Args[0] == nil {
task.State = model.JobCancelled
return errors.Trace(infoschema.DatabaseNotExists)
func (d *ddl) delReorgSchema(t *meta.Meta, job *model.Job) error {
dbInfo := &model.DBInfo{}
if err := job.DecodeArgs(dbInfo); err != nil {
// arg error, cancel this job.
job.State = model.JobCancelled
return errors.Trace(err)
}
dbInfo := task.Args[0].(*model.DBInfo)

// wait reorganization jobs done and drop meta.
tables, err := t.ListTables(dbInfo.ID)
if terror.ErrorEqual(meta.ErrDBNotExists, err) {
job.State = model.JobDone
return nil
}
if err != nil {
return errors.Trace(err)
}
Expand All @@ -90,9 +95,9 @@ func (d *ddl) delReorgSchema(t *meta.Meta, task *model.Job) error {
return errors.Trace(err)
}

// finish this task
task.SchemaState = model.StateNone
task.State = model.JobDone
// finish this background job
job.SchemaState = model.StateNone
job.State = model.JobDone

return nil
}
Expand Down Expand Up @@ -132,6 +137,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
break
}
// finish this job
job.Args = []interface{}{dbInfo}
job.State = model.JobDone
job.SchemaState = model.StateNone
default:
Expand Down
12 changes: 6 additions & 6 deletions ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func testDropSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) *mo
}

func checkDrop(c *C, t *meta.Meta) bool {
task, err := t.GetBgJob(0)
bgJob, err := t.GetBgJob(0)
c.Assert(err, IsNil)
if task == nil {
if bgJob == nil {
return true
}

Expand All @@ -76,7 +76,7 @@ func checkDrop(c *C, t *meta.Meta) bool {
}

func testCheckSchemaState(c *C, d *ddl, dbInfo *model.DBInfo, state model.SchemaState) {
isDrop := true
isDropped := true

for {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
Expand All @@ -85,8 +85,8 @@ func testCheckSchemaState(c *C, d *ddl, dbInfo *model.DBInfo, state model.Schema
c.Assert(err, IsNil)

if state == model.StateNone {
isDrop = checkDrop(c, t)
if !isDrop {
isDropped = checkDrop(c, t)
if !isDropped {
return nil
}
c.Assert(info, IsNil)
Expand All @@ -98,7 +98,7 @@ func testCheckSchemaState(c *C, d *ddl, dbInfo *model.DBInfo, state model.Schema
return nil
})

if isDrop {
if isDropped {
break
}
}
Expand Down
Loading

0 comments on commit 7a3daf5

Please sign in to comment.