Skip to content

Commit

Permalink
ddl: Break campaignLoop when closing DDL (pingcap#3553)
Browse files Browse the repository at this point in the history
* ddl: break the loop when closing
  • Loading branch information
zimulala authored Jun 28, 2017
1 parent 76c07f6 commit d0e8591
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
27 changes: 15 additions & 12 deletions ddl/owner_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ddl

import (
"fmt"
"math"
"os"
"strconv"
Expand Down Expand Up @@ -130,7 +131,7 @@ func setManagerSessionTTL() error {
return nil
}

func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
func newSession(ctx goctx.Context, flag string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
var err error
var etcdSession *concurrency.Session
for i := 0; i < retryCnt; i++ {
Expand All @@ -139,7 +140,7 @@ func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int)
if err == nil {
break
}
log.Warnf("[ddl] failed to new session, err %v", err)
log.Warnf("[ddl] %s failed to new session, err %v", flag, err)
if isContextFinished(err) {
break
}
Expand All @@ -151,11 +152,11 @@ func newSession(ctx goctx.Context, etcdCli *clientv3.Client, retryCnt, ttl int)

// CampaignOwners implements OwnerManager.CampaignOwners interface.
func (m *ownerManager) CampaignOwners(ctx goctx.Context) error {
ddlSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
ddlSession, err := newSession(ctx, DDLOwnerKey, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
if err != nil {
return errors.Trace(err)
}
bgSession, err := newSession(ctx, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
bgSession, err := newSession(ctx, BgOwnerKey, m.etcdCli, newSessionDefaultRetryCnt, ManagerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -169,14 +170,15 @@ func (m *ownerManager) CampaignOwners(ctx goctx.Context) error {
}

func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency.Session, key string) {
idInfo := fmt.Sprintf("%s ownerManager %s", key, m.ddlID)
var err error
for {
select {
case <-etcdSession.Done():
log.Infof("[ddl] %s etcd session is done, creates a new one", key)
etcdSession, err = newSession(ctx, m.etcdCli, newSessionRetryUnlimited, ManagerSessionTTL)
log.Infof("[ddl] %s etcd session is done, creates a new one", idInfo)
etcdSession, err = newSession(ctx, idInfo, m.etcdCli, newSessionRetryUnlimited, ManagerSessionTTL)
if err != nil {
log.Infof("[ddl] break %s campaign loop, err %v", key, err)
log.Infof("[ddl] %s break campaign loop, err %v", idInfo, err)
return
}
case <-ctx.Done():
Expand All @@ -186,16 +188,17 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency.
time.Duration(ManagerSessionTTL)*time.Second)
_, err = m.etcdCli.Revoke(ctx, etcdSession.Lease())
cancel()
log.Infof("[ddl] break %s campaign loop err %v", key, err)
log.Infof("[ddl] %s break campaign loop err %v", idInfo, err)
return
default:
}

elec := concurrency.NewElection(etcdSession, key)
err = elec.Campaign(ctx, m.ddlID)
if err != nil {
log.Infof("[ddl] %s ownerManager %s failed to campaign, err %v", key, m.ddlID, err)
log.Infof("[ddl] %s failed to campaign, err %v", idInfo, err)
if isContextFinished(err) {
log.Warnf("[ddl] break %s campaign loop, err %v", key, err)
log.Warnf("[ddl] %s campaign loop, err %v", idInfo, err)
return
}
continue
Expand Down Expand Up @@ -229,13 +232,13 @@ func GetOwnerInfo(ctx goctx.Context, elec *concurrency.Election, key, id string)
resp, err := elec.Leader(ctx)
if err != nil {
// If no leader elected currently, it returns ErrElectionNoLeader.
log.Infof("[ddl] failed to get leader, err %v", err)
log.Infof("[ddl] %s ownerManager %s failed to get leader, err %v", key, id, err)
return "", errors.Trace(err)
}
ownerID := string(resp.Kvs[0].Value)
log.Infof("[ddl] %s ownerManager is %s, owner is %v", key, id, ownerID)
if ownerID != id {
log.Warnf("[ddl] ownerManager %s isn't the owner", id)
log.Warnf("[ddl] %s ownerManager %s isn't the owner", key, id)
return "", errors.New("ownerInfoNotMatch")
}

Expand Down
6 changes: 4 additions & 2 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func (s *schemaVersionSyncer) Init(ctx goctx.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session, err = newSession(ctx, s.etcdCli, newSessionDefaultRetryCnt, SyncerSessionTTL)
s.session, err = newSession(ctx, s.selfSchemaVerPath, s.etcdCli,
newSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -140,7 +141,8 @@ func (s *schemaVersionSyncer) Done() <-chan struct{} {
// Restart implements SchemaSyncer.Restart interface.
func (s *schemaVersionSyncer) Restart(ctx goctx.Context) error {
var err error
s.session, err = newSession(ctx, s.etcdCli, newSessionRetryUnlimited, SyncerSessionTTL)
s.session, err = newSession(ctx, s.selfSchemaVerPath, s.etcdCli,
newSessionRetryUnlimited, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit d0e8591

Please sign in to comment.