From 45bc726dcd92301aeebfa5c6c454b9c00e2281b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mantas=20=C5=A0idlauskas?= Date: Thu, 27 Jan 2022 07:39:08 +0200 Subject: [PATCH] Hashring: return Hostinfo struct instead of string (#4708) * Set TChannel port to Ringpop labels --- client/history/peerResolver_test.go | 2 +- client/matching/peerResolver_test.go | 4 +- common/membership/hashring.go | 95 +++++++----------- common/membership/hashring_test.go | 50 +++------- common/membership/hostinfo.go | 88 +++++++++++++++++ common/membership/peerprovider_mock.go | 8 +- common/membership/resolver.go | 26 +++-- common/membership/resolver_mock.go | 12 +-- common/membership/resolver_test.go | 22 +++-- .../{ringpop_config.go => config.go} | 0 ...{ringpop_config_test.go => config_test.go} | 0 .../peerprovider/ringpopprovider/provider.go | 96 ++++++++++++++++--- .../ringpopprovider/provider_test.go | 2 +- common/resource/resource.go | 2 +- common/resource/resourceImpl.go | 4 +- common/resource/resourceTest.go | 2 +- host/membership_hashring.go | 8 +- host/membership_resolver.go | 10 +- host/service.go | 6 +- service/history/shard/controller_test.go | 8 +- .../domain_replication_processor.go | 4 +- service/worker/replicator/replicator.go | 4 +- 22 files changed, 280 insertions(+), 173 deletions(-) create mode 100644 common/membership/hostinfo.go rename common/peerprovider/ringpopprovider/{ringpop_config.go => config.go} (100%) rename common/peerprovider/ringpopprovider/{ringpop_config_test.go => config_test.go} (100%) diff --git a/client/history/peerResolver_test.go b/client/history/peerResolver_test.go index 3045497920e..e8f9bb3f9bc 100644 --- a/client/history/peerResolver_test.go +++ b/client/history/peerResolver_test.go @@ -38,7 +38,7 @@ func TestPeerResolver(t *testing.T) { serviceResolver.EXPECT().Lookup(service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort"), nil) 
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort"), nil) serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort"), nil) - serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(nil, assert.AnError) + serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(membership.HostInfo{}, assert.AnError) r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper) diff --git a/client/matching/peerResolver_test.go b/client/matching/peerResolver_test.go index 24b9e65fbef..7724b58b175 100644 --- a/client/matching/peerResolver_test.go +++ b/client/matching/peerResolver_test.go @@ -34,8 +34,8 @@ func TestPeerResolver(t *testing.T) { controller := gomock.NewController(t) serviceResolver := membership.NewMockResolver(controller) serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(membership.NewHostInfo("taskListA:thriftPort"), nil) - serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(nil, assert.AnError) - serviceResolver.EXPECT().Members(service.Matching).Return([]*membership.HostInfo{ + serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(membership.HostInfo{}, assert.AnError) + serviceResolver.EXPECT().Members(service.Matching).Return([]membership.HostInfo{ membership.NewHostInfo("taskListA:thriftPort"), membership.NewHostInfo("taskListB:thriftPort"), }, nil) diff --git a/common/membership/hashring.go b/common/membership/hashring.go index f80fdce1b5e..c9e695d199c 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -50,22 +50,12 @@ const ( type PeerProvider interface { common.Daemon - GetMembers(service string) ([]string, error) - WhoAmI() (string, error) + GetMembers(service string) ([]HostInfo, error) + WhoAmI() (HostInfo, error) SelfEvict() error Subscribe(name string, 
notifyChannel chan<- *ChangedEvent) error } -// PortMap is a map of port names to port numbers. -type PortMap map[string]uint16 - -// HostInfo is a type that contains the info about a cadence host -type HostInfo struct { - addr string // ip:port - identity string - portMap PortMap // ports host is listening to -} - type ring struct { status int32 service string @@ -80,7 +70,7 @@ type ring struct { members struct { sync.Mutex refreshed time.Time - keys map[string]struct{} // for de-duping change notifications + keys map[string]HostInfo // for mapping ip:port to HostInfo } subscribers struct { @@ -99,11 +89,11 @@ func newHashring( service: service, peerProvider: provider, shutdownCh: make(chan struct{}), - logger: logger.WithTags(tag.ComponentServiceResolver), + logger: logger, refreshChan: make(chan *ChangedEvent), } - hashring.members.keys = make(map[string]struct{}) + hashring.members.keys = make(map[string]HostInfo) hashring.subscribers.keys = make(map[string]chan<- *ChangedEvent) hashring.value.Store(emptyHashring()) @@ -161,16 +151,20 @@ func (r *ring) Stop() { // Lookup finds the host in the ring responsible for serving the given key func (r *ring) Lookup( key string, -) (*HostInfo, error) { +) (HostInfo, error) { addr, found := r.ring().Lookup(key) if !found { select { case r.refreshChan <- &ChangedEvent{}: default: } - return nil, ErrInsufficientHosts + return HostInfo{}, ErrInsufficientHosts } - return NewHostInfo(addr), nil + host, ok := r.members.keys[addr] + if !ok { + return HostInfo{}, fmt.Errorf("host not found in member keys, host: %q", addr) + } + return host, nil } // Subscribe registers callback watcher. 
@@ -206,13 +200,20 @@ func (r *ring) MemberCount() int { return r.ring().ServerCount() } -func (r *ring) Members() []*HostInfo { - var servers []*HostInfo - for _, s := range r.ring().Servers() { - servers = append(servers, NewHostInfo(s)) +func (r *ring) Members() []HostInfo { + servers := r.ring().Servers() + + var hosts = make([]HostInfo, 0, len(servers)) + for _, s := range servers { + host, ok := r.members.keys[s] + if !ok { + r.logger.Warn("host not found in hashring keys", tag.Address(s)) + continue + } + hosts = append(hosts, host) } - return servers + return hosts } func (r *ring) refresh() error { @@ -224,26 +225,25 @@ func (r *ring) refresh() error { return nil } - addrs, err := r.peerProvider.GetMembers(r.service) + members, err := r.peerProvider.GetMembers(r.service) if err != nil { return fmt.Errorf("getting members from peer provider: %w", err) } - newMembersMap, changed := r.compareMembers(addrs) + newMembersMap, changed := r.compareMembers(members) if !changed { return nil } ring := emptyHashring() - for _, addr := range addrs { - host := NewHostInfo(addr) - ring.AddMembers(host) + for _, member := range members { + ring.AddMembers(member) } r.members.keys = newMembersMap r.members.refreshed = time.Now() r.value.Store(ring) - r.logger.Info("refreshed ring members", tag.Service(r.service), tag.Addresses(addrs)) + r.logger.Info("refreshed ring members", tag.Value(members)) return nil } @@ -273,12 +273,12 @@ func (r *ring) ring() *hashring.HashRing { return r.value.Load().(*hashring.HashRing) } -func (r *ring) compareMembers(addrs []string) (map[string]struct{}, bool) { +func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) { changed := false - newMembersMap := make(map[string]struct{}, len(addrs)) - for _, addr := range addrs { - newMembersMap[addr] = struct{}{} - if _, ok := r.members.keys[addr]; !ok { + newMembersMap := make(map[string]HostInfo, len(members)) + for _, member := range members { + 
newMembersMap[member.GetAddress()] = member + if _, ok := r.members.keys[member.GetAddress()]; !ok { changed = true } } @@ -290,30 +290,3 @@ func (r *ring) compareMembers(addrs []string) (map[string]struct{}, bool) { } return newMembersMap, changed } - -// NewHostInfo creates a new HostInfo instance -func NewHostInfo(addr string) *HostInfo { - return &HostInfo{ - addr: addr, - } -} - -// GetAddress returns the ip:port address -func (hi *HostInfo) GetAddress() string { - return hi.addr -} - -// Identity implements ringpop's Membership interface -func (hi *HostInfo) Identity() string { - // for now we just use the address as the identity - return hi.addr -} - -// Label implements ringpop's Membership interface -func (hi *HostInfo) Label(key string) (value string, has bool) { - return "", false -} - -// SetLabel sets the label. -func (hi *HostInfo) SetLabel(key string, value string) { -} diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index a4b626750c2..94350dd92f5 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -33,18 +33,18 @@ import ( "github.com/uber/cadence/common/log" ) -func testCompareMembers(t *testing.T, curr []string, new []string, hasDiff bool) { +func testCompareMembers(t *testing.T, curr []HostInfo, new []HostInfo, hasDiff bool) { hashring := &ring{} - currMembers := make(map[string]struct{}, len(curr)) + currMembers := make(map[string]HostInfo, len(curr)) for _, m := range curr { - currMembers[m] = struct{}{} + currMembers[m.GetAddress()] = m } hashring.members.keys = currMembers newMembers, changed := hashring.compareMembers(new) assert.Equal(t, hasDiff, changed) assert.Equal(t, len(new), len(newMembers)) for _, m := range new { - _, ok := newMembers[m] + _, ok := newMembers[m.GetAddress()] assert.True(t, ok) } } @@ -52,21 +52,21 @@ func testCompareMembers(t *testing.T, curr []string, new []string, hasDiff bool) func Test_ring_compareMembers(t *testing.T) { tests := 
[]struct { - curr []string - new []string + curr []HostInfo + new []HostInfo hasDiff bool }{ - {curr: []string{}, new: []string{"a"}, hasDiff: true}, - {curr: []string{}, new: []string{"a", "b"}, hasDiff: true}, - {curr: []string{"a"}, new: []string{"a", "b"}, hasDiff: true}, - {curr: []string{}, new: []string{}, hasDiff: false}, - {curr: []string{"a"}, new: []string{"a"}, hasDiff: false}, + {curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a")}, hasDiff: true}, + {curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true}, + {curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true}, + {curr: []HostInfo{}, new: []HostInfo{}, hasDiff: false}, + {curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a")}, hasDiff: false}, // order doesn't matter. - {curr: []string{"b", "a"}, new: []string{"a", "b"}, hasDiff: false}, + {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: false}, // member has left the ring - {curr: []string{"a", "b", "c"}, new: []string{"a", "b"}, hasDiff: true}, + {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: true}, // ring becomes empty - {curr: []string{"a", "b", "c"}, new: []string{}, hasDiff: true}, + {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{}, hasDiff: true}, } for _, tt := range tests { @@ -165,7 +165,7 @@ func TestMemberCountReturnsNumber(t *testing.T) { func TestErrorIsPropagatedWhenProviderFails(t *testing.T) { ctrl := gomock.NewController(t) pp := NewMockPeerProvider(ctrl) - pp.EXPECT().GetMembers(gomock.Any()).Return([]string{}, errors.New("error")) + pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error")) hr := newHashring("test-service", pp, log.NewNoop()) assert.Error(t, hr.refresh()) @@ -182,23 +182,3 @@ func TestStopWillStopProvider(t 
*testing.T) { hr.Stop() } - -func TestMembersUseOnlyLocalRing(t *testing.T) { - - hr := newHashring("test-service", - nil, /* provider */ - log.NewNoop(), - ) - assert.Nil(t, hr.Members()) - - ring := emptyHashring() - for _, addr := range []string{"127", "128"} { - host := NewHostInfo(addr) - ring.AddMembers(host) - } - hr.value.Store(ring) - - assert.Equal(t, 2, len(hr.Members())) - assert.Equal(t, 2, hr.MemberCount()) - -} diff --git a/common/membership/hostinfo.go b/common/membership/hostinfo.go new file mode 100644 index 00000000000..68436e92d85 --- /dev/null +++ b/common/membership/hostinfo.go @@ -0,0 +1,88 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package membership + +import ( + "fmt" + "strings" +) + +// PortMap is a map of port names to port numbers. 
+type PortMap map[string]uint16 + +// HostInfo is a type that contains the info about a cadence host +type HostInfo struct { + addr string // ip:port + identity string + portMap PortMap // ports host is listening to +} + +// NewHostInfo creates a new HostInfo instance +func NewHostInfo(addr string) HostInfo { + return HostInfo{ + addr: addr, + } +} + +// String formats a PortMap into a string of name:port pairs +func (m PortMap) String() string { + res := make([]string, 0, len(m)) + for name, port := range m { + res = append(res, fmt.Sprintf("%s:%d", name, port)) + } + return strings.Join(res, ", ") +} + +// NewDetailedHostInfo creates a new HostInfo instance with identity and portmap information +func NewDetailedHostInfo(addr string, identity string, portMap PortMap) HostInfo { + return HostInfo{ + addr: addr, + identity: identity, + portMap: portMap, + } +} + +// GetAddress returns the ip:port address +func (hi HostInfo) GetAddress() string { + return hi.addr +} + +// Identity implements ringpop's Membership interface +func (hi HostInfo) Identity() string { + // for now we just use the address as the identity + return hi.addr +} + +// Label is a noop function to conform to ringpop hashring member interface +func (hi HostInfo) Label(key string) (value string, has bool) { + return "", false +} + +// SetLabel is a noop function to conform to ringpop hashring member interface +func (hi HostInfo) SetLabel(key string, value string) { +} + +// String will return a human-readable host details +func (hi HostInfo) String() string { + return fmt.Sprintf("addr: %s, identity: %s, portMap: %s", hi.addr, hi.identity, hi.portMap) +} diff --git a/common/membership/peerprovider_mock.go b/common/membership/peerprovider_mock.go index 8a32c0368ab..d7b48deb405 100644 --- a/common/membership/peerprovider_mock.go +++ b/common/membership/peerprovider_mock.go @@ -56,10 +56,10 @@ func (m *MockPeerProvider) EXPECT() *MockPeerProviderMockRecorder { } // GetMembers mocks base method. 
-func (m *MockPeerProvider) GetMembers(service string) ([]string, error) { +func (m *MockPeerProvider) GetMembers(service string) ([]HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetMembers", service) - ret0, _ := ret[0].([]string) + ret0, _ := ret[0].([]HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -123,10 +123,10 @@ func (mr *MockPeerProviderMockRecorder) Subscribe(name, notifyChannel interface{ } // WhoAmI mocks base method. -func (m *MockPeerProvider) WhoAmI() (string, error) { +func (m *MockPeerProvider) WhoAmI() (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WhoAmI") - ret0, _ := ret[0].(string) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/common/membership/resolver.go b/common/membership/resolver.go index bd85d55b710..a52f96c0de2 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/service" ) @@ -44,10 +45,10 @@ type ( // Resolver provides membership information for all cadence services. Resolver interface { common.Daemon - // WhoAmI returns self address. + // WhoAmI returns self host details. // To be consistent with peer provider, it is advised to use peer provider // to return this information - WhoAmI() (*HostInfo, error) + WhoAmI() (HostInfo, error) // EvictSelf evicts this member from the membership ring. After this method is // called, other members should discover that this node is no longer part of the @@ -56,7 +57,7 @@ type ( EvictSelf() error // Lookup will return host which is an owner for provided key. - Lookup(service, key string) (*HostInfo, error) + Lookup(service, key string) (HostInfo, error) // Subscribe adds a subscriber which will get detailed change data on the given // channel, whenever membership changes. 
@@ -69,7 +70,7 @@ type ( MemberCount(service string) (int, error) // Members returns all host addresses in a service specific hashring - Members(service string) ([]*HostInfo, error) + Members(service string) ([]HostInfo, error) } ) @@ -88,7 +89,7 @@ func NewResolver( provider PeerProvider, logger log.Logger, ) (*MultiringResolver, error) { - return NewMultiringResolver(service.List, provider, logger), nil + return NewMultiringResolver(service.List, provider, logger.WithTags(tag.ComponentServiceResolver)), nil } // NewMultiringResolver creates hashrings for all services @@ -144,13 +145,8 @@ func (rpo *MultiringResolver) Stop() { } // WhoAmI asks to provide current instance address -func (rpo *MultiringResolver) WhoAmI() (*HostInfo, error) { - address, err := rpo.provider.WhoAmI() - if err != nil { - return nil, err - } - - return NewHostInfo(address), nil +func (rpo *MultiringResolver) WhoAmI() (HostInfo, error) { + return rpo.provider.WhoAmI() } // EvictSelf is used to remove this host from membership ring @@ -166,10 +162,10 @@ func (rpo *MultiringResolver) getRing(service string) (*ring, error) { return ring, nil } -func (rpo *MultiringResolver) Lookup(service string, key string) (*HostInfo, error) { +func (rpo *MultiringResolver) Lookup(service string, key string) (HostInfo, error) { ring, err := rpo.getRing(service) if err != nil { - return nil, err + return HostInfo{}, err } return ring.Lookup(key) } @@ -190,7 +186,7 @@ func (rpo *MultiringResolver) Unsubscribe(service string, name string) error { return ring.Unsubscribe(name) } -func (rpo *MultiringResolver) Members(service string) ([]*HostInfo, error) { +func (rpo *MultiringResolver) Members(service string) ([]HostInfo, error) { ring, err := rpo.getRing(service) if err != nil { return nil, err diff --git a/common/membership/resolver_mock.go b/common/membership/resolver_mock.go index d44c7e58c93..da0b28663bc 100644 --- a/common/membership/resolver_mock.go +++ b/common/membership/resolver_mock.go @@ -70,10 
+70,10 @@ func (mr *MockResolverMockRecorder) EvictSelf() *gomock.Call { } // Lookup mocks base method. -func (m *MockResolver) Lookup(service, key string) (*HostInfo, error) { +func (m *MockResolver) Lookup(service, key string) (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Lookup", service, key) - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -100,10 +100,10 @@ func (mr *MockResolverMockRecorder) MemberCount(service interface{}) *gomock.Cal } // Members mocks base method. -func (m *MockResolver) Members(service string) ([]*HostInfo, error) { +func (m *MockResolver) Members(service string) ([]HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Members", service) - ret0, _ := ret[0].([]*HostInfo) + ret0, _ := ret[0].([]HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -167,10 +167,10 @@ func (mr *MockResolverMockRecorder) Unsubscribe(service, name interface{}) *gomo } // WhoAmI mocks base method. 
-func (m *MockResolver) WhoAmI() (*HostInfo, error) { +func (m *MockResolver) WhoAmI() (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WhoAmI") - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/common/membership/resolver_test.go b/common/membership/resolver_test.go index b0a52addf7c..c115990399d 100644 --- a/common/membership/resolver_test.go +++ b/common/membership/resolver_test.go @@ -55,24 +55,28 @@ func TestNewCreatesAllRings(t *testing.T) { func TestMethodsAreRoutedToARing(t *testing.T) { var changeCh = make(chan *ChangedEvent) - a, _ := newTestResolver(t) + a, pp := newTestResolver(t) + // add members to this ring - r, _ := a.getRing("test-worker") - ring := emptyHashring() + hosts := []HostInfo{} for _, addr := range []string{"127", "128"} { - host := NewHostInfo(addr) - ring.AddMembers(host) + hosts = append(hosts, NewHostInfo(addr)) } - r.value.Store(ring) + pp.EXPECT().GetMembers("test-worker").Return(hosts, nil).Times(1) + + r, err := a.getRing("test-worker") + r.refresh() - hi, err := a.Lookup("test-worker", "key") + assert.NoError(t, err) + + hi, err := r.Lookup("key") + assert.NoError(t, err) assert.Equal(t, "127", hi.GetAddress()) // the same ring will be picked here - nohi, err := a.Lookup("WRONG-RING-NAME", "key") + _, err = a.Lookup("WRONG-RING-NAME", "key") assert.Error(t, err) - assert.Nil(t, nohi) members, err := a.Members("test-worker") assert.NoError(t, err) diff --git a/common/peerprovider/ringpopprovider/ringpop_config.go b/common/peerprovider/ringpopprovider/config.go similarity index 100% rename from common/peerprovider/ringpopprovider/ringpop_config.go rename to common/peerprovider/ringpopprovider/config.go diff --git a/common/peerprovider/ringpopprovider/ringpop_config_test.go b/common/peerprovider/ringpopprovider/config_test.go similarity index 100% rename from common/peerprovider/ringpopprovider/ringpop_config_test.go rename to 
common/peerprovider/ringpopprovider/config_test.go diff --git a/common/peerprovider/ringpopprovider/provider.go b/common/peerprovider/ringpopprovider/provider.go index 6a3259707c8..c37cf8a43e9 100644 --- a/common/peerprovider/ringpopprovider/provider.go +++ b/common/peerprovider/ringpopprovider/provider.go @@ -24,6 +24,8 @@ package ringpopprovider import ( "fmt" + "net" + "strconv" "sync" "sync/atomic" @@ -48,6 +50,7 @@ type ( ringpop *ringpop.Ringpop bootParams *swim.BootstrapOptions logger log.Logger + channel tchannel.Channel mu sync.RWMutex subscribers map[string]chan<- *membership.ChangedEvent @@ -57,7 +60,9 @@ type ( const ( // roleKey label is set by every single service as soon as it bootstraps its // ringpop instance. The data for this key is the service name - roleKey = "serviceName" + roleKey = "serviceName" + portTchannel = "tchannel" + portgRPC = "grpc" ) var _ membership.PeerProvider = (*Provider)(nil) @@ -72,14 +77,9 @@ func New( return nil, err } - rp, err := ringpop.New(config.Name, ringpop.Channel(channel.(*tcg.Channel))) - if err != nil { - return nil, fmt.Errorf("ringpop creation failed: %v", err) - } - discoveryProvider, err := newDiscoveryProvider(config, logger) if err != nil { - return nil, fmt.Errorf("ringpop discovery provider: %v", err) + return nil, fmt.Errorf("ringpop discovery provider: %w", err) } bootstrapOpts := &swim.BootstrapOptions{ @@ -87,21 +87,29 @@ func New( DiscoverProvider: discoveryProvider, } - return NewRingpopProvider(service, rp, bootstrapOpts, logger), nil + rp, err := ringpop.New(config.Name, ringpop.Channel(channel.(*tcg.Channel))) + if err != nil { + return nil, fmt.Errorf("ringpop instance creation: %w", err) + } + + return NewRingpopProvider(service, rp, bootstrapOpts, channel, logger), nil } // NewRingpopProvider sets up ringpop based peer provider -func NewRingpopProvider(service string, +func NewRingpopProvider( + service string, rp *ringpop.Ringpop, bootstrapOpts *swim.BootstrapOptions, + channel 
tchannel.Channel, logger log.Logger, ) *Provider { return &Provider{ service: service, status: common.DaemonStatusInitialized, - ringpop: rp, bootParams: bootstrapOpts, logger: logger, + channel: channel, + ringpop: rp, subscribers: map[string]chan<- *membership.ChangedEvent{}, } } @@ -129,8 +137,18 @@ func (r *Provider) Start() { r.logger.Fatal("unable to get ring pop labels", tag.Error(err)) } + // set tchannel port to labels + _, port, err := net.SplitHostPort(r.channel.PeerInfo().HostPort) + if err != nil { + r.logger.Fatal("unable get tchannel port", tag.Error(err)) + } + + if err = labels.Set(portTchannel, port); err != nil { + r.logger.Fatal("unable to set ringpop tchannel label", tag.Error(err)) + } + if err = labels.Set(roleKey, r.service); err != nil { - r.logger.Fatal("unable to set ring pop labels", tag.Error(err)) + r.logger.Fatal("unable to set ringpop role label", tag.Error(err)) } } @@ -171,13 +189,53 @@ func (r *Provider) SelfEvict() error { } // GetMembers returns all hosts with a specified role value -func (r *Provider) GetMembers(service string) ([]string, error) { - return r.ringpop.GetReachableMembers(swim.MemberWithLabelAndValue(roleKey, service)) +func (r *Provider) GetMembers(service string) ([]membership.HostInfo, error) { + var res []membership.HostInfo + + // filter member by service name, add port info to Hostinfo if they are present + memberData := func(member swim.Member) bool { + portMap := make(membership.PortMap) + if v, ok := member.Label(roleKey); !ok || v != service { + return false + } + + if v, ok := member.Label(portTchannel); ok { + port, err := labelToPort(v) + if err != nil { + r.logger.Warn("tchannel port cannot be converted", tag.Error(err), tag.Value(v)) + } else { + portMap[portTchannel] = port + } + } + + if v, ok := member.Label(portgRPC); ok { + port, err := labelToPort(v) + if err != nil { + r.logger.Warn("grpc port cannot be converted", tag.Error(err), tag.Value(v)) + } else { + portMap[portgRPC] = port + } + } + 
+ res = append(res, membership.NewDetailedHostInfo(member.GetAddress(), member.Identity(), portMap)) + + return true + } + _, err := r.ringpop.GetReachableMembers(memberData) + if err != nil { + return nil, fmt.Errorf("ringpop get members: %w", err) + } + + return res, nil } // WhoAmI returns address of this instance -func (r *Provider) WhoAmI() (string, error) { - return r.ringpop.WhoAmI() +func (r *Provider) WhoAmI() (membership.HostInfo, error) { + address, err := r.ringpop.WhoAmI() + if err != nil { + return membership.HostInfo{}, fmt.Errorf("ringpop doesn't know Who Am I: %w", err) + } + return membership.NewHostInfo(address), nil } // Stop stops ringpop @@ -206,3 +264,11 @@ func (r *Provider) Subscribe(name string, notifyChannel chan<- *membership.Chang r.subscribers[name] = notifyChannel return nil } + +func labelToPort(label string) (uint16, error) { + port, err := strconv.ParseUint(label, 0, 16) // unsigned 16-bit parse: accepts the full 0-65535 port range and rejects negative labels instead of wrapping them + if err != nil { + return 0, err + } + return uint16(port), nil +} diff --git a/common/peerprovider/ringpopprovider/provider_test.go b/common/peerprovider/ringpopprovider/provider_test.go index 2b5294e4721..7fe29990ca4 100644 --- a/common/peerprovider/ringpopprovider/provider_test.go +++ b/common/peerprovider/ringpopprovider/provider_test.go @@ -99,7 +99,7 @@ func NewTestRingpopCluster(ringPopApp string, size int, ipAddr string, seed stri return nil } - NewRingpopProvider(ringPopApp, ringPop, bOptions, logger) + NewRingpopProvider(ringPopApp, ringPop, bOptions, cluster.channels[i], logger) } return cluster diff --git a/common/resource/resource.go b/common/resource/resource.go index ab7b529251d..e5b9cad169a 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -55,7 +55,7 @@ type ( GetServiceName() string GetHostName() string - GetHostInfo() *membership.HostInfo + GetHostInfo() membership.HostInfo GetArchivalMetadata() archiver.ArchivalMetadata GetClusterMetadata() cluster.Metadata diff --git a/common/resource/resourceImpl.go 
b/common/resource/resourceImpl.go index 36fade252d7..50b39a2f280 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -73,7 +73,7 @@ type ( numShards int serviceName string hostName string - hostInfo *membership.HostInfo + hostInfo membership.HostInfo metricsScope tally.Scope clusterMetadata cluster.Metadata @@ -382,7 +382,7 @@ func (h *Impl) GetHostName() string { } // GetHostInfo return host info -func (h *Impl) GetHostInfo() *membership.HostInfo { +func (h *Impl) GetHostInfo() membership.HostInfo { return h.hostInfo } diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index e1fec6af73c..86989f080ce 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -218,7 +218,7 @@ func (s *Test) GetHostName() string { } // GetHostInfo for testing -func (s *Test) GetHostInfo() *membership.HostInfo { +func (s *Test) GetHostInfo() membership.HostInfo { return testHostInfo } diff --git a/host/membership_hashring.go b/host/membership_hashring.go index 75421f202ae..02eab308f8d 100644 --- a/host/membership_hashring.go +++ b/host/membership_hashring.go @@ -27,21 +27,21 @@ import ( ) type simpleHashring struct { - hosts []*membership.HostInfo + hosts []membership.HostInfo hashfunc func([]byte) uint32 } // newSimpleHashring returns a service resolver that maintains static mapping // between services and host info func newSimpleHashring(hosts []string) *simpleHashring { - hostInfos := make([]*membership.HostInfo, 0, len(hosts)) + hostInfos := make([]membership.HostInfo, 0, len(hosts)) for _, host := range hosts { hostInfos = append(hostInfos, membership.NewHostInfo(host)) } return &simpleHashring{hostInfos, farm.Fingerprint32} } -func (s *simpleHashring) Lookup(key string) (*membership.HostInfo, error) { +func (s *simpleHashring) Lookup(key string) (membership.HostInfo, error) { hash := int(s.hashfunc([]byte(key))) idx := hash % len(s.hosts) return s.hosts[idx], nil @@ -59,6 +59,6 @@ func (s 
*simpleHashring) MemberCount() int { return len(s.hosts) } -func (s *simpleHashring) Members() []*membership.HostInfo { +func (s *simpleHashring) Members() []membership.HostInfo { return s.hosts } diff --git a/host/membership_resolver.go b/host/membership_resolver.go index 8895f28c9f8..e31991550fb 100644 --- a/host/membership_resolver.go +++ b/host/membership_resolver.go @@ -27,7 +27,7 @@ import ( ) type simpleResolver struct { - hostInfo *membership.HostInfo + hostInfo membership.HostInfo resolvers map[string]*simpleHashring } @@ -53,7 +53,7 @@ func (s *simpleResolver) EvictSelf() error { return nil } -func (s *simpleResolver) WhoAmI() (*membership.HostInfo, error) { +func (s *simpleResolver) WhoAmI() (membership.HostInfo, error) { return s.hostInfo, nil } @@ -65,10 +65,10 @@ func (s *simpleResolver) Unsubscribe(service string, name string) error { return nil } -func (s *simpleResolver) Lookup(service string, key string) (*membership.HostInfo, error) { +func (s *simpleResolver) Lookup(service string, key string) (membership.HostInfo, error) { resolver, ok := s.resolvers[service] if !ok { - return nil, fmt.Errorf("cannot lookup host for service %q", service) + return membership.HostInfo{}, fmt.Errorf("cannot lookup host for service %q", service) } return resolver.Lookup(key) } @@ -77,6 +77,6 @@ func (s *simpleResolver) MemberCount(service string) (int, error) { return 0, nil } -func (s *simpleResolver) Members(service string) ([]*membership.HostInfo, error) { +func (s *simpleResolver) Members(service string) ([]membership.HostInfo, error) { return nil, nil } diff --git a/host/service.go b/host/service.go index b23e72fbbdb..4ae622bb1f7 100644 --- a/host/service.go +++ b/host/service.go @@ -60,7 +60,7 @@ type ( GetTimeSource() clock.TimeSource GetDispatcher() *yarpc.Dispatcher GetMembershipResolver() membership.Resolver - GetHostInfo() *membership.HostInfo + GetHostInfo() membership.HostInfo GetClusterMetadata() cluster.Metadata GetMessagingClient() messaging.Client 
GetBlobstoreClient() blobstore.Client @@ -74,7 +74,7 @@ type ( status int32 sName string hostName string - hostInfo *membership.HostInfo + hostInfo membership.HostInfo dispatcher *yarpc.Dispatcher membershipResolver membership.Resolver rpcFactory common.RPCFactory @@ -232,7 +232,7 @@ func (h *serviceImpl) GetMembershipResolver() membership.Resolver { return h.membershipResolver } -func (h *serviceImpl) GetHostInfo() *membership.HostInfo { +func (h *serviceImpl) GetHostInfo() membership.HostInfo { return h.hostInfo } diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 4b70d296d81..687c8af27a7 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -59,7 +59,7 @@ type ( mockClusterMetadata *cluster.MockMetadata mockMembershipResolver *membership.MockResolver - hostInfo *membership.HostInfo + hostInfo membership.HostInfo mockShardManager *mmocks.ShardManager mockEngineFactory *MockEngineFactory @@ -280,12 +280,12 @@ func (s *controllerSuite) TestAcquireShardLookupFailure() { numShards := 2 s.config.NumberOfShards = numShards for shardID := 0; shardID < numShards; shardID++ { - s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.HostInfo{}, errors.New("ring failure")).Times(1) } s.shardController.acquireShards() for shardID := 0; shardID < numShards; shardID++ { - s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.HostInfo{}, errors.New("ring failure")).Times(1) s.Nil(s.shardController.GetEngineForShard(shardID)) } } @@ -445,7 +445,7 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { 
s.shardController.acquireShards() for shardID := 0; shardID < numShards; shardID++ { - s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.HostInfo{}, errors.New("ring failure")).Times(1) } s.shardController.acquireShards() diff --git a/service/worker/replicator/domain_replication_processor.go b/service/worker/replicator/domain_replication_processor.go index d9ad0f0226e..4160b2f1fd9 100644 --- a/service/worker/replicator/domain_replication_processor.go +++ b/service/worker/replicator/domain_replication_processor.go @@ -51,7 +51,7 @@ const ( type ( domainReplicationProcessor struct { - hostInfo *membership.HostInfo + hostInfo membership.HostInfo membershipResolver membership.Resolver status int32 sourceCluster string @@ -75,7 +75,7 @@ func newDomainReplicationProcessor( remotePeer admin.Client, metricsClient metrics.Client, taskExecutor domain.ReplicationTaskExecutor, - hostInfo *membership.HostInfo, + hostInfo membership.HostInfo, resolver membership.Resolver, domainReplicationQueue domain.ReplicationQueue, replicationMaxRetry time.Duration, diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index aacf3487f9f..8197ea9d681 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -43,7 +43,7 @@ type ( domainProcessors []*domainReplicationProcessor logger log.Logger metricsClient metrics.Client - hostInfo *membership.HostInfo + hostInfo membership.HostInfo membershipResolver membership.Resolver domainReplicationQueue domain.ReplicationQueue replicationMaxRetry time.Duration @@ -56,7 +56,7 @@ func NewReplicator( clientBean client.Bean, logger log.Logger, metricsClient metrics.Client, - hostInfo *membership.HostInfo, + hostInfo membership.HostInfo, membership membership.Resolver, domainReplicationQueue 
domain.ReplicationQueue, domainReplicationTaskExecutor domain.ReplicationTaskExecutor,