From d0e859174a9ef8cbf1cab0b2f85c79a3512942d9 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 28 Jun 2017 10:54:20 +0800 Subject: [PATCH] ddl: Break campaignLoop when closing DDL (#3553) * ddl: break the loop when closing --- ddl/owner_manager.go | 27 +++++++++++++++------------ ddl/syncer.go | 6 ++++-- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ddl/owner_manager.go b/ddl/owner_manager.go index a37a25ffa937d..3ecd772116d68 100644 --- a/ddl/owner_manager.go +++ b/ddl/owner_manager.go @@ -14,6 +14,7 @@ package ddl import ( + "fmt" "math" "os" "strconv" @@ -130,7 +131,7 @@ func setManagerSessionTTL() error { return nil } -func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) { +func newSession(ctx goctx.Context, flag string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) { var err error var etcdSession *concurrency.Session for i := 0; i < retryCnt; i++ { @@ -139,7 +140,7 @@ func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) if err == nil { break } - log.Warnf("[ddl] failed to new session, err %v", err) + log.Warnf("[ddl] %s failed to new session, err %v", flag, err) if isContextFinished(err) { break } @@ -151,11 +152,11 @@ func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) // CampaignOwners implements OwnerManager.CampaignOwners interface. func (m *ownerManager) CampaignOwners(ctx goctx.Context) error { - ddlSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL) + ddlSession, err := newSession(ctx, DDLOwnerKey, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL) if err != nil { return errors.Trace(err) } - bgSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL) + bgSession, err := newSession(ctx, BgOwnerKey, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL) if err != nil { return errors.Trace(err) } @@ -169,14 +170,15 @@ func (m *ownerManager) CampaignOwners(ctx goctx.Context) error { } func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency.Session, key string) { + idInfo := fmt.Sprintf("%s ownerManager %s", key, m.ddlID) var err error for { select { case <-etcdSession.Done(): - log.Infof("[ddl] %s etcd session is done, creates a new one", key) - etcdSession, err = newSession(ctx, m.etcdCli, newSessionRetryUnlimited, ManagerSessionTTL) + log.Infof("[ddl] %s etcd session is done, creates a new one", idInfo) + etcdSession, err = newSession(ctx, idInfo, m.etcdCli, newSessionRetryUnlimited, ManagerSessionTTL) if err != nil { - log.Infof("[ddl] break %s campaign loop, err %v", key, err) + log.Infof("[ddl] %s break campaign loop, err %v", idInfo, err) return } case <-ctx.Done(): @@ -186,16 +188,17 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency. time.Duration(ManagerSessionTTL)*time.Second) _, err = m.etcdCli.Revoke(ctx, etcdSession.Lease()) cancel() - log.Infof("[ddl] break %s campaign loop err %v", key, err) + log.Infof("[ddl] %s break campaign loop err %v", idInfo, err) + return default: } elec := concurrency.NewElection(etcdSession, key) err = elec.Campaign(ctx, m.ddlID) if err != nil { - log.Infof("[ddl] %s ownerManager %s failed to campaign, err %v", key, m.ddlID, err) + log.Infof("[ddl] %s failed to campaign, err %v", idInfo, err) if isContextFinished(err) { - log.Warnf("[ddl] break %s campaign loop, err %v", key, err) + log.Warnf("[ddl] %s campaign loop, err %v", idInfo, err) return } continue @@ -229,13 +232,13 @@ func GetOwnerInfo(ctx goctx.Context, elec *concurrency.Election, key, id string) resp, err := elec.Leader(ctx) if err != nil { // If no leader elected currently, it returns ErrElectionNoLeader. - log.Infof("[ddl] failed to get leader, err %v", err) + log.Infof("[ddl] %s ownerManager %s failed to get leader, err %v", key, id, err) return "", errors.Trace(err) } ownerID := string(resp.Kvs[0].Value) log.Infof("[ddl] %s ownerManager is %s, owner is %v", key, id, ownerID) if ownerID != id { - log.Warnf("[ddl] ownerManager %s isn't the owner", id) + log.Warnf("[ddl] %s ownerManager %s isn't the owner", key, id) return "", errors.New("ownerInfoNotMatch") } diff --git a/ddl/syncer.go b/ddl/syncer.go index 2a2690a99ce67..3100e4f81548e 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -123,7 +123,8 @@ func (s *schemaVersionSyncer) Init(ctx goctx.Context) error { if err != nil { return errors.Trace(err) } - s.session, err = newSession(ctx, s.etcdCli, newSessionDefaultRetryCnt, SyncerSessionTTL) + s.session, err = newSession(ctx, s.selfSchemaVerPath, s.etcdCli, + newSessionDefaultRetryCnt, SyncerSessionTTL) if err != nil { return errors.Trace(err) } @@ -140,7 +141,8 @@ func (s *schemaVersionSyncer) Done() <-chan struct{} { // Restart implements SchemaSyncer.Restart interface. func (s *schemaVersionSyncer) Restart(ctx goctx.Context) error { var err error - s.session, err = newSession(ctx, s.etcdCli, newSessionRetryUnlimited, SyncerSessionTTL) + s.session, err = newSession(ctx, s.selfSchemaVerPath, s.etcdCli, + newSessionRetryUnlimited, SyncerSessionTTL) if err != nil { return errors.Trace(err) }