From 0840a5e478455928da0298816f6cbd85cea2b116 Mon Sep 17 00:00:00 2001
From: winkyao
Date: Thu, 27 Dec 2018 14:57:47 +0800
Subject: [PATCH] ddl: new only one delRange and sessionPool in ddl (#8522)

---
 ddl/ddl.go          | 30 +++++++++++++++++++++++++++---
 ddl/ddl_worker.go   | 27 ++++++++-------------------
 ddl/delete_range.go |  3 +--
 ddl/session_pool.go | 26 +++++++++++++++++++++++---
 4 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/ddl/ddl.go b/ddl/ddl.go
index 026f0f08b5b97..fec3e64e15bdb 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -251,7 +251,9 @@ type ddl struct {
 	quitCh chan struct{}
 
 	*ddlCtx
-	workers map[workerType]*worker
+	workers     map[workerType]*worker
+	sessPool    *sessionPool
+	delRangeMgr delRangeManager
 }
 
 // ddlCtx is the context when we use worker to handle DDL jobs.
@@ -369,6 +371,19 @@ func (d *ddl) Stop() error {
 	return nil
 }
 
+func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
+	var delRangeMgr delRangeManager
+	if !mock {
+		delRangeMgr = newDelRangeManager(d.store, d.sessPool)
+		log.Infof("[ddl] start delRangeManager OK, with emulator: %t", !d.store.SupportDeleteRange())
+	} else {
+		delRangeMgr = newMockDelRangeManager()
+	}
+
+	delRangeMgr.start()
+	return delRangeMgr
+}
+
 // start campaigns the owner and starts workers.
 // ctxPool is used for the worker's delRangeManager and creates sessions.
 func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
@@ -382,8 +397,10 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
 		terror.Log(errors.Trace(err))
 
 		d.workers = make(map[workerType]*worker, 2)
-		d.workers[generalWorker] = newWorker(generalWorker, d.store, ctxPool)
-		d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, ctxPool)
+		d.sessPool = newSessionPool(ctxPool)
+		d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
+		d.workers[generalWorker] = newWorker(generalWorker, d.store, d.sessPool, d.delRangeMgr)
+		d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, d.sessPool, d.delRangeMgr)
 		for _, worker := range d.workers {
 			worker.wg.Add(1)
 			w := worker
@@ -420,6 +437,13 @@ func (d *ddl) close() {
 	for _, worker := range d.workers {
 		worker.close()
 	}
+	if d.sessPool != nil {
+		d.sessPool.close()
+	}
+	if d.delRangeMgr != nil {
+		d.delRangeMgr.clear()
+	}
+
 	log.Infof("[ddl] closing DDL:%s takes time %v", d.uuid, time.Since(startTime))
 }
 
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 11775b45844d3..fdc787a24c85b 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -20,7 +20,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/ngaut/pools"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/terror"
@@ -66,23 +65,17 @@ type worker struct {
 	delRangeManager delRangeManager
 }
 
-func newWorker(tp workerType, store kv.Storage, ctxPool *pools.ResourcePool) *worker {
-	sessPool := &sessionPool{resPool: ctxPool}
+func newWorker(tp workerType, store kv.Storage, sessPool *sessionPool, delRangeMgr delRangeManager) *worker {
 	worker := &worker{
-		id:       atomic.AddInt32(&ddlWorkerID, 1),
-		tp:       tp,
-		ddlJobCh: make(chan struct{}, 1),
-		quitCh:   make(chan struct{}),
-		reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
-		sessPool: sessPool,
+		id:              atomic.AddInt32(&ddlWorkerID, 1),
+		tp:              tp,
+		ddlJobCh:        make(chan struct{}, 1),
+		quitCh:          make(chan struct{}),
+		reorgCtx:        &reorgCtx{notifyCancelReorgJob: 0},
+		sessPool:        sessPool,
+		delRangeManager: delRangeMgr,
 	}
 
-	if ctxPool != nil {
-		worker.delRangeManager = newDelRangeManager(store, sessPool)
-		log.Infof("[ddl] start delRangeManager OK, with emulator: %t", !store.SupportDeleteRange())
emulator: %t", !store.SupportDeleteRange()) - } else { - worker.delRangeManager = newMockDelRangeManager() - } return worker } @@ -105,8 +98,6 @@ func (w *worker) String() string { func (w *worker) close() { close(w.quitCh) - w.delRangeManager.clear() - w.sessPool.close() w.wg.Wait() log.Infof("[ddl-%s] close DDL worker", w) } @@ -117,8 +108,6 @@ func (w *worker) start(d *ddlCtx) { log.Infof("[ddl-%s] start DDL worker", w) defer w.wg.Done() - w.delRangeManager.start() - // We use 4 * lease time to check owner's timeout, so here, we will update owner's status // every 2 * lease time. If lease is 0, we will use default 1s. // But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value. diff --git a/ddl/delete_range.go b/ddl/delete_range.go index dfee35efe0ca6..3b50347b92e1b 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -100,10 +100,9 @@ func (dr *delRange) start() { // clear implements delRangeManager interface. func (dr *delRange) clear() { - log.Infof("[ddl] closing delRange session pool") + log.Infof("[ddl] closing delRange") close(dr.quitCh) dr.wait.Wait() - dr.sessPool.close() } // startEmulator is only used for those storage engines which don't support diff --git a/ddl/session_pool.go b/ddl/session_pool.go index cd6f5be9bdaa9..a36a27c7f1edb 100644 --- a/ddl/session_pool.go +++ b/ddl/session_pool.go @@ -21,14 +21,22 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/mock" + log "github.com/sirupsen/logrus" ) // sessionPool is used to new session. type sessionPool struct { - mu sync.Mutex + mu struct { + sync.Mutex + closed bool + } resPool *pools.ResourcePool } +func newSessionPool(resPool *pools.ResourcePool) *sessionPool { + return &sessionPool{resPool: resPool} +} + // get gets sessionctx from context resource pool. // Please remember to call put after you finished using sessionctx. func (sg *sessionPool) get() (sessionctx.Context, error) { @@ -36,6 +44,14 @@ func (sg *sessionPool) get() (sessionctx.Context, error) { return mock.NewContext(), nil } + sg.mu.Lock() + if sg.mu.closed { + sg.mu.Unlock() + return nil, errors.Errorf("sessionPool is closed.") + } + sg.mu.Unlock() + + // no need to protect sg.resPool resource, err := sg.resPool.Get() if err != nil { return nil, errors.Trace(err) @@ -53,6 +69,8 @@ func (sg *sessionPool) put(ctx sessionctx.Context) { return } + // no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to + // put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing. sg.resPool.Put(ctx.(pools.Resource)) } @@ -60,10 +78,12 @@ func (sg *sessionPool) put(ctx sessionctx.Context) { func (sg *sessionPool) close() { sg.mu.Lock() defer sg.mu.Unlock() - if sg.resPool == nil { + // prevent closing resPool twice. + if sg.mu.closed || sg.resPool == nil { return } + log.Info("[ddl] closing sessionPool") sg.resPool.Close() - sg.resPool = nil + sg.mu.closed = true }