Skip to content

Commit

Permalink
Hashring: return Hostinfo struct instead of string (cadence-workflow#…
Browse files Browse the repository at this point in the history
…4708)

* Set TChannel port to Ringpop labels
  • Loading branch information
mantas-sidlauskas authored Jan 27, 2022
1 parent 9530143 commit 45bc726
Show file tree
Hide file tree
Showing 22 changed files with 280 additions and 173 deletions.
2 changes: 1 addition & 1 deletion client/history/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestPeerResolver(t *testing.T) {
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(nil, assert.AnError)
serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(membership.HostInfo{}, assert.AnError)

r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper)

Expand Down
4 changes: 2 additions & 2 deletions client/matching/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestPeerResolver(t *testing.T) {
controller := gomock.NewController(t)
serviceResolver := membership.NewMockResolver(controller)
serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(membership.NewHostInfo("taskListA:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(nil, assert.AnError)
serviceResolver.EXPECT().Members(service.Matching).Return([]*membership.HostInfo{
serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(membership.HostInfo{}, assert.AnError)
serviceResolver.EXPECT().Members(service.Matching).Return([]membership.HostInfo{
membership.NewHostInfo("taskListA:thriftPort"),
membership.NewHostInfo("taskListB:thriftPort"),
}, nil)
Expand Down
95 changes: 34 additions & 61 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,12 @@ const (
type PeerProvider interface {
common.Daemon

GetMembers(service string) ([]string, error)
WhoAmI() (string, error)
GetMembers(service string) ([]HostInfo, error)
WhoAmI() (HostInfo, error)
SelfEvict() error
Subscribe(name string, notifyChannel chan<- *ChangedEvent) error
}

// PortMap is a map of port names to port numbers.
type PortMap map[string]uint16

// HostInfo is a type that contains the info about a cadence host
type HostInfo struct {
addr string // ip:port
identity string
portMap PortMap // ports host is listening to
}

type ring struct {
status int32
service string
Expand All @@ -80,7 +70,7 @@ type ring struct {
members struct {
sync.Mutex
refreshed time.Time
keys map[string]struct{} // for de-duping change notifications
keys map[string]HostInfo // for mapping ip:port to HostInfo
}

subscribers struct {
Expand All @@ -99,11 +89,11 @@ func newHashring(
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
logger: logger.WithTags(tag.ComponentServiceResolver),
logger: logger,
refreshChan: make(chan *ChangedEvent),
}

hashring.members.keys = make(map[string]struct{})
hashring.members.keys = make(map[string]HostInfo)
hashring.subscribers.keys = make(map[string]chan<- *ChangedEvent)

hashring.value.Store(emptyHashring())
Expand Down Expand Up @@ -161,16 +151,20 @@ func (r *ring) Stop() {
// Lookup finds the host in the ring responsible for serving the given key
func (r *ring) Lookup(
key string,
) (*HostInfo, error) {
) (HostInfo, error) {
addr, found := r.ring().Lookup(key)
if !found {
select {
case r.refreshChan <- &ChangedEvent{}:
default:
}
return nil, ErrInsufficientHosts
return HostInfo{}, ErrInsufficientHosts
}
return NewHostInfo(addr), nil
host, ok := r.members.keys[addr]
if !ok {
return HostInfo{}, fmt.Errorf("host not found in member keys, host: %q", addr)
}
return host, nil
}

// Subscribe registers callback watcher.
Expand Down Expand Up @@ -206,13 +200,20 @@ func (r *ring) MemberCount() int {
return r.ring().ServerCount()
}

func (r *ring) Members() []*HostInfo {
var servers []*HostInfo
for _, s := range r.ring().Servers() {
servers = append(servers, NewHostInfo(s))
func (r *ring) Members() []HostInfo {
servers := r.ring().Servers()

var hosts = make([]HostInfo, 0, len(servers))
for _, s := range servers {
host, ok := r.members.keys[s]
if !ok {
r.logger.Warn("host not found in hashring keys", tag.Address(s))
continue
}
hosts = append(hosts, host)
}

return servers
return hosts
}

func (r *ring) refresh() error {
Expand All @@ -224,26 +225,25 @@ func (r *ring) refresh() error {
return nil
}

addrs, err := r.peerProvider.GetMembers(r.service)
members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
return fmt.Errorf("getting members from peer provider: %w", err)
}

newMembersMap, changed := r.compareMembers(addrs)
newMembersMap, changed := r.compareMembers(members)
if !changed {
return nil
}

ring := emptyHashring()
for _, addr := range addrs {
host := NewHostInfo(addr)
ring.AddMembers(host)
for _, member := range members {
ring.AddMembers(member)
}

r.members.keys = newMembersMap
r.members.refreshed = time.Now()
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Service(r.service), tag.Addresses(addrs))
r.logger.Info("refreshed ring members", tag.Value(members))

return nil
}
Expand Down Expand Up @@ -273,12 +273,12 @@ func (r *ring) ring() *hashring.HashRing {
return r.value.Load().(*hashring.HashRing)
}

func (r *ring) compareMembers(addrs []string) (map[string]struct{}, bool) {
func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]struct{}, len(addrs))
for _, addr := range addrs {
newMembersMap[addr] = struct{}{}
if _, ok := r.members.keys[addr]; !ok {
newMembersMap := make(map[string]HostInfo, len(members))
for _, member := range members {
newMembersMap[member.GetAddress()] = member
if _, ok := r.members.keys[member.GetAddress()]; !ok {
changed = true
}
}
Expand All @@ -290,30 +290,3 @@ func (r *ring) compareMembers(addrs []string) (map[string]struct{}, bool) {
}
return newMembersMap, changed
}

// NewHostInfo creates a new HostInfo instance
func NewHostInfo(addr string) *HostInfo {
return &HostInfo{
addr: addr,
}
}

// GetAddress returns the ip:port address
func (hi *HostInfo) GetAddress() string {
return hi.addr
}

// Identity implements ringpop's Membership interface
func (hi *HostInfo) Identity() string {
// for now we just use the address as the identity
return hi.addr
}

// Label implements ringpop's Membership interface
func (hi *HostInfo) Label(key string) (value string, has bool) {
return "", false
}

// SetLabel sets the label.
func (hi *HostInfo) SetLabel(key string, value string) {
}
50 changes: 15 additions & 35 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,40 @@ import (
"github.com/uber/cadence/common/log"
)

func testCompareMembers(t *testing.T, curr []string, new []string, hasDiff bool) {
func testCompareMembers(t *testing.T, curr []HostInfo, new []HostInfo, hasDiff bool) {
hashring := &ring{}
currMembers := make(map[string]struct{}, len(curr))
currMembers := make(map[string]HostInfo, len(curr))
for _, m := range curr {
currMembers[m] = struct{}{}
currMembers[m.GetAddress()] = m
}
hashring.members.keys = currMembers
newMembers, changed := hashring.compareMembers(new)
assert.Equal(t, hasDiff, changed)
assert.Equal(t, len(new), len(newMembers))
for _, m := range new {
_, ok := newMembers[m]
_, ok := newMembers[m.GetAddress()]
assert.True(t, ok)
}
}

func Test_ring_compareMembers(t *testing.T) {

tests := []struct {
curr []string
new []string
curr []HostInfo
new []HostInfo
hasDiff bool
}{
{curr: []string{}, new: []string{"a"}, hasDiff: true},
{curr: []string{}, new: []string{"a", "b"}, hasDiff: true},
{curr: []string{"a"}, new: []string{"a", "b"}, hasDiff: true},
{curr: []string{}, new: []string{}, hasDiff: false},
{curr: []string{"a"}, new: []string{"a"}, hasDiff: false},
{curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a")}, hasDiff: true},
{curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true},
{curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true},
{curr: []HostInfo{}, new: []HostInfo{}, hasDiff: false},
{curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a")}, hasDiff: false},
// order doesn't matter.
{curr: []string{"b", "a"}, new: []string{"a", "b"}, hasDiff: false},
{curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: false},
// member has left the ring
{curr: []string{"a", "b", "c"}, new: []string{"a", "b"}, hasDiff: true},
{curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: true},
// ring becomes empty
{curr: []string{"a", "b", "c"}, new: []string{}, hasDiff: true},
{curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{}, hasDiff: true},
}

for _, tt := range tests {
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestMemberCountReturnsNumber(t *testing.T) {
func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)
pp.EXPECT().GetMembers(gomock.Any()).Return([]string{}, errors.New("error"))
pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop())
assert.Error(t, hr.refresh())
Expand All @@ -182,23 +182,3 @@ func TestStopWillStopProvider(t *testing.T) {
hr.Stop()

}

func TestMembersUseOnlyLocalRing(t *testing.T) {

hr := newHashring("test-service",
nil, /* provider */
log.NewNoop(),
)
assert.Nil(t, hr.Members())

ring := emptyHashring()
for _, addr := range []string{"127", "128"} {
host := NewHostInfo(addr)
ring.AddMembers(host)
}
hr.value.Store(ring)

assert.Equal(t, 2, len(hr.Members()))
assert.Equal(t, 2, hr.MemberCount())

}
88 changes: 88 additions & 0 deletions common/membership/hostinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package membership

import (
"fmt"
"strings"
)

// PortMap is a map of port names to port numbers.
type PortMap map[string]uint16

// HostInfo is a type that contains the info about a cadence host
type HostInfo struct {
addr string // ip:port
identity string
portMap PortMap // ports host is listening to
}

// NewHostInfo creates a new HostInfo instance
func NewHostInfo(addr string) HostInfo {
return HostInfo{
addr: addr,
}
}

// String formats a PortMap into a string of name:port pairs
func (m PortMap) String() string {
res := make([]string, 0, len(m))
for name, port := range m {
res = append(res, fmt.Sprintf("%s:%d", name, port))
}
return strings.Join(res, ", ")
}

// NewDetailedHostInfo creates a new HostInfo instance with identity and portmap information
func NewDetailedHostInfo(addr string, identity string, portMap PortMap) HostInfo {
return HostInfo{
addr: addr,
identity: identity,
portMap: portMap,
}
}

// GetAddress returns the ip:port address
func (hi HostInfo) GetAddress() string {
return hi.addr
}

// Identity implements ringpop's Membership interface
func (hi HostInfo) Identity() string {
// for now we just use the address as the identity
return hi.addr
}

// Label is a noop function to conform to ringpop hashring member interface
func (hi HostInfo) Label(key string) (value string, has bool) {
return "", false
}

// SetLabel is a noop function to conform to ringpop hashring member interface
func (hi HostInfo) SetLabel(key string, value string) {
}

// String will return a human-readable host details
func (hi HostInfo) String() string {
return fmt.Sprintf("addr: %s, identity: %s, portMap: %s", hi.addr, hi.identity, hi.portMap)
}
Loading

0 comments on commit 45bc726

Please sign in to comment.