Skip to content

Commit

Permalink
ringpop: update hashring immediately on ring change (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Mar 25, 2020
1 parent 8bcbb4f commit 31d2619
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 26 deletions.
27 changes: 27 additions & 0 deletions common/membership/rpMonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,30 @@ func (s *RpoSuite) TestRingpopMonitor() {
rpm.Stop()
testService.Stop()
}

func (s *RpoSuite) TestCompareMembers() {
s.testCompareMembers([]string{}, []string{"a"}, true)
s.testCompareMembers([]string{}, []string{"a", "b"}, true)
s.testCompareMembers([]string{"a"}, []string{"a", "b"}, true)
s.testCompareMembers([]string{}, []string{"a"}, true)
s.testCompareMembers([]string{}, []string{"a", "b"}, true)
s.testCompareMembers([]string{}, []string{}, false)
s.testCompareMembers([]string{"a"}, []string{"a"}, false)
s.testCompareMembers([]string{"a", "b"}, []string{"a", "b"}, false)
}

func (s *RpoSuite) testCompareMembers(curr []string, new []string, hasDiff bool) {
resolver := &ringpopServiceResolver{}
currMembers := make(map[string]struct{}, len(curr))
for _, m := range curr {
currMembers[m] = struct{}{}
}
resolver.membersMap = currMembers
newMembers, changed := resolver.compareMembers(new)
s.Equal(hasDiff, changed)
s.Equal(len(new), len(newMembers))
for _, m := range new {
_, ok := newMembers[m]
s.True(ok)
}
}
90 changes: 64 additions & 26 deletions common/membership/rpServiceResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ type ringpopServiceResolver struct {
shutdownWG sync.WaitGroup
logger log.Logger

ringLock sync.RWMutex
ringLastRefreshTime time.Time
ring *hashring.HashRing
ringValue atomic.Value // this stores the current hashring

refreshLock sync.Mutex
lastRefreshTime time.Time
membersMap map[string]struct{} // for de-duping change notifications

listenerLock sync.RWMutex
listeners map[string]chan<- *ChangedEvent
Expand All @@ -69,18 +71,22 @@ func newRingpopServiceResolver(
logger log.Logger,
) *ringpopServiceResolver {

return &ringpopServiceResolver{
resolver := &ringpopServiceResolver{
status: common.DaemonStatusInitialized,
service: service,
rp: rp,
refreshChan: make(chan struct{}),
shutdownCh: make(chan struct{}),
logger: logger.WithTags(tag.ComponentServiceResolver, tag.Service(service)),

ring: hashring.New(farm.Fingerprint32, replicaPoints),

listeners: make(map[string]chan<- *ChangedEvent),
membersMap: make(map[string]struct{}),
listeners: make(map[string]chan<- *ChangedEvent),
}
resolver.ringValue.Store(newHashRing())
return resolver
}

func newHashRing() *hashring.HashRing {
return hashring.New(farm.Fingerprint32, replicaPoints)
}

// Start starts the oracle
Expand Down Expand Up @@ -112,12 +118,10 @@ func (r *ringpopServiceResolver) Stop() {
return
}

r.ringLock.Lock()
defer r.ringLock.Unlock()
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
r.rp.RemoveListener(r)
r.ring = hashring.New(farm.Fingerprint32, replicaPoints)
r.ringValue.Store(newHashRing())
r.listeners = make(map[string]chan<- *ChangedEvent)
close(r.shutdownCh)

Expand All @@ -131,9 +135,7 @@ func (r *ringpopServiceResolver) Lookup(
key string,
) (*HostInfo, error) {

r.ringLock.RLock()
defer r.ringLock.RUnlock()
addr, found := r.ring.Lookup(key)
addr, found := r.ring().Lookup(key)
if !found {
select {
case r.refreshChan <- struct{}{}:
Expand Down Expand Up @@ -174,12 +176,12 @@ func (r *ringpopServiceResolver) RemoveListener(
}

func (r *ringpopServiceResolver) MemberCount() int {
return r.ring.ServerCount()
return r.ring().ServerCount()
}

func (r *ringpopServiceResolver) Members() []*HostInfo {
var servers []*HostInfo
for _, s := range r.ring.Servers() {
for _, s := range r.ring().Servers() {
servers = append(servers, NewHostInfo(s, r.getLabelsMap()))
}

Expand All @@ -206,28 +208,42 @@ func (r *ringpopServiceResolver) HandleEvent(
}

func (r *ringpopServiceResolver) refresh() error {
r.ringLock.Lock()
defer r.ringLock.Unlock()
r.refreshLock.Lock()
defer r.refreshLock.Unlock()
return r.refreshNoLock()
}

if r.ringLastRefreshTime.After(time.Now().Add(-minRefreshInternal)) {
func (r *ringpopServiceResolver) refreshWithBackoff() error {
r.refreshLock.Lock()
defer r.refreshLock.Unlock()
if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) {
// refresh too frequently
return nil
}
return r.refreshNoLock()
}

r.ring = hashring.New(farm.Fingerprint32, replicaPoints)

func (r *ringpopServiceResolver) refreshNoLock() error {
addrs, err := r.rp.GetReachableMembers(swim.MemberWithLabelAndValue(RoleKey, r.service))
if err != nil {
return err
}

newMembersMap, changed := r.compareMembers(addrs)
if !changed {
return nil
}

ring := newHashRing()
for _, addr := range addrs {
host := NewHostInfo(addr, r.getLabelsMap())
r.ring.AddMembers(host)
ring.AddMembers(host)
}

r.ringLastRefreshTime = time.Now()
r.logger.Debug("Current reachable members", tag.Addresses(addrs))
r.membersMap = newMembersMap
r.lastRefreshTime = time.Now()
r.ringValue.Store(ring)
r.logger.Info("Current reachable members", tag.Addresses(addrs))
return nil
}

Expand Down Expand Up @@ -271,19 +287,41 @@ func (r *ringpopServiceResolver) refreshRingWorker() {
case <-r.shutdownCh:
return
case <-r.refreshChan:
if err := r.refresh(); err != nil {
if err := r.refreshWithBackoff(); err != nil {
r.logger.Error("error periodically refreshing ring", tag.Error(err))
}
case <-refreshTicker.C:
if err := r.refresh(); err != nil {
if err := r.refreshWithBackoff(); err != nil {
r.logger.Error("error periodically refreshing ring", tag.Error(err))
}
}
}
}

func (r *ringpopServiceResolver) ring() *hashring.HashRing {
return r.ringValue.Load().(*hashring.HashRing)
}

func (r *ringpopServiceResolver) getLabelsMap() map[string]string {
labels := make(map[string]string)
labels[RoleKey] = r.service
return labels
}

func (r *ringpopServiceResolver) compareMembers(addrs []string) (map[string]struct{}, bool) {
changed := false
newMembersMap := make(map[string]struct{}, len(addrs))
for _, addr := range addrs {
newMembersMap[addr] = struct{}{}
if _, ok := r.membersMap[addr]; !ok {
changed = true
}
}
for addr := range r.membersMap {
if _, ok := newMembersMap[addr]; !ok {
changed = true
break
}
}
return newMembersMap, changed
}

0 comments on commit 31d2619

Please sign in to comment.