diff --git a/ddl/owner_manager.go b/ddl/owner_manager.go index 4fad09fbcda55..345665845285c 100644 --- a/ddl/owner_manager.go +++ b/ddl/owner_manager.go @@ -29,7 +29,6 @@ import ( "github.com/ngaut/log" "github.com/pingcap/tidb/terror" goctx "golang.org/x/net/context" - "google.golang.org/grpc" ) // OwnerManager is used to campaign the owner and manage the owner information. @@ -117,17 +116,17 @@ func newSession(ctx goctx.Context, flag string, etcdCli *clientv3.Client, retryC var err error var etcdSession *concurrency.Session for i := 0; i < retryCnt; i++ { + if isContextDone(ctx) { + return etcdSession, errors.Trace(ctx.Err()) + } + etcdSession, err = concurrency.NewSession(etcdCli, concurrency.WithTTL(ttl), concurrency.WithContext(ctx)) if err == nil { break } log.Warnf("[ddl] %s failed to new session, err %v", flag, err) - if isContextFinished(err) || terror.ErrorEqual(err, grpc.ErrClientConnClosing) { - break - } time.Sleep(200 * time.Millisecond) - continue } return etcdSession, errors.Trace(err) } @@ -181,10 +180,6 @@ func (m *ownerManager) campaignLoop(ctx goctx.Context, etcdSession *concurrency. err = elec.Campaign(ctx, m.ddlID) if err != nil { log.Infof("[ddl] %s failed to campaign, err %v", idInfo, err) - if isContextFinished(err) { - log.Warnf("[ddl] %s campaign loop, err %v", idInfo, err) - return - } continue } diff --git a/ddl/syncer.go b/ddl/syncer.go index 5c38df7090d49..cdb568d42242b 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -23,7 +23,6 @@ import ( "github.com/coreos/etcd/clientv3/concurrency" "github.com/juju/errors" "github.com/ngaut/log" - "github.com/pingcap/tidb/terror" goctx "golang.org/x/net/context" ) @@ -96,10 +95,8 @@ func (s *schemaVersionSyncer) putKV(ctx goctx.Context, retryCnt int, key, val st opts ...clientv3.OpOption) error { var err error for i := 0; i < retryCnt; i++ { - select { - case <-ctx.Done(): + if isContextDone(ctx) { return errors.Trace(ctx.Err()) - default: } childCtx, cancel := goctx.WithTimeout(ctx, keyOpDefaultTimeout) @@ -184,10 +181,11 @@ func (s *schemaVersionSyncer) RemoveSelfVersionPath() error { return errors.Trace(err) } -func isContextFinished(err error) bool { - if terror.ErrorEqual(err, goctx.Canceled) || - terror.ErrorEqual(err, goctx.DeadlineExceeded) { +func isContextDone(ctx goctx.Context) bool { + select { + case <-ctx.Done(): return true + default: } return false } @@ -199,16 +197,11 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx goctx.Context, latestVer intervalCnt := int(time.Second / checkVersInterval) updatedMap := make(map[string]struct{}) for { - select { - case <-ctx.Done(): + if isContextDone(ctx) { return errors.Trace(ctx.Err()) - default: } resp, err := s.etcdCli.Get(ctx, DDLAllSchemaVersions, clientv3.WithPrefix()) - if isContextFinished(err) { - return errors.Trace(err) - } if err != nil { log.Infof("[syncer] check all versions failed %v", err) continue