Skip to content

Commit

Permalink
Protect membership member keys concurrent access (cadence-workflow#4731)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Feb 8, 2022
1 parent 32cf612 commit 7e3d48c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
7 changes: 5 additions & 2 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type ring struct {
value atomic.Value // this stores the current hashring

members struct {
sync.Mutex
sync.RWMutex
refreshed time.Time
keys map[string]HostInfo // for mapping ip:port to HostInfo
}
Expand Down Expand Up @@ -160,6 +160,8 @@ func (r *ring) Lookup(
}
return HostInfo{}, ErrInsufficientHosts
}
r.members.RLock()
defer r.members.RUnlock()
host, ok := r.members.keys[addr]
if !ok {
return HostInfo{}, fmt.Errorf("host not found in member keys, host: %q", addr)
Expand Down Expand Up @@ -204,6 +206,8 @@ func (r *ring) Members() []HostInfo {
servers := r.ring().Servers()

var hosts = make([]HostInfo, 0, len(servers))
r.members.RLock()
defer r.members.RUnlock()
for _, s := range servers {
host, ok := r.members.keys[s]
if !ok {
Expand Down Expand Up @@ -239,7 +243,6 @@ func (r *ring) refresh() error {
for _, member := range members {
ring.AddMembers(member)
}

r.members.keys = newMembersMap
r.members.refreshed = time.Now()
r.value.Store(ring)
Expand Down
51 changes: 51 additions & 0 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ package membership

import (
"errors"
"math/rand"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand All @@ -33,6 +36,24 @@ import (
"github.com/uber/cadence/common/log"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyz")

func randSeq(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

func randomHostInfo(n int) []HostInfo {
res := make([]HostInfo, n, n)
for i := 0; i < n; i++ {
res = append(res, NewHostInfo(randSeq(5)))
}
return res
}

func testCompareMembers(t *testing.T, curr []HostInfo, new []HostInfo, hasDiff bool) {
hashring := &ring{}
currMembers := make(map[string]HostInfo, len(curr))
Expand Down Expand Up @@ -182,3 +203,33 @@ func TestStopWillStopProvider(t *testing.T) {
hr.Stop()

}

func TestLookupAndRefreshRaceCondition(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
var wg sync.WaitGroup

pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) {
return randomHostInfo(5), nil
})
hr := newHashring("test-service", pp, log.NewNoop())
hr.Start()
wg.Add(2)
go func() {
for i := 0; i < 50; i++ {
hr.Lookup("a")
}
wg.Done()
}()
go func() {
for i := 0; i < 50; i++ {
// to bypass internal check
hr.members.refreshed = time.Now().AddDate(0, 0, -1)
hr.refresh()
}
wg.Done()
}()

wg.Wait()
}

0 comments on commit 7e3d48c

Please sign in to comment.