Skip to content

Commit

Permalink
ddl: initial support for parallel DDL (pingcap#6955)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Jul 25, 2018
1 parent 3299f86 commit da6f0c1
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 301 deletions.
17 changes: 13 additions & 4 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (s *testColumnSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
func buildCreateColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string,
pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
col := &model.ColumnInfo{
Name: model.NewCIStr(colName),
Offset: len(tblInfo.Columns),
Expand All @@ -79,22 +79,31 @@ func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, pos, 0},
}
return job
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
job := buildCreateColumnJob(dbInfo, tblInfo, colName, pos, defaultValue)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := &model.Job{
func buildDropColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(colName)},
}
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := buildDropColumnJob(dbInfo, tblInfo, colName)
err := d.doDDLJob(ctx, job)
if isError {
c.Assert(err, NotNil)
Expand Down
9 changes: 4 additions & 5 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
Expand Down Expand Up @@ -580,15 +580,14 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
if times != 0 {
return
}
var qLen int64
var err1 error
var qLen int
for {
kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
qLen, err1 = m.DDLJobQueueLen()
jobs, err1 := admin.GetDDLJobs(txn)
if err1 != nil {
return err1
}
qLen = len(jobs)
return nil
})
if qLen == 2 {
Expand Down
42 changes: 26 additions & 16 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type ddl struct {
quitCh chan struct{}

*ddlCtx
workers []*worker
workers map[workerType]*worker
}

// ddlCtx is the context when we use worker to handle DDL jobs.
Expand All @@ -236,7 +236,6 @@ type ddlCtx struct {
store kv.Storage
ownerManager owner.Manager
schemaSyncer SchemaSyncer
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
Expand Down Expand Up @@ -317,7 +316,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
uuid: id,
store: store,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
Expand Down Expand Up @@ -359,20 +357,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
err := d.ownerManager.CampaignOwner(ctx)
terror.Log(errors.Trace(err))

d.workers = make([]*worker, 1)
// TODO: Add addIdxWorker.
d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers = make(map[workerType]*worker, 2)
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool)
for _, worker := range d.workers {
worker.wg.Add(1)
go worker.start(d.ddlCtx)
// TODO: Add the type of DDL worker.
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()

// When the start function is called, we will send a fake job to let worker
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}
}

// For every start, we will send a fake job to let worker
// check owner firstly and try to find whether a job exists and run.
asyncNotify(d.ddlJobCh)
}

func (d *ddl) close() {
Expand Down Expand Up @@ -418,16 +416,15 @@ func (d *ddl) genGlobalID() (int64, error) {
globalID, err = meta.NewMeta(txn).GenGlobalID()
return errors.Trace(err)
})

return globalID, errors.Trace(err)
}

// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
// generalWorker returns the general worker.
// It's used for testing.
// TODO: Remove this function.
func (d *ddl) generalWorker() *worker {
if len(d.workers) == 0 {
return nil
}
return d.workers[0]
return d.workers[generalWorker]
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
Expand All @@ -449,6 +446,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
// If the workers don't run, we needn't to notify workers.
if !RunWorker {
return
}

if jobTp == model.ActionAddIndex {
asyncNotify(d.workers[addIdxWorker].ddlJobCh)
} else {
asyncNotify(d.workers[generalWorker].ddlJobCh)
}
}

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// For every DDL, we must commit current transaction.
if err := ctx.NewTxn(); err != nil {
Expand All @@ -463,7 +473,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

// Notice worker that we push a new job and wait the job done.
asyncNotify(d.ddlJobCh)
d.asyncNotifyWorker(job.Type)
log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query)

var historyJob *model.Job
Expand Down
24 changes: 20 additions & 4 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := &model.Job{
func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
Expand All @@ -145,26 +145,42 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
Column: &ast.ColumnName{Name: model.NewCIStr(colName)},
Length: types.UnspecifiedLength}}},
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := buildCreateIdxJob(dbInfo, tblInfo, unique, indexName, colName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := &model.Job{
func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(indexName)},
}
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := buildDropIdxJob(dbInfo, tblInfo, indexName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildRebaseAutoIDJobJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionRebaseAutoID,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBaseID},
}
}
Loading

0 comments on commit da6f0c1

Please sign in to comment.