Skip to content

Commit

Permalink
*: add check when watching (pingcap#6671)
Browse files Browse the repository at this point in the history
* *: add check when watching
  • Loading branch information
zimulala authored May 30, 2018
1 parent 68b96b9 commit d890ee6
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 10 deletions.
3 changes: 3 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (s *mockSchemaSyncer) GlobalVersionCh() clientv3.WatchChan {
return s.globalVerCh
}

// WatchGlobalSchemaVer implements SchemaSyncer.WatchGlobalSchemaVer interface.
func (s *mockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {}

// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
func (s *mockSchemaSyncer) UpdateSelfVersion(ctx context.Context, version int64) error {
atomic.StoreInt64(&s.selfSchemaVersion, version)
Expand Down
39 changes: 36 additions & 3 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/coreos/etcd/clientv3"
Expand Down Expand Up @@ -68,6 +69,8 @@ type SchemaSyncer interface {
OwnerUpdateGlobalVersion(ctx context.Context, version int64) error
// GlobalVersionCh gets the chan for watching global version.
GlobalVersionCh() clientv3.WatchChan
// WatchGlobalSchemaVer watches the global schema version.
WatchGlobalSchemaVer(ctx context.Context)
// MustGetGlobalVersion gets the global version. The only reason it fails is that ctx is done.
MustGetGlobalVersion(ctx context.Context) (int64, error)
// Done returns a channel that closes when the syncer is no longer being refreshed.
Expand All @@ -84,7 +87,10 @@ type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
globalVerCh clientv3.WatchChan
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
}
}

// NewSchemaSyncer creates a new SchemaSyncer.
Expand Down Expand Up @@ -135,7 +141,11 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)

s.mu.Lock()
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
s.mu.Unlock()

err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
return errors.Trace(err)
Expand Down Expand Up @@ -172,7 +182,30 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {

// GlobalVersionCh implements SchemaSyncer.GlobalVersionCh interface.
func (s *schemaVersionSyncer) GlobalVersionCh() clientv3.WatchChan {
return s.globalVerCh
s.mu.RLock()
defer s.mu.RUnlock()
return s.mu.globalVerCh
}

// WatchGlobalSchemaVer implements SchemaSyncer.WatchGlobalSchemaVer interface.
func (s *schemaVersionSyncer) WatchGlobalSchemaVer(ctx context.Context) {
startTime := time.Now()
// Make sure the globalVerCh doesn't receive the information of 'close' before we finish the rewatch.
s.mu.Lock()
s.mu.globalVerCh = nil
s.mu.Unlock()

go func() {
defer func() {
metrics.DeploySyncerHistogram.WithLabelValues(metrics.SyncerRewatch, metrics.RetLabel(nil)).Observe(time.Since(startTime).Seconds())
}()
ch := s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)

s.mu.Lock()
s.mu.globalVerCh = ch
s.mu.Unlock()
log.Info("[syncer] watch global schema finished")
}()
}

// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
Expand Down
7 changes: 6 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,16 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
}
case <-syncer.GlobalVersionCh():
case _, ok := <-syncer.GlobalVersionCh():
err := do.Reload()
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
}
if !ok {
log.Warn("[ddl] reload schema in loop, schema syncer need rewatch")
// Make sure the rewatch doesn't affect load schema, so we watch the global schema version asynchronously.
syncer.WatchGlobalSchemaVer(context.Background())
}
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
Expand Down
8 changes: 4 additions & 4 deletions metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
}, []string{LblType})

SyncerInit = "init"
SyncerRestart = "restart"
SyncerClear = "clear"

SyncerInit = "init"
SyncerRestart = "restart"
SyncerClear = "clear"
SyncerRewatch = "rewatch"
DeploySyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand Down
1 change: 1 addition & 0 deletions metrics/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22),
}, []string{LblType, LblResult})

WatcherClosed = "watcher_closed"
Cancelled = "cancelled"
Deleted = "deleted"
SessionDone = "session_done"
Expand Down
9 changes: 7 additions & 2 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,15 @@ func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.
watchCh := m.etcdCli.Watch(ctx, key)
for {
select {
case resp := <-watchCh:
case resp, ok := <-watchCh:
if !ok {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
log.Infof("%s watcher is closed, no owner", logPrefix)
return
}
if resp.Canceled {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
log.Infof("%s failed, no owner", logPrefix)
log.Infof("%s canceled, no owner", logPrefix)
return
}

Expand Down

0 comments on commit d890ee6

Please sign in to comment.