Skip to content

Commit

Permalink
ddl: new only one delRange and sessionPool in ddl (pingcap#8522)
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored Dec 27, 2018
1 parent e45e699 commit 0840a5e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
30 changes: 27 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ type ddl struct {
quitCh chan struct{}

*ddlCtx
workers map[workerType]*worker
workers map[workerType]*worker
sessPool *sessionPool
delRangeMgr delRangeManager
}

// ddlCtx is the context when we use worker to handle DDL jobs.
Expand Down Expand Up @@ -369,6 +371,19 @@ func (d *ddl) Stop() error {
return nil
}

func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
var delRangeMgr delRangeManager
if !mock {
delRangeMgr = newDelRangeManager(d.store, d.sessPool)
log.Infof("[ddl] start delRangeManager OK, with emulator: %t", !d.store.SupportDeleteRange())
} else {
delRangeMgr = newMockDelRangeManager()
}

delRangeMgr.start()
return delRangeMgr
}

// start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
Expand All @@ -382,8 +397,10 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
terror.Log(errors.Trace(err))

d.workers = make(map[workerType]*worker, 2)
d.workers[generalWorker] = newWorker(generalWorker, d.store, ctxPool)
d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, ctxPool)
d.sessPool = newSessionPool(ctxPool)
d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
d.workers[generalWorker] = newWorker(generalWorker, d.store, d.sessPool, d.delRangeMgr)
d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, d.sessPool, d.delRangeMgr)
for _, worker := range d.workers {
worker.wg.Add(1)
w := worker
Expand Down Expand Up @@ -420,6 +437,13 @@ func (d *ddl) close() {
for _, worker := range d.workers {
worker.close()
}
if d.sessPool != nil {
d.sessPool.close()
}
if d.delRangeMgr != nil {
d.delRangeMgr.clear()
}

log.Infof("[ddl] closing DDL:%s takes time %v", d.uuid, time.Since(startTime))
}

Expand Down
27 changes: 8 additions & 19 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync/atomic"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -66,23 +65,17 @@ type worker struct {
delRangeManager delRangeManager
}

func newWorker(tp workerType, store kv.Storage, ctxPool *pools.ResourcePool) *worker {
sessPool := &sessionPool{resPool: ctxPool}
func newWorker(tp workerType, store kv.Storage, sessPool *sessionPool, delRangeMgr delRangeManager) *worker {
worker := &worker{
id: atomic.AddInt32(&ddlWorkerID, 1),
tp: tp,
ddlJobCh: make(chan struct{}, 1),
quitCh: make(chan struct{}),
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
sessPool: sessPool,
id: atomic.AddInt32(&ddlWorkerID, 1),
tp: tp,
ddlJobCh: make(chan struct{}, 1),
quitCh: make(chan struct{}),
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
sessPool: sessPool,
delRangeManager: delRangeMgr,
}

if ctxPool != nil {
worker.delRangeManager = newDelRangeManager(store, sessPool)
log.Infof("[ddl] start delRangeManager OK, with emulator: %t", !store.SupportDeleteRange())
} else {
worker.delRangeManager = newMockDelRangeManager()
}
return worker
}

Expand All @@ -105,8 +98,6 @@ func (w *worker) String() string {

func (w *worker) close() {
close(w.quitCh)
w.delRangeManager.clear()
w.sessPool.close()
w.wg.Wait()
log.Infof("[ddl-%s] close DDL worker", w)
}
Expand All @@ -117,8 +108,6 @@ func (w *worker) start(d *ddlCtx) {
log.Infof("[ddl-%s] start DDL worker", w)
defer w.wg.Done()

w.delRangeManager.start()

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
Expand Down
3 changes: 1 addition & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ func (dr *delRange) start() {

// clear implements delRangeManager interface.
func (dr *delRange) clear() {
log.Infof("[ddl] closing delRange session pool")
log.Infof("[ddl] closing delRange")
close(dr.quitCh)
dr.wait.Wait()
dr.sessPool.close()
}

// startEmulator is only used for those storage engines which don't support
Expand Down
26 changes: 23 additions & 3 deletions ddl/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,37 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/mock"
log "github.com/sirupsen/logrus"
)

// sessionPool is used to new session.
type sessionPool struct {
mu sync.Mutex
mu struct {
sync.Mutex
closed bool
}
resPool *pools.ResourcePool
}

func newSessionPool(resPool *pools.ResourcePool) *sessionPool {
return &sessionPool{resPool: resPool}
}

// get gets sessionctx from context resource pool.
// Please remember to call put after you finished using sessionctx.
func (sg *sessionPool) get() (sessionctx.Context, error) {
if sg.resPool == nil {
return mock.NewContext(), nil
}

sg.mu.Lock()
if sg.mu.closed {
sg.mu.Unlock()
return nil, errors.Errorf("sessionPool is closed.")
}
sg.mu.Unlock()

// no need to protect sg.resPool
resource, err := sg.resPool.Get()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -53,17 +69,21 @@ func (sg *sessionPool) put(ctx sessionctx.Context) {
return
}

// no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to
// put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
sg.resPool.Put(ctx.(pools.Resource))
}

// close clean up the sessionPool.
func (sg *sessionPool) close() {
sg.mu.Lock()
defer sg.mu.Unlock()
if sg.resPool == nil {
// prevent closing resPool twice.
if sg.mu.closed || sg.resPool == nil {
return
}

log.Info("[ddl] closing sessionPool")
sg.resPool.Close()
sg.resPool = nil
sg.mu.closed = true
}

0 comments on commit 0840a5e

Please sign in to comment.