From 3557eb5dd40baf32bf5cc312f8ca725bdff39997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mantas=20=C5=A0idlauskas?= Date: Fri, 19 Nov 2021 13:44:51 +0200 Subject: [PATCH] Merge membership Monitor and ServiceResolver to membership.Resolver (#4646) --- client/clientfactory.go | 18 +- client/history/peerResolver.go | 9 +- client/history/peerResolver_test.go | 11 +- client/matching/peerResolver.go | 16 +- client/matching/peerResolver_test.go | 11 +- cmd/server/cadence/server.go | 2 +- common/log/tag/tags.go | 6 +- common/membership/hashring.go | 330 +++++++++++++++ common/membership/monitor.go | 155 ------- common/membership/monitor_mock.go | 291 ------------- common/membership/resolver.go | 388 +++++++----------- common/membership/resolver_mock.go | 182 ++++++++ .../{rpMonitor_test.go => resolver_test.go} | 16 +- common/membership/rpMonitor.go | 77 ---- common/membership/rp_cluster_test.go | 6 +- common/mocks/ServiceResolver.go | 113 ----- common/resource/params.go | 2 +- common/resource/resource.go | 7 +- common/resource/resourceImpl.go | 77 +--- common/resource/resourceTest.go | 59 +-- environment/env.go | 2 +- ...viceResolver.go => membership_hashring.go} | 18 +- ...impleMonitor.go => membership_resolver.go} | 48 ++- host/onebox.go | 22 +- host/service.go | 20 +- service/frontend/adminHandler.go | 14 +- service/frontend/workflowHandler.go | 9 +- service/frontend/workflowHandler_test.go | 4 +- service/history/handler.go | 5 +- service/history/service.go | 2 +- service/history/shard/controller.go | 14 +- service/history/shard/controller_test.go | 50 +-- service/matching/handler.go | 2 +- service/matching/matchingEngine.go | 10 +- service/matching/service.go | 2 +- .../domain_replication_processor.go | 46 ++- .../domain_replication_processor_test.go | 5 +- service/worker/replicator/replicator.go | 8 +- service/worker/service.go | 2 +- 39 files changed, 884 insertions(+), 1175 deletions(-) create mode 100644 common/membership/hashring.go delete mode 100644 common/membership/monitor.go delete mode 100644 common/membership/monitor_mock.go create mode 100644 common/membership/resolver_mock.go rename common/membership/{rpMonitor_test.go => resolver_test.go} (89%) delete mode 100644 common/membership/rpMonitor.go delete mode 100644 common/mocks/ServiceResolver.go rename host/{simpleServiceResolver.go => membership_hashring.go} (77%) rename host/{simpleMonitor.go => membership_resolver.go} (53%) diff --git a/client/clientfactory.go b/client/clientfactory.go index b1fc4c48f0c..8f082f33025 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -67,7 +67,7 @@ type ( rpcClientFactory struct { rpcFactory common.RPCFactory - monitor membership.Monitor + resolver membership.Resolver metricsClient metrics.Client dynConfig *dynamicconfig.Collection numberOfHistoryShards int @@ -78,7 +78,7 @@ type ( // NewRPCClientFactory creates an instance of client factory that knows how to dispatch RPC calls. func NewRPCClientFactory( rpcFactory common.RPCFactory, - monitor membership.Monitor, + resolver membership.Resolver, metricsClient metrics.Client, dc *dynamicconfig.Collection, numberOfHistoryShards int, @@ -86,7 +86,7 @@ func NewRPCClientFactory( ) Factory { return &rpcClientFactory{ rpcFactory: rpcFactory, - monitor: monitor, + resolver: resolver, metricsClient: metricsClient, dynConfig: dc, numberOfHistoryShards: numberOfHistoryShards, @@ -115,11 +115,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( rawClient = history.NewThriftClient(historyserviceclient.New(outboundConfig)) } - resolver, err := cf.monitor.GetResolver(service.History) - if err != nil { - return nil, err - } - peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, resolver, addressMapper) + peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, addressMapper) supportedMessageSize := cf.rpcFactory.GetMaxMessageSize() maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize) @@ -164,11 +160,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout( rawClient = matching.NewThriftClient(matchingserviceclient.New(outboundConfig)) } - resolver, err := cf.monitor.GetResolver(service.Matching) - if err != nil { - return nil, err - } - peerResolver := matching.NewPeerResolver(resolver, addressMapper) + peerResolver := matching.NewPeerResolver(cf.resolver, addressMapper) client := matching.NewClient( timeout, diff --git a/client/history/peerResolver.go b/client/history/peerResolver.go index c3e04474bd1..5a3e81f5416 100644 --- a/client/history/peerResolver.go +++ b/client/history/peerResolver.go @@ -23,6 +23,7 @@ package history import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) // PeerResolver is used to resolve history peers. @@ -30,17 +31,17 @@ import ( // The resulting peer is simply an address of form ip:port where RPC calls can be routed to. type PeerResolver struct { numberOfShards int - membership membership.ServiceResolver + resolver membership.Resolver addressMapper AddressMapperFn } type AddressMapperFn func(string) (string, error) // NewPeerResolver creates a new history peer resolver. -func NewPeerResolver(numberOfShards int, membership membership.ServiceResolver, addressMapper AddressMapperFn) PeerResolver { +func NewPeerResolver(numberOfShards int, resolver membership.Resolver, addressMapper AddressMapperFn) PeerResolver { return PeerResolver{ numberOfShards: numberOfShards, - membership: membership, + resolver: resolver, addressMapper: addressMapper, } } @@ -66,7 +67,7 @@ func (pr PeerResolver) FromDomainID(domainID string) (string, error) { // FromHostAddress is used for further resolving. func (pr PeerResolver) FromShardID(shardID int) (string, error) { shardIDString := string(rune(shardID)) - host, err := pr.membership.Lookup(shardIDString) + host, err := pr.resolver.Lookup(service.History, shardIDString) if err != nil { return "", err } diff --git a/client/history/peerResolver_test.go b/client/history/peerResolver_test.go index 94488c70d66..5c7822fac29 100644 --- a/client/history/peerResolver_test.go +++ b/client/history/peerResolver_test.go @@ -28,16 +28,17 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) func TestPeerResolver(t *testing.T) { numShards := 123 controller := gomock.NewController(t) - serviceResolver := membership.NewMockServiceResolver(controller) - serviceResolver.EXPECT().Lookup(string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort", nil), nil) - serviceResolver.EXPECT().Lookup(string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort", nil), nil) - serviceResolver.EXPECT().Lookup(string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort", nil), nil) - serviceResolver.EXPECT().Lookup(string(rune(11))).Return(nil, assert.AnError) + serviceResolver := membership.NewMockResolver(controller) + serviceResolver.EXPECT().Lookup(service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(nil, assert.AnError) r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper) diff --git a/client/matching/peerResolver.go b/client/matching/peerResolver.go index 2785fc1687e..7e7f47389a4 100644 --- a/client/matching/peerResolver.go +++ b/client/matching/peerResolver.go @@ -20,20 +20,23 @@ package matching -import "github.com/uber/cadence/common/membership" +import ( + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" +) // PeerResolver is used to resolve matching peers. // Those are deployed instances of Cadence matching services that participate in the cluster ring. // The resulting peer is simply an address of form ip:port where RPC calls can be routed to. type PeerResolver struct { - membership membership.ServiceResolver + resolver membership.Resolver addressMapper AddressMapperFn } type AddressMapperFn func(string) (string, error) // NewPeerResolver creates a new matching peer resolver. -func NewPeerResolver(membership membership.ServiceResolver, addressMapper AddressMapperFn) PeerResolver { +func NewPeerResolver(membership membership.Resolver, addressMapper AddressMapperFn) PeerResolver { return PeerResolver{membership, addressMapper} } @@ -41,7 +44,7 @@ func NewPeerResolver(membership membership.ServiceResolver, addressMapper Addres // It uses our membership provider to lookup which instance currently owns the given task list. // FromHostAddress is used for further resolving. func (pr PeerResolver) FromTaskList(taskListName string) (string, error) { - host, err := pr.membership.Lookup(taskListName) + host, err := pr.resolver.Lookup(service.Matching, taskListName) if err != nil { return "", err } @@ -51,7 +54,10 @@ func (pr PeerResolver) FromTaskList(taskListName string) (string, error) { // GetAllPeers returns all matching service peers in the cluster ring. func (pr PeerResolver) GetAllPeers() ([]string, error) { - hosts := pr.membership.Members() + hosts, err := pr.resolver.Members(service.Matching) + if err != nil { + return nil, err + } peers := make([]string, 0, len(hosts)) for _, host := range hosts { peer, err := pr.FromHostAddress(host.GetAddress()) diff --git a/client/matching/peerResolver_test.go b/client/matching/peerResolver_test.go index dcfc1d87a54..33321f4e0ee 100644 --- a/client/matching/peerResolver_test.go +++ b/client/matching/peerResolver_test.go @@ -27,17 +27,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) func TestPeerResolver(t *testing.T) { controller := gomock.NewController(t) - serviceResolver := membership.NewMockServiceResolver(controller) - serviceResolver.EXPECT().Lookup("taskListA").Return(membership.NewHostInfo("taskListA:thriftPort", nil), nil) - serviceResolver.EXPECT().Lookup("invalid").Return(nil, assert.AnError) - serviceResolver.EXPECT().Members().Return([]*membership.HostInfo{ + serviceResolver := membership.NewMockResolver(controller) + serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(membership.NewHostInfo("taskListA:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(nil, assert.AnError) + serviceResolver.EXPECT().Members(service.Matching).Return([]*membership.HostInfo{ membership.NewHostInfo("taskListA:thriftPort", nil), membership.NewHostInfo("taskListB:thriftPort", nil), - }) + }, nil) r := NewPeerResolver(serviceResolver, fakeAddressMapper) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 88fcf0b869a..cfeadb16fe4 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -156,7 +156,7 @@ func (s *server) startService() common.Daemon { ) rpcFactory := rpc.NewFactory(params.Logger, rpcParams) params.RPCFactory = rpcFactory - params.MembershipMonitor, err = membership.NewMonitor( + params.MembershipResolver, err = membership.NewResolver( &s.cfg.Ringpop, rpcFactory.GetChannel(), params.Name, diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 457787c5a50..5c61073f5bc 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -338,9 +338,9 @@ func Addresses(ads []string) Tag { return newObjectTag("addresses", ads) } -// ListenerName returns tag for ListenerName -func ListenerName(name string) Tag { - return newStringTag("listener-name", name) +// Subscriber returns tag for Subscriber +func Subscriber(subscriber string) Tag { + return newStringTag("subscriber", subscriber) } // Address return tag for Address diff --git a/common/membership/hashring.go b/common/membership/hashring.go new file mode 100644 index 00000000000..cf447c65338 --- /dev/null +++ b/common/membership/hashring.go @@ -0,0 +1,330 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package membership + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/dgryski/go-farm" + "github.com/uber/ringpop-go/events" + "github.com/uber/ringpop-go/hashring" + "github.com/uber/ringpop-go/swim" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/types" +) + +// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request +var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"} + +const ( + // RoleKey label is set by every single service as soon as it bootstraps its + // ringpop instance. The data for this key is the service name + RoleKey = "serviceName" + minRefreshInternal = time.Second * 4 + defaultRefreshInterval = time.Second * 10 + replicaPoints = 100 +) + +type ringpopHashring struct { + status int32 + service string + rp *RingpopWrapper + refreshChan chan struct{} + shutdownCh chan struct{} + shutdownWG sync.WaitGroup + logger log.Logger + + ringValue atomic.Value // this stores the current hashring + + refreshLock sync.Mutex + lastRefreshTime time.Time + membersMap map[string]struct{} // for de-duping change notifications + + smu sync.RWMutex + subscribers map[string]chan<- *ChangedEvent +} + +func newRingpopHashring( + service string, + rp *RingpopWrapper, + logger log.Logger, +) *ringpopHashring { + + hashring := &ringpopHashring{ + status: common.DaemonStatusInitialized, + service: service, + rp: rp, + refreshChan: make(chan struct{}), + shutdownCh: make(chan struct{}), + logger: logger.WithTags(tag.ComponentServiceResolver), + membersMap: make(map[string]struct{}), + subscribers: make(map[string]chan<- *ChangedEvent), + } + hashring.ringValue.Store(emptyHashring()) + return hashring +} + +func emptyHashring() *hashring.HashRing { + return hashring.New(farm.Fingerprint32, replicaPoints) +} + +// Start starts the oracle +func (r *ringpopHashring) Start() { + if !atomic.CompareAndSwapInt32( + &r.status, + common.DaemonStatusInitialized, + common.DaemonStatusStarted, + ) { + return + } + + r.rp.AddListener(r) + if err := r.refresh(); err != nil { + r.logger.Fatal("unable to start ring pop service resolver", tag.Error(err)) + } + + r.shutdownWG.Add(1) + go r.refreshRingWorker() +} + +// Stop stops the resolver +func (r *ringpopHashring) Stop() { + if !atomic.CompareAndSwapInt32( + &r.status, + common.DaemonStatusStarted, + common.DaemonStatusStopped, + ) { + return + } + + r.smu.Lock() + defer r.smu.Unlock() + r.rp.RemoveListener(r) + r.ringValue.Store(emptyHashring()) + r.subscribers = make(map[string]chan<- *ChangedEvent) + close(r.shutdownCh) + + if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success { + r.logger.Warn("service resolver timed out on shutdown.") + } +} + +// Lookup finds the host in the ring responsible for serving the given key +func (r *ringpopHashring) Lookup( + key string, +) (*HostInfo, error) { + + addr, found := r.ring().Lookup(key) + if !found { + select { + case r.refreshChan <- struct{}{}: + default: + } + return nil, ErrInsufficientHosts + } + return NewHostInfo(addr, r.getLabelsMap()), nil +} + +func (r *ringpopHashring) AddListener( + name string, + notifyChannel chan<- *ChangedEvent, +) error { + + r.smu.Lock() + defer r.smu.Unlock() + _, ok := r.subscribers[name] + if ok { + return fmt.Errorf("listener already exist for service %q", name) + } + r.subscribers[name] = notifyChannel + return nil +} + +func (r *ringpopHashring) RemoveListener( + name string, +) error { + + r.smu.Lock() + defer r.smu.Unlock() + _, ok := r.subscribers[name] + if !ok { + return nil + } + delete(r.subscribers, name) + return nil +} + +func (r *ringpopHashring) MemberCount() int { + return r.ring().ServerCount() +} + +func (r *ringpopHashring) Members() []*HostInfo { + var servers []*HostInfo + for _, s := range r.ring().Servers() { + servers = append(servers, NewHostInfo(s, r.getLabelsMap())) + } + + return servers +} + +// HandleEvent handles updates from ringpop +func (r *ringpopHashring) HandleEvent( + event events.Event, +) { + + // We only care about RingChangedEvent + e, ok := event.(events.RingChangedEvent) + if ok { + r.logger.Info("Received a ring changed event") + // Note that we receive events asynchronously, possibly out of order. + // We cannot rely on the content of the event, rather we load everything + // from ringpop when we get a notification that something changed. + if err := r.refresh(); err != nil { + r.logger.Error("error refreshing ring when receiving a ring changed event", tag.Error(err)) + } + r.emitEvent(e) + } +} + +func (r *ringpopHashring) refresh() error { + r.refreshLock.Lock() + defer r.refreshLock.Unlock() + return r.refreshNoLock() +} + +func (r *ringpopHashring) refreshWithBackoff() error { + r.refreshLock.Lock() + defer r.refreshLock.Unlock() + if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) { + // refresh too frequently + return nil + } + return r.refreshNoLock() +} + +func (r *ringpopHashring) refreshNoLock() error { + addrs, err := r.rp.GetReachableMembers(swim.MemberWithLabelAndValue(RoleKey, r.service)) + if err != nil { + return err + } + + newMembersMap, changed := r.compareMembers(addrs) + if !changed { + return nil + } + + ring := emptyHashring() + for _, addr := range addrs { + host := NewHostInfo(addr, r.getLabelsMap()) + ring.AddMembers(host) + } + + r.membersMap = newMembersMap + r.lastRefreshTime = time.Now() + r.ringValue.Store(ring) + r.logger.Info("Current reachable members", tag.Addresses(addrs)) + return nil +} + +func (r *ringpopHashring) emitEvent( + rpEvent events.RingChangedEvent, +) { + + // Marshall the event object into the required type + event := &ChangedEvent{} + for _, addr := range rpEvent.ServersAdded { + event.HostsAdded = append(event.HostsAdded, NewHostInfo(addr, r.getLabelsMap())) + } + for _, addr := range rpEvent.ServersRemoved { + event.HostsRemoved = append(event.HostsRemoved, NewHostInfo(addr, r.getLabelsMap())) + } + for _, addr := range rpEvent.ServersUpdated { + event.HostsUpdated = append(event.HostsUpdated, NewHostInfo(addr, r.getLabelsMap())) + } + + // Notify subscribers + r.smu.RLock() + defer r.smu.RUnlock() + + for name, ch := range r.subscribers { + select { + case ch <- event: + default: + r.logger.Error("Failed to send event to subscriber, channel full", tag.Subscriber(name)) + } + } +} + +func (r *ringpopHashring) refreshRingWorker() { + defer r.shutdownWG.Done() + + refreshTicker := time.NewTicker(defaultRefreshInterval) + defer refreshTicker.Stop() + + for { + select { + case <-r.shutdownCh: + return + case <-r.refreshChan: + if err := r.refreshWithBackoff(); err != nil { + r.logger.Error("error periodically refreshing ring", tag.Error(err)) + } + case <-refreshTicker.C: + if err := r.refreshWithBackoff(); err != nil { + r.logger.Error("error periodically refreshing ring", tag.Error(err)) + } + } + } +} + +func (r *ringpopHashring) ring() *hashring.HashRing { + return r.ringValue.Load().(*hashring.HashRing) +} + +func (r *ringpopHashring) getLabelsMap() map[string]string { + labels := make(map[string]string) + labels[RoleKey] = r.service + return labels +} + +func (r *ringpopHashring) compareMembers(addrs []string) (map[string]struct{}, bool) { + changed := false + newMembersMap := make(map[string]struct{}, len(addrs)) + for _, addr := range addrs { + newMembersMap[addr] = struct{}{} + if _, ok := r.membersMap[addr]; !ok { + changed = true + } + } + for addr := range r.membersMap { + if _, ok := newMembersMap[addr]; !ok { + changed = true + break + } + } + return newMembersMap, changed +} diff --git a/common/membership/monitor.go b/common/membership/monitor.go deleted file mode 100644 index e3d4a338f7c..00000000000 --- a/common/membership/monitor.go +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination monitor_mock.go -self_package github.com/uber/cadence/common/membership - -package membership - -import ( - "fmt" - "sync/atomic" - - "github.com/uber/cadence/common" - "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/log/tag" -) - -type ( - - // ChangedEvent describes a change in membership - ChangedEvent struct { - HostsAdded []*HostInfo - HostsUpdated []*HostInfo - HostsRemoved []*HostInfo - } - - // Monitor provides membership information for all cadence services. - Monitor interface { - common.Daemon - // WhoAmI returns self address - WhoAmI() (*HostInfo, error) - // EvictSelf evicts this member from the membership ring. After this method is - // called, other members will discover that this node is no longer part of the - // ring. This primitive is useful to carry out graceful host shutdown during deployments. - EvictSelf() error - GetResolver(service string) (ServiceResolver, error) - } - - // ServiceResolver provides membership information for a specific cadence service. - // It can be used to resolve which member host is responsible for serving a given key. - ServiceResolver interface { - Lookup(key string) (*HostInfo, error) - // AddListener adds a listener which will get notified on the given - // channel, whenever membership changes. - AddListener(name string, notifyChannel chan<- *ChangedEvent) error - // RemoveListener removes a listener for this service. - RemoveListener(name string) error - // MemberCount returns host count in a hashring - MemberCount() int - // Members returns all host addresses in a hashring - Members() []*HostInfo - } -) - -// NewRingpopMonitor returns a ringpop-based membership monitor -func NewRingpopMonitor( - serviceName string, - services []string, - rp *RingpopWrapper, - logger log.Logger, -) *RingpopMonitor { - - rpo := &RingpopMonitor{ - status: common.DaemonStatusInitialized, - serviceName: serviceName, - ringpopWrapper: rp, - logger: logger, - rings: make(map[string]*ringpopServiceResolver), - } - for _, s := range services { - rpo.rings[s] = newRingpopServiceResolver(s, rp, logger) - } - return rpo -} - -func (rpo *RingpopMonitor) Start() { - if !atomic.CompareAndSwapInt32( - &rpo.status, - common.DaemonStatusInitialized, - common.DaemonStatusStarted, - ) { - return - } - - rpo.ringpopWrapper.Start() - - labels, err := rpo.ringpopWrapper.Labels() - if err != nil { - rpo.logger.Fatal("unable to get ring pop labels", tag.Error(err)) - } - - if err = labels.Set(RoleKey, rpo.serviceName); err != nil { - rpo.logger.Fatal("unable to set ring pop labels", tag.Error(err)) - } - - for _, ring := range rpo.rings { - ring.Start() - } -} - -func (rpo *RingpopMonitor) Stop() { - if !atomic.CompareAndSwapInt32( - &rpo.status, - common.DaemonStatusStarted, - common.DaemonStatusStopped, - ) { - return - } - - for _, ring := range rpo.rings { - ring.Stop() - } - - rpo.ringpopWrapper.Stop() -} - -func (rpo *RingpopMonitor) WhoAmI() (*HostInfo, error) { - address, err := rpo.ringpopWrapper.WhoAmI() - if err != nil { - return nil, err - } - labels, err := rpo.ringpopWrapper.Labels() - if err != nil { - return nil, err - } - return NewHostInfo(address, labels.AsMap()), nil -} - -func (rpo *RingpopMonitor) EvictSelf() error { - return rpo.ringpopWrapper.SelfEvict() -} - -func (rpo *RingpopMonitor) GetResolver(service string) (ServiceResolver, error) { - ring, found := rpo.rings[service] - if !found { - return nil, fmt.Errorf("service %q is not tracked by Monitor", service) - } - return ring, nil -} diff --git a/common/membership/monitor_mock.go b/common/membership/monitor_mock.go deleted file mode 100644 index 729c50ad993..00000000000 --- a/common/membership/monitor_mock.go +++ /dev/null @@ -1,291 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2017-2020 Uber Technologies Inc. - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: monitor.go - -// Package membership is a generated GoMock package. -package membership - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockMonitor is a mock of Monitor interface -type MockMonitor struct { - ctrl *gomock.Controller - recorder *MockMonitorMockRecorder -} - -// MockMonitorMockRecorder is the mock recorder for MockMonitor -type MockMonitorMockRecorder struct { - mock *MockMonitor -} - -// NewMockMonitor creates a new mock instance -func NewMockMonitor(ctrl *gomock.Controller) *MockMonitor { - mock := &MockMonitor{ctrl: ctrl} - mock.recorder = &MockMonitorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockMonitor) EXPECT() *MockMonitorMockRecorder { - return m.recorder -} - -// Start mocks base method -func (m *MockMonitor) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start -func (mr *MockMonitorMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockMonitor)(nil).Start)) -} - -// Stop mocks base method -func (m *MockMonitor) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop -func (mr *MockMonitorMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockMonitor)(nil).Stop)) -} - -// WhoAmI mocks base method -func (m *MockMonitor) WhoAmI() (*HostInfo, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WhoAmI") - ret0, _ := ret[0].(*HostInfo) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WhoAmI indicates an expected call of WhoAmI -func (mr *MockMonitorMockRecorder) WhoAmI() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WhoAmI", reflect.TypeOf((*MockMonitor)(nil).WhoAmI)) -} - -// EvictSelf mocks base method -func (m *MockMonitor) EvictSelf() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "EvictSelf") - ret0, _ := ret[0].(error) - return ret0 -} - -// EvictSelf indicates an expected call of EvictSelf -func (mr *MockMonitorMockRecorder) EvictSelf() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EvictSelf", reflect.TypeOf((*MockMonitor)(nil).EvictSelf)) -} - -// Lookup mocks base method -func (m *MockMonitor) Lookup(service, key string) (*HostInfo, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lookup", service, key) - ret0, _ := ret[0].(*HostInfo) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Lookup indicates an expected call of Lookup -func (mr *MockMonitorMockRecorder) Lookup(service, key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lookup", reflect.TypeOf((*MockMonitor)(nil).Lookup), service, key) -} - -// GetResolver mocks base method -func (m *MockMonitor) GetResolver(service string) (ServiceResolver, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetResolver", service) - ret0, _ := ret[0].(ServiceResolver) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetResolver indicates an expected call of GetResolver -func (mr *MockMonitorMockRecorder) GetResolver(service interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResolver", reflect.TypeOf((*MockMonitor)(nil).GetResolver), service) -} - -// AddListener mocks base method -func (m *MockMonitor) AddListener(service, name string, notifyChannel chan<- *ChangedEvent) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddListener", service, name, notifyChannel) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddListener indicates an expected call of AddListener -func (mr *MockMonitorMockRecorder) AddListener(service, name, notifyChannel interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddListener", reflect.TypeOf((*MockMonitor)(nil).AddListener), service, name, notifyChannel) -} - -// RemoveListener mocks base method -func (m *MockMonitor) RemoveListener(service, name string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveListener", service, name) - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveListener indicates an expected call of RemoveListener -func (mr *MockMonitorMockRecorder) RemoveListener(service, name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveListener", reflect.TypeOf((*MockMonitor)(nil).RemoveListener), service, name) -} - -// GetReachableMembers mocks base method -func (m *MockMonitor) GetReachableMembers() ([]string, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetReachableMembers") - ret0, _ := ret[0].([]string) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetReachableMembers indicates an expected call of GetReachableMembers -func (mr *MockMonitorMockRecorder) GetReachableMembers() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReachableMembers", reflect.TypeOf((*MockMonitor)(nil).GetReachableMembers)) -} - -// GetMemberCount mocks base method -func (m *MockMonitor) GetMemberCount(role string) (int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetMemberCount", role) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetMemberCount indicates an expected call of GetMemberCount -func (mr *MockMonitorMockRecorder) GetMemberCount(role interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMemberCount", reflect.TypeOf((*MockMonitor)(nil).GetMemberCount), role) -} - -// MockServiceResolver is a mock of ServiceResolver interface -type MockServiceResolver struct { - ctrl *gomock.Controller - recorder *MockServiceResolverMockRecorder -} - -// MockServiceResolverMockRecorder is the mock recorder for MockServiceResolver -type MockServiceResolverMockRecorder struct { - mock *MockServiceResolver -} - -// NewMockServiceResolver creates a new mock instance -func NewMockServiceResolver(ctrl *gomock.Controller) *MockServiceResolver { - mock := &MockServiceResolver{ctrl: ctrl} - mock.recorder = &MockServiceResolverMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockServiceResolver) EXPECT() *MockServiceResolverMockRecorder { - return m.recorder -} - -// Lookup mocks base method -func (m *MockServiceResolver) Lookup(key string) (*HostInfo, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lookup", key) - ret0, _ := ret[0].(*HostInfo) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Lookup indicates an expected call of Lookup -func (mr *MockServiceResolverMockRecorder) Lookup(key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lookup", reflect.TypeOf((*MockServiceResolver)(nil).Lookup), key) -} - -// AddListener mocks base method -func (m *MockServiceResolver) AddListener(name string, notifyChannel chan<- *ChangedEvent) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddListener", name, notifyChannel) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddListener indicates an expected call of AddListener -func (mr *MockServiceResolverMockRecorder) AddListener(name, notifyChannel interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddListener", reflect.TypeOf((*MockServiceResolver)(nil).AddListener), name, notifyChannel) -} - -// RemoveListener mocks base method -func (m *MockServiceResolver) RemoveListener(name string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveListener", name) - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveListener indicates an expected call of RemoveListener -func (mr *MockServiceResolverMockRecorder) RemoveListener(name interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveListener", reflect.TypeOf((*MockServiceResolver)(nil).RemoveListener), name) -} - -// MemberCount mocks base method -func (m *MockServiceResolver) MemberCount() int { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MemberCount") - ret0, _ := ret[0].(int) - return ret0 -} - -// MemberCount indicates an expected call of MemberCount -func (mr *MockServiceResolverMockRecorder) MemberCount() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MemberCount", reflect.TypeOf((*MockServiceResolver)(nil).MemberCount)) -} - -// Members mocks base method -func (m *MockServiceResolver) Members() []*HostInfo { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Members") - ret0, _ := ret[0].([]*HostInfo) - return ret0 -} - -// Members indicates an expected call of Members -func (mr *MockServiceResolverMockRecorder) Members() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Members", reflect.TypeOf((*MockServiceResolver)(nil).Members)) -} diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 8bb79786922..bcc8e3ca6bd 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -18,315 +18,223 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination resolver_mock.go -self_package github.com/uber/cadence/common/membership + package membership import ( "fmt" - "sync" "sync/atomic" - "time" - "github.com/dgryski/go-farm" - "github.com/uber/ringpop-go/events" - "github.com/uber/ringpop-go/hashring" + "github.com/uber/cadence/common/service" + "github.com/uber/ringpop-go" "github.com/uber/ringpop-go/swim" + tcg "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" - "github.com/uber/cadence/common/types" ) -// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request -var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"} - -const ( - // RoleKey label is set by every single service as soon as it bootstraps its - // ringpop instance. The data for this key is the service name - RoleKey = "serviceName" - minRefreshInternal = time.Second * 4 - defaultRefreshInterval = time.Second * 10 - replicaPoints = 100 +type ( + + // ChangedEvent describes a change in membership + ChangedEvent struct { + HostsAdded []*HostInfo + HostsUpdated []*HostInfo + HostsRemoved []*HostInfo + } + + // Resolver provides membership information for all cadence services. + Resolver interface { + common.Daemon + // WhoAmI returns self address + WhoAmI() (*HostInfo, error) + // EvictSelf evicts this member from the membership ring. After this method is + // called, other members will discover that this node is no longer part of the + // ring. This primitive is useful to carry out graceful host shutdown during deployments. + EvictSelf() error + + Lookup(service, key string) (*HostInfo, error) + // AddListener adds a listener which will get notified on the given + // channel, whenever membership changes. + Subscribe(service, name string, notifyChannel chan<- *ChangedEvent) error + // RemoveListener removes a listener for this service. + Unsubscribe(service, name string) error + // MemberCount returns host count in a hashring + MemberCount(service string) (int, error) + // Members returns all host addresses in a hashring + Members(service string) ([]*HostInfo, error) + } ) -type ringpopServiceResolver struct { - status int32 - service string - rp *RingpopWrapper - refreshChan chan struct{} - shutdownCh chan struct{} - shutdownWG sync.WaitGroup - logger log.Logger - - ringValue atomic.Value // this stores the current hashring - - refreshLock sync.Mutex - lastRefreshTime time.Time - membersMap map[string]struct{} // for de-duping change notifications +type RingpopResolver struct { + status int32 - listenerLock sync.RWMutex - listeners map[string]chan<- *ChangedEvent + serviceName string + ringpopWrapper *RingpopWrapper + rings map[string]*ringpopHashring + logger log.Logger } -var _ ServiceResolver = (*ringpopServiceResolver)(nil) +var _ Resolver = (*RingpopResolver)(nil) -func newRingpopServiceResolver( - service string, - rp *RingpopWrapper, +// NewResolver builds a ringpop monitor conforming +// to the underlying configuration +func NewResolver( + config *RingpopConfig, + channel tchannel.Channel, + serviceName string, logger log.Logger, -) *ringpopServiceResolver { - - resolver := &ringpopServiceResolver{ - status: common.DaemonStatusInitialized, - service: service, - rp: rp, - refreshChan: make(chan struct{}), - shutdownCh: make(chan struct{}), - logger: logger.WithTags(tag.ComponentServiceResolver), - membersMap: make(map[string]struct{}), - listeners: make(map[string]chan<- *ChangedEvent), +) (*RingpopResolver, error) { + + if err := config.validate(); err != nil { + return nil, err } - resolver.ringValue.Store(newHashRing()) - return resolver + + rp, err := ringpop.New(config.Name, ringpop.Channel(channel.(*tcg.Channel))) + if err != nil { + return nil, fmt.Errorf("ringpop creation failed: %v", err) + } + + discoveryProvider, err := newDiscoveryProvider(config, logger) + if err != nil { + return nil, fmt.Errorf("failed to get discovery provider %v", err) + } + + bootstrapOpts := &swim.BootstrapOptions{ + MaxJoinDuration: config.MaxJoinDuration, + DiscoverProvider: discoveryProvider, + } + rpw := NewRingpopWraper(rp, bootstrapOpts, logger) + + return NewRingpopResolver(serviceName, service.List, rpw, logger), nil + } -func newHashRing() *hashring.HashRing { - return hashring.New(farm.Fingerprint32, replicaPoints) +// NewRingpopResolver returns a ringpop-based membership monitor +func NewRingpopResolver( + serviceName string, + services []string, + rp *RingpopWrapper, + logger log.Logger, +) *RingpopResolver { + + rpo := &RingpopResolver{ + status: common.DaemonStatusInitialized, + serviceName: serviceName, + ringpopWrapper: rp, + logger: logger, + rings: make(map[string]*ringpopHashring), + } + for _, s := range services { + rpo.rings[s] = newRingpopHashring(s, rp, logger) + } + return rpo } -// Start starts the oracle -func (r *ringpopServiceResolver) Start() { +func (rpo *RingpopResolver) Start() { if !atomic.CompareAndSwapInt32( - &r.status, + &rpo.status, common.DaemonStatusInitialized, common.DaemonStatusStarted, ) { return } - r.rp.AddListener(r) - if err := r.refresh(); err != nil { - r.logger.Fatal("unable to start ring pop service resolver", tag.Error(err)) + rpo.ringpopWrapper.Start() + + labels, err := rpo.ringpopWrapper.Labels() + if err != nil { + rpo.logger.Fatal("unable to get ring pop labels", tag.Error(err)) + } + + if err = labels.Set(RoleKey, rpo.serviceName); err != nil { + rpo.logger.Fatal("unable to set ring pop labels", tag.Error(err)) } - r.shutdownWG.Add(1) - go r.refreshRingWorker() + for _, ring := range rpo.rings { + ring.Start() + } } -// Stop stops the resolver -func (r *ringpopServiceResolver) Stop() { +func (rpo *RingpopResolver) Stop() { if !atomic.CompareAndSwapInt32( - &r.status, + &rpo.status, common.DaemonStatusStarted, common.DaemonStatusStopped, ) { return } - r.listenerLock.Lock() - defer r.listenerLock.Unlock() - r.rp.RemoveListener(r) - r.ringValue.Store(newHashRing()) - r.listeners = make(map[string]chan<- *ChangedEvent) - close(r.shutdownCh) - - if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success { - r.logger.Warn("service resolver timed out on shutdown.") + for _, ring := range rpo.rings { + ring.Stop() } -} -// Lookup finds the host in the ring responsible for serving the given key -func (r *ringpopServiceResolver) Lookup( - key string, -) (*HostInfo, error) { - - addr, found := r.ring().Lookup(key) - if !found { - select { - case r.refreshChan <- struct{}{}: - default: - } - return nil, ErrInsufficientHosts - } - return NewHostInfo(addr, r.getLabelsMap()), nil + rpo.ringpopWrapper.Stop() } -func (r *ringpopServiceResolver) AddListener( - name string, - notifyChannel chan<- *ChangedEvent, -) error { - - r.listenerLock.Lock() - defer r.listenerLock.Unlock() - _, ok := r.listeners[name] - if ok { - return fmt.Errorf("listener already exist for service %q", name) +func (rpo *RingpopResolver) WhoAmI() (*HostInfo, error) { + address, err := rpo.ringpopWrapper.WhoAmI() + if err != nil { + return nil, err } - r.listeners[name] = notifyChannel - return nil -} - -func (r *ringpopServiceResolver) RemoveListener( - name string, -) error { - - r.listenerLock.Lock() - defer r.listenerLock.Unlock() - _, ok := r.listeners[name] - if !ok { - return nil + labels, err := rpo.ringpopWrapper.Labels() + if err != nil { + return nil, err } - delete(r.listeners, name) - return nil + return NewHostInfo(address, labels.AsMap()), nil } -func (r *ringpopServiceResolver) MemberCount() int { - return r.ring().ServerCount() -} - -func (r *ringpopServiceResolver) Members() []*HostInfo { - var servers []*HostInfo - for _, s := range r.ring().Servers() { - servers = append(servers, NewHostInfo(s, r.getLabelsMap())) - } - - return servers +func (rpo *RingpopResolver) EvictSelf() error { + return rpo.ringpopWrapper.SelfEvict() } -// HandleEvent handles updates from ringpop -func (r *ringpopServiceResolver) HandleEvent( - event events.Event, -) { - - // We only care about RingChangedEvent - e, ok := event.(events.RingChangedEvent) - if ok { - r.logger.Info("Received a ring changed event") - // Note that we receive events asynchronously, possibly out of order. - // We cannot rely on the content of the event, rather we load everything - // from ringpop when we get a notification that something changed. - if err := r.refresh(); err != nil { - r.logger.Error("error refreshing ring when receiving a ring changed event", tag.Error(err)) - } - r.emitEvent(e) +func (rpo *RingpopResolver) getRing(service string) (*ringpopHashring, error) { + ring, found := rpo.rings[service] + if !found { + return nil, fmt.Errorf("service %q is not tracked by Resolver", service) } + return ring, nil } -func (r *ringpopServiceResolver) refresh() error { - r.refreshLock.Lock() - defer r.refreshLock.Unlock() - return r.refreshNoLock() -} - -func (r *ringpopServiceResolver) refreshWithBackoff() error { - r.refreshLock.Lock() - defer r.refreshLock.Unlock() - if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) { - // refresh too frequently - return nil +func (rpo *RingpopResolver) Lookup(service string, key string) (*HostInfo, error) { + ring, err := rpo.getRing(service) + if err != nil { + return nil, err } - return r.refreshNoLock() + return ring.Lookup(key) } -func (r *ringpopServiceResolver) refreshNoLock() error { - addrs, err := r.rp.GetReachableMembers(swim.MemberWithLabelAndValue(RoleKey, r.service)) +func (rpo *RingpopResolver) Subscribe(service string, name string, notifyChannel chan<- *ChangedEvent) error { + ring, err := rpo.getRing(service) if err != nil { return err } - - newMembersMap, changed := r.compareMembers(addrs) - if !changed { - return nil - } - - ring := newHashRing() - for _, addr := range addrs { - host := NewHostInfo(addr, r.getLabelsMap()) - ring.AddMembers(host) - } - - r.membersMap = newMembersMap - r.lastRefreshTime = time.Now() - r.ringValue.Store(ring) - r.logger.Info("Current reachable members", tag.Addresses(addrs)) - return nil + return ring.AddListener(name, notifyChannel) } -func (r *ringpopServiceResolver) emitEvent( - rpEvent events.RingChangedEvent, -) { - - // Marshall the event object into the required type - event := &ChangedEvent{} - for _, addr := range rpEvent.ServersAdded { - event.HostsAdded = append(event.HostsAdded, NewHostInfo(addr, r.getLabelsMap())) - } - for _, addr := range rpEvent.ServersRemoved { - event.HostsRemoved = append(event.HostsRemoved, NewHostInfo(addr, r.getLabelsMap())) - } - for _, addr := range rpEvent.ServersUpdated { - event.HostsUpdated = append(event.HostsUpdated, NewHostInfo(addr, r.getLabelsMap())) - } - - // Notify listeners - r.listenerLock.RLock() - defer r.listenerLock.RUnlock() - - for name, ch := range r.listeners { - select { - case ch <- event: - default: - r.logger.Error("Failed to send listener notification, channel full", tag.ListenerName(name)) - } +func (rpo *RingpopResolver) Unsubscribe(service string, name string) error { + ring, err := rpo.getRing(service) + if err != nil { + return err } + return ring.RemoveListener(name) } -func (r *ringpopServiceResolver) refreshRingWorker() { - defer r.shutdownWG.Done() - - refreshTicker := time.NewTicker(defaultRefreshInterval) - defer refreshTicker.Stop() - - for { - select { - case <-r.shutdownCh: - return - case <-r.refreshChan: - if err := r.refreshWithBackoff(); err != nil { - r.logger.Error("error periodically refreshing ring", tag.Error(err)) - } - case <-refreshTicker.C: - if err := r.refreshWithBackoff(); err != nil { - r.logger.Error("error periodically refreshing ring", tag.Error(err)) - } - } +func (rpo *RingpopResolver) Members(service string) ([]*HostInfo, error) { + ring, err := rpo.getRing(service) + if err != nil { + return nil, err } + return ring.Members(), nil } -func (r *ringpopServiceResolver) ring() *hashring.HashRing { - return r.ringValue.Load().(*hashring.HashRing) -} - -func (r *ringpopServiceResolver) getLabelsMap() map[string]string { - labels := make(map[string]string) - labels[RoleKey] = r.service - return labels -} - -func (r *ringpopServiceResolver) compareMembers(addrs []string) (map[string]struct{}, bool) { - changed := false - newMembersMap := make(map[string]struct{}, len(addrs)) - for _, addr := range addrs { - newMembersMap[addr] = struct{}{} - if _, ok := r.membersMap[addr]; !ok { - changed = true - } - } - for addr := range r.membersMap { - if _, ok := newMembersMap[addr]; !ok { - changed = true - break - } +func (rpo *RingpopResolver) MemberCount(service string) (int, error) { + ring, err := rpo.getRing(service) + if err != nil { + return 0, err } - return newMembersMap, changed + return ring.MemberCount(), nil } diff --git a/common/membership/resolver_mock.go b/common/membership/resolver_mock.go new file mode 100644 index 00000000000..d44c7e58c93 --- /dev/null +++ b/common/membership/resolver_mock.go @@ -0,0 +1,182 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: resolver.go + +// Package membership is a generated GoMock package. +package membership + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockResolver is a mock of Resolver interface. +type MockResolver struct { + ctrl *gomock.Controller + recorder *MockResolverMockRecorder +} + +// MockResolverMockRecorder is the mock recorder for MockResolver. +type MockResolverMockRecorder struct { + mock *MockResolver +} + +// NewMockResolver creates a new mock instance. +func NewMockResolver(ctrl *gomock.Controller) *MockResolver { + mock := &MockResolver{ctrl: ctrl} + mock.recorder = &MockResolverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResolver) EXPECT() *MockResolverMockRecorder { + return m.recorder +} + +// EvictSelf mocks base method. +func (m *MockResolver) EvictSelf() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EvictSelf") + ret0, _ := ret[0].(error) + return ret0 +} + +// EvictSelf indicates an expected call of EvictSelf. +func (mr *MockResolverMockRecorder) EvictSelf() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EvictSelf", reflect.TypeOf((*MockResolver)(nil).EvictSelf)) +} + +// Lookup mocks base method. +func (m *MockResolver) Lookup(service, key string) (*HostInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Lookup", service, key) + ret0, _ := ret[0].(*HostInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Lookup indicates an expected call of Lookup. +func (mr *MockResolverMockRecorder) Lookup(service, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lookup", reflect.TypeOf((*MockResolver)(nil).Lookup), service, key) +} + +// MemberCount mocks base method. +func (m *MockResolver) MemberCount(service string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MemberCount", service) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MemberCount indicates an expected call of MemberCount. +func (mr *MockResolverMockRecorder) MemberCount(service interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MemberCount", reflect.TypeOf((*MockResolver)(nil).MemberCount), service) +} + +// Members mocks base method. +func (m *MockResolver) Members(service string) ([]*HostInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Members", service) + ret0, _ := ret[0].([]*HostInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Members indicates an expected call of Members. +func (mr *MockResolverMockRecorder) Members(service interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Members", reflect.TypeOf((*MockResolver)(nil).Members), service) +} + +// Start mocks base method. +func (m *MockResolver) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockResolverMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockResolver)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockResolver) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockResolverMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockResolver)(nil).Stop)) +} + +// Subscribe mocks base method. +func (m *MockResolver) Subscribe(service, name string, notifyChannel chan<- *ChangedEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", service, name, notifyChannel) + ret0, _ := ret[0].(error) + return ret0 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockResolverMockRecorder) Subscribe(service, name, notifyChannel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockResolver)(nil).Subscribe), service, name, notifyChannel) +} + +// Unsubscribe mocks base method. +func (m *MockResolver) Unsubscribe(service, name string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Unsubscribe", service, name) + ret0, _ := ret[0].(error) + return ret0 +} + +// Unsubscribe indicates an expected call of Unsubscribe. +func (mr *MockResolverMockRecorder) Unsubscribe(service, name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockResolver)(nil).Unsubscribe), service, name) +} + +// WhoAmI mocks base method. +func (m *MockResolver) WhoAmI() (*HostInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WhoAmI") + ret0, _ := ret[0].(*HostInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WhoAmI indicates an expected call of WhoAmI. +func (mr *MockResolverMockRecorder) WhoAmI() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WhoAmI", reflect.TypeOf((*MockResolver)(nil).WhoAmI)) +} diff --git a/common/membership/rpMonitor_test.go b/common/membership/resolver_test.go similarity index 89% rename from common/membership/rpMonitor_test.go rename to common/membership/resolver_test.go index 8bceb1055c8..7d4b2075c2c 100644 --- a/common/membership/rpMonitor_test.go +++ b/common/membership/resolver_test.go @@ -54,15 +54,11 @@ func (s *RpoSuite) TestRingpopMonitor() { time.Sleep(time.Second) listenCh := make(chan *ChangedEvent, 5) - resolver, err := rpm.GetResolver("rpm-test") - s.Error(err, "GetResolver should not exist") - s.Nil(resolver, "should return nil on non-existing resolver") - resolver, err = rpm.GetResolver("rpm-service-name") - err = resolver.AddListener("test-listener", listenCh) + err := rpm.Subscribe("rpm-service-name", "test-listener", listenCh) s.Nil(err, "AddListerener failed") - host, err := resolver.Lookup("key") + host, err := rpm.Lookup("rpm-service-name", "key") s.Nil(err, "Ringpop monitor failed to find host for key") s.NotNil(host, "Ringpop monitor returned a nil host") @@ -79,12 +75,12 @@ func (s *RpoSuite) TestRingpopMonitor() { s.Fail("Timed out waiting for failure to be detected by ringpop") } - host, err = resolver.Lookup("key") + host, err = rpm.Lookup("rpm-service-name", "key") s.Nil(err, "Ringpop monitor failed to find host for key") s.NotEqual(testService.hostAddrs[1], host.GetAddress(), "Ringpop monitor assigned key to dead host") - err = resolver.RemoveListener("test-listener") - s.Nil(err, "RemoveListener() failed") + err = rpm.Unsubscribe("rpm-service-name", "test-listener") + s.Nil(err, "Unsubscribe() failed") rpm.Stop() testService.Stop() @@ -102,7 +98,7 @@ func (s *RpoSuite) TestCompareMembers() { } func (s *RpoSuite) testCompareMembers(curr []string, new []string, hasDiff bool) { - resolver := &ringpopServiceResolver{} + resolver := &ringpopHashring{} currMembers := make(map[string]struct{}, len(curr)) for _, m := range curr { currMembers[m] = struct{}{} diff --git a/common/membership/rpMonitor.go b/common/membership/rpMonitor.go deleted file mode 100644 index 4e772d80898..00000000000 --- a/common/membership/rpMonitor.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package membership - -import ( - "fmt" - - "github.com/uber/ringpop-go" - "github.com/uber/ringpop-go/swim" - tcg "github.com/uber/tchannel-go" - "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/service" -) - -type RingpopMonitor struct { - status int32 - - serviceName string - ringpopWrapper *RingpopWrapper - rings map[string]*ringpopServiceResolver - logger log.Logger -} - -var _ Monitor = (*RingpopMonitor)(nil) - -// NewMonitor builds a ringpop monitor conforming -// to the underlying configuration -func NewMonitor( - config *RingpopConfig, - channel tchannel.Channel, - serviceName string, - logger log.Logger, -) (*RingpopMonitor, error) { - - if err := config.validate(); err != nil { - return nil, err - } - - rp, err := ringpop.New(config.Name, ringpop.Channel(channel.(*tcg.Channel))) - if err != nil { - return nil, fmt.Errorf("ringpop creation failed: %v", err) - } - - discoveryProvider, err := newDiscoveryProvider(config, logger) - if err != nil { - return nil, fmt.Errorf("failed to get discovery provider %v", err) - } - - bootstrapOpts := &swim.BootstrapOptions{ - MaxJoinDuration: config.MaxJoinDuration, - DiscoverProvider: discoveryProvider, - } - rpw := NewRingpopWraper(rp, bootstrapOpts, logger) - - return NewRingpopMonitor(serviceName, service.List, rpw, logger), nil - -} diff --git a/common/membership/rp_cluster_test.go b/common/membership/rp_cluster_test.go index 6b6d1fb1f55..27050d95bbb 100644 --- a/common/membership/rp_cluster_test.go +++ b/common/membership/rp_cluster_test.go @@ -40,7 +40,7 @@ type TestRingpopCluster struct { hostUUIDs []string hostAddrs []string hostInfoList []HostInfo - rings []Monitor + rings []Resolver channels []*tchannel.Channel seedNode string } @@ -55,7 +55,7 @@ func NewTestRingpopCluster(ringPopApp string, size int, ipAddr string, seed stri hostUUIDs: make([]string, size), hostAddrs: make([]string, size), hostInfoList: make([]HostInfo, size), - rings: make([]Monitor, size), + rings: make([]Resolver, size), channels: make([]*tchannel.Channel, size), seedNode: seed, } @@ -94,7 +94,7 @@ func NewTestRingpopCluster(ringPopApp string, size int, ipAddr string, seed stri logger.Error("failed to create ringpop instance", tag.Error(err)) return nil } - cluster.rings[i] = NewRingpopMonitor( + cluster.rings[i] = NewRingpopResolver( serviceName, []string{serviceName}, NewRingpopWraper(ringPop, bOptions, logger), diff --git a/common/mocks/ServiceResolver.go b/common/mocks/ServiceResolver.go deleted file mode 100644 index 7a37d441bc1..00000000000 --- a/common/mocks/ServiceResolver.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package mocks - -import ( - mock "github.com/stretchr/testify/mock" - - membership "github.com/uber/cadence/common/membership" -) - -// ServiceResolver is an mock implementation -type ServiceResolver struct { - mock.Mock -} - -// Lookup is am mock implementation -func (_m *ServiceResolver) Lookup(key string) (*membership.HostInfo, error) { - ret := _m.Called(key) - - var r0 *membership.HostInfo - if rf, ok := ret.Get(0).(func(string) *membership.HostInfo); ok { - r0 = rf(key) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*membership.HostInfo) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(string) error); ok { - r1 = rf(key) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// AddListener is am mock implementation -func (_m *ServiceResolver) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error { - ret := _m.Called(name, notifyChannel) - - var r0 error - if rf, ok := ret.Get(0).(func(string, chan<- *membership.ChangedEvent) error); ok { - r0 = rf(name, notifyChannel) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// RemoveListener is am mock implementation -func (_m *ServiceResolver) RemoveListener(name string) error { - ret := _m.Called(name) - - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(name) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MemberCount is am mock implementation -func (_m *ServiceResolver) MemberCount() int { - ret := _m.Called() - - var r0 int - if rf, ok := ret.Get(0).(func() int); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int) - } - - return r0 -} - -// Members is am mock implementation -func (_m *ServiceResolver) Members() []*membership.HostInfo { - ret := _m.Called() - - var r0 []*membership.HostInfo - if rf, ok := ret.Get(0).(func() []*membership.HostInfo); ok { - r0 = rf() - } else { - r0 = ret.Get(0).([]*membership.HostInfo) - } - - return r0 -} - -var _ membership.ServiceResolver = (*ServiceResolver)(nil) diff --git a/common/resource/params.go b/common/resource/params.go index 59d732d8290..e2e2d10af97 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -49,7 +49,7 @@ type ( ThrottledLogger log.Logger MetricScope tally.Scope - MembershipMonitor membership.Monitor + MembershipResolver membership.Resolver RPCFactory common.RPCFactory PProfInitializer common.PProfInitializer PersistenceConfig config.Persistence diff --git a/common/resource/resource.go b/common/resource/resource.go index a30e8e0e926..ab7b529251d 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -72,12 +72,7 @@ type ( GetDomainReplicationQueue() domain.ReplicationQueue // membership infos - - GetMembershipMonitor() membership.Monitor - GetFrontendServiceResolver() membership.ServiceResolver - GetMatchingServiceResolver() membership.ServiceResolver - GetHistoryServiceResolver() membership.ServiceResolver - GetWorkerServiceResolver() membership.ServiceResolver + GetMembershipResolver() membership.Resolver // internal services clients diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index cf1ff63e552..36fade252d7 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -92,11 +92,7 @@ type ( // membership infos - membershipMonitor membership.Monitor - frontendServiceResolver membership.ServiceResolver - matchingServiceResolver membership.ServiceResolver - historyServiceResolver membership.ServiceResolver - workerServiceResolver membership.ServiceResolver + membershipResolver membership.Resolver // internal services clients @@ -147,7 +143,7 @@ func New( dispatcher := params.RPCFactory.GetDispatcher() - membershipMonitor := params.MembershipMonitor + membershipResolver := params.MembershipResolver dynamicCollection := dynamicconfig.NewCollection( params.DynamicConfig, @@ -157,7 +153,7 @@ func New( clientBean, err := client.NewClientBean( client.NewRPCClientFactory( params.RPCFactory, - membershipMonitor, + membershipResolver, params.MetricsClient, dynamicCollection, numShards, @@ -170,33 +166,13 @@ func New( return nil, err } - frontendServiceResolver, err := membershipMonitor.GetResolver(service.Frontend) - if err != nil { - return nil, err - } - - matchingServiceResolver, err := membershipMonitor.GetResolver(service.Matching) - if err != nil { - return nil, err - } - - historyServiceResolver, err := membershipMonitor.GetResolver(service.History) - if err != nil { - return nil, err - } - - workerServiceResolver, err := membershipMonitor.GetResolver(service.Worker) - if err != nil { - return nil, err - } - persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory( ¶ms.PersistenceConfig, func(...dynamicconfig.FilterOption) int { if serviceConfig.PersistenceGlobalMaxQPS() > 0 { - resolver, err := membershipMonitor.GetResolver(serviceName) - if err == nil && resolver.MemberCount() > 0 { - avgQuota := common.MaxInt(serviceConfig.PersistenceGlobalMaxQPS()/resolver.MemberCount(), 1) + members, err := membershipResolver.MemberCount(serviceName) + if err == nil && members > 0 { + avgQuota := common.MaxInt(serviceConfig.PersistenceGlobalMaxQPS()/members, 1) return common.MinInt(avgQuota, serviceConfig.PersistenceMaxQPS()) } } @@ -301,12 +277,7 @@ func New( domainReplicationQueue: domainReplicationQueue, // membership infos - - membershipMonitor: membershipMonitor, - frontendServiceResolver: frontendServiceResolver, - matchingServiceResolver: matchingServiceResolver, - historyServiceResolver: historyServiceResolver, - workerServiceResolver: workerServiceResolver, + membershipResolver: membershipResolver, // internal services clients @@ -363,11 +334,11 @@ func (h *Impl) Start() { if err := h.dispatcher.Start(); err != nil { h.logger.WithTags(tag.Error(err)).Fatal("fail to start dispatcher") } - h.membershipMonitor.Start() + h.membershipResolver.Start() h.domainCache.Start() h.domainMetricsScopeCache.Start() - hostInfo, err := h.membershipMonitor.WhoAmI() + hostInfo, err := h.membershipResolver.WhoAmI() if err != nil { h.logger.WithTags(tag.Error(err)).Fatal("fail to get host info from membership monitor") } @@ -392,7 +363,7 @@ func (h *Impl) Stop() { h.domainCache.Stop() h.domainMetricsScopeCache.Stop() - h.membershipMonitor.Stop() + h.membershipResolver.Stop() if err := h.dispatcher.Stop(); err != nil { h.logger.WithTags(tag.Error(err)).Error("failed to stop dispatcher") } @@ -472,31 +443,9 @@ func (h *Impl) GetDomainReplicationQueue() domain.ReplicationQueue { return h.domainReplicationQueue } -// membership infos - -// GetMembershipMonitor return the membership monitor -func (h *Impl) GetMembershipMonitor() membership.Monitor { - return h.membershipMonitor -} - -// GetFrontendServiceResolver return frontend service resolver -func (h *Impl) GetFrontendServiceResolver() membership.ServiceResolver { - return h.frontendServiceResolver -} - -// GetMatchingServiceResolver return matching service resolver -func (h *Impl) GetMatchingServiceResolver() membership.ServiceResolver { - return h.matchingServiceResolver -} - -// GetHistoryServiceResolver return history service resolver -func (h *Impl) GetHistoryServiceResolver() membership.ServiceResolver { - return h.historyServiceResolver -} - -// GetWorkerServiceResolver return worker service resolver -func (h *Impl) GetWorkerServiceResolver() membership.ServiceResolver { - return h.workerServiceResolver +// GetMembershipResolver return the membership resolver +func (h *Impl) GetMembershipResolver() membership.Resolver { + return h.membershipResolver } // internal services clients diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index 013f2d7495f..1b2cbbb2ecb 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -27,6 +27,9 @@ import ( "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" publicservicetest "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" + "go.uber.org/yarpc" + "go.uber.org/zap" + "github.com/uber/cadence/client" "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" @@ -47,10 +50,6 @@ import ( "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" - "github.com/uber/cadence/common/service" - - "go.uber.org/yarpc" - "go.uber.org/zap" ) type ( @@ -72,12 +71,7 @@ type ( BlobstoreClient *blobstore.MockClient // membership infos - - MembershipMonitor *membership.MockMonitor - FrontendServiceResolver *membership.MockServiceResolver - MatchingServiceResolver *membership.MockServiceResolver - HistoryServiceResolver *membership.MockServiceResolver - WorkerServiceResolver *membership.MockServiceResolver + MembershipResolver *membership.MockResolver // internal services clients @@ -154,16 +148,6 @@ func NewTest( persistenceBean.EXPECT().GetShardManager().Return(shardMgr).AnyTimes() persistenceBean.EXPECT().GetExecutionManager(gomock.Any()).Return(executionMgr, nil).AnyTimes() - membershipMonitor := membership.NewMockMonitor(controller) - frontendServiceResolver := membership.NewMockServiceResolver(controller) - matchingServiceResolver := membership.NewMockServiceResolver(controller) - historyServiceResolver := membership.NewMockServiceResolver(controller) - workerServiceResolver := membership.NewMockServiceResolver(controller) - membershipMonitor.EXPECT().GetResolver(service.Frontend).Return(frontendServiceResolver, nil).AnyTimes() - membershipMonitor.EXPECT().GetResolver(service.Matching).Return(matchingServiceResolver, nil).AnyTimes() - membershipMonitor.EXPECT().GetResolver(service.History).Return(historyServiceResolver, nil).AnyTimes() - membershipMonitor.EXPECT().GetResolver(service.Worker).Return(workerServiceResolver, nil).AnyTimes() - scope := tally.NewTestScope("test", nil) return &Test{ @@ -183,12 +167,7 @@ func NewTest( BlobstoreClient: &blobstore.MockClient{}, // membership infos - - MembershipMonitor: membershipMonitor, - FrontendServiceResolver: frontendServiceResolver, - MatchingServiceResolver: matchingServiceResolver, - HistoryServiceResolver: historyServiceResolver, - WorkerServiceResolver: workerServiceResolver, + MembershipResolver: membership.NewMockResolver(controller), // internal services clients @@ -301,31 +280,9 @@ func (s *Test) GetArchiverProvider() provider.ArchiverProvider { return s.ArchiverProvider } -// membership infos - -// GetMembershipMonitor for testing -func (s *Test) GetMembershipMonitor() membership.Monitor { - return s.MembershipMonitor -} - -// GetFrontendServiceResolver for testing -func (s *Test) GetFrontendServiceResolver() membership.ServiceResolver { - return s.FrontendServiceResolver -} - -// GetMatchingServiceResolver for testing -func (s *Test) GetMatchingServiceResolver() membership.ServiceResolver { - return s.MatchingServiceResolver -} - -// GetHistoryServiceResolver for testing -func (s *Test) GetHistoryServiceResolver() membership.ServiceResolver { - return s.HistoryServiceResolver -} - -// GetWorkerServiceResolver for testing -func (s *Test) GetWorkerServiceResolver() membership.ServiceResolver { - return s.WorkerServiceResolver +// GetMembershipResolver for testing +func (s *Test) GetMembershipResolver() membership.Resolver { + return s.MembershipResolver } // internal services clients diff --git a/environment/env.go b/environment/env.go index e9226e3782d..028286191d9 100644 --- a/environment/env.go +++ b/environment/env.go @@ -297,4 +297,4 @@ func GetMongoPort() int { panic(fmt.Sprintf("error getting env %v", MongoPort)) } return p -} \ No newline at end of file +} diff --git a/host/simpleServiceResolver.go b/host/membership_hashring.go similarity index 77% rename from host/simpleServiceResolver.go rename to host/membership_hashring.go index 79a0bdedf3b..a7bd636c289 100644 --- a/host/simpleServiceResolver.go +++ b/host/membership_hashring.go @@ -26,39 +26,39 @@ import ( "github.com/uber/cadence/common/membership" ) -type simpleResolver struct { +type simpleHashring struct { hosts []*membership.HostInfo hashfunc func([]byte) uint32 } -// newSimpleResolver returns a service resolver that maintains static mapping +// newSimpleHashring returns a service resolver that maintains static mapping // between services and host info -func newSimpleResolver(service string, hosts []string) membership.ServiceResolver { +func newSimpleHashring(service string, hosts []string) *simpleHashring { hostInfos := make([]*membership.HostInfo, 0, len(hosts)) for _, host := range hosts { hostInfos = append(hostInfos, membership.NewHostInfo(host, map[string]string{membership.RoleKey: service})) } - return &simpleResolver{hostInfos, farm.Fingerprint32} + return &simpleHashring{hostInfos, farm.Fingerprint32} } -func (s *simpleResolver) Lookup(key string) (*membership.HostInfo, error) { +func (s *simpleHashring) Lookup(key string) (*membership.HostInfo, error) { hash := int(s.hashfunc([]byte(key))) idx := hash % len(s.hosts) return s.hosts[idx], nil } -func (s *simpleResolver) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error { +func (s *simpleHashring) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error { return nil } -func (s *simpleResolver) RemoveListener(name string) error { +func (s *simpleHashring) RemoveListener(name string) error { return nil } -func (s *simpleResolver) MemberCount() int { +func (s *simpleHashring) MemberCount() int { return len(s.hosts) } -func (s *simpleResolver) Members() []*membership.HostInfo { +func (s *simpleHashring) Members() []*membership.HostInfo { return s.hosts } diff --git a/host/simpleMonitor.go b/host/membership_resolver.go similarity index 53% rename from host/simpleMonitor.go rename to host/membership_resolver.go index ce3581ae2d2..ba95984d8e4 100644 --- a/host/simpleMonitor.go +++ b/host/membership_resolver.go @@ -21,38 +21,60 @@ package host import ( + "fmt" + "github.com/uber/cadence/common/membership" ) -type simpleMonitor struct { +type simpleResolver struct { hostInfo *membership.HostInfo - resolvers map[string]membership.ServiceResolver + resolvers map[string]*simpleHashring } -// NewSimpleMonitor returns a simple monitor interface -func newSimpleMonitor(serviceName string, hosts map[string][]string) membership.Monitor { - resolvers := make(map[string]membership.ServiceResolver, len(hosts)) +// NewSimpleResolver returns a membership resolver interface +func NewSimpleResolver(serviceName string, hosts map[string][]string) membership.Resolver { + resolvers := make(map[string]*simpleHashring, len(hosts)) for service, hostList := range hosts { - resolvers[service] = newSimpleResolver(service, hostList) + resolvers[service] = newSimpleHashring(service, hostList) } hostInfo := membership.NewHostInfo(hosts[serviceName][0], map[string]string{membership.RoleKey: serviceName}) - return &simpleMonitor{hostInfo, resolvers} + return &simpleResolver{hostInfo, resolvers} } -func (s *simpleMonitor) Start() { +func (s *simpleResolver) Start() { } -func (s *simpleMonitor) Stop() { +func (s *simpleResolver) Stop() { } -func (s *simpleMonitor) EvictSelf() error { +func (s *simpleResolver) EvictSelf() error { return nil } -func (s *simpleMonitor) WhoAmI() (*membership.HostInfo, error) { +func (s *simpleResolver) WhoAmI() (*membership.HostInfo, error) { return s.hostInfo, nil } -func (s *simpleMonitor) GetResolver(service string) (membership.ServiceResolver, error) { - return s.resolvers[service], nil +func (s *simpleResolver) Subscribe(service string, name string, notifyChannel chan<- *membership.ChangedEvent) error { + return nil +} + +func (s *simpleResolver) Unsubscribe(service string, name string) error { + return nil +} + +func (s *simpleResolver) Lookup(service string, key string) (*membership.HostInfo, error) { + resolver, ok := s.resolvers[service] + if !ok { + return nil, fmt.Errorf("cannot lookup host for service %q", service) + } + return resolver.Lookup(key) +} + +func (s *simpleResolver) MemberCount(service string) (int, error) { + return 0, nil +} + +func (s *simpleResolver) Members(service string) ([]*membership.HostInfo, error) { + return nil, nil } diff --git a/host/onebox.go b/host/onebox.go index 22d4c7a8d58..6cdc78d2735 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -392,7 +392,7 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]string, startWG *sync.Wai params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort()) params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendAddress()) params.MetricScope = tally.NewTestScope(service.Frontend, make(map[string]string)) - params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) + params.MembershipResolver = newMembershipResolver(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) @@ -458,7 +458,7 @@ func (c *cadenceImpl) startHistory( params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i]) params.RPCFactory = c.newRPCFactory(service.History, hostport) params.MetricScope = tally.NewTestScope(service.History, make(map[string]string)) - params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) + params.MembershipResolver = newMembershipResolver(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) @@ -524,7 +524,7 @@ func (c *cadenceImpl) startMatching(hosts map[string][]string, startWG *sync.Wai params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort()) params.RPCFactory = c.newRPCFactory(service.Matching, c.MatchingServiceAddress()) params.MetricScope = tally.NewTestScope(service.Matching, make(map[string]string)) - params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) + params.MembershipResolver = newMembershipResolver(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient()) @@ -566,7 +566,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort()) params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceAddress()) params.MetricScope = tally.NewTestScope(service.Worker, make(map[string]string)) - params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) + params.MembershipResolver = newMembershipResolver(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient()) @@ -587,7 +587,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG metadataManager := persistence.NewDomainPersistenceMetricsClient(c.domainManager, service.GetMetricsClient(), c.logger) replicatorDomainCache = cache.NewDomainCache(metadataManager, params.ClusterMetadata, service.GetMetricsClient(), service.GetLogger()) replicatorDomainCache.Start() - c.startWorkerReplicator(params, service, replicatorDomainCache) + c.startWorkerReplicator(service) } var clientWorkerDomainCache cache.DomainCache @@ -613,18 +613,14 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG c.shutdownWG.Done() } -func (c *cadenceImpl) startWorkerReplicator(params *resource.Params, svc Service, domainCache cache.DomainCache) { - serviceResolver, err := svc.GetMembershipMonitor().GetResolver(service.Worker) - if err != nil { - c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err)) - } +func (c *cadenceImpl) startWorkerReplicator(svc Service) { c.replicator = replicator.NewReplicator( c.clusterMetadata, svc.GetClientBean(), c.logger, svc.GetMetricsClient(), svc.GetHostInfo(), - serviceResolver, + svc.GetMembershipResolver(), c.domainReplicationQueue, c.domainReplicationTaskExecutor, time.Millisecond, @@ -752,8 +748,8 @@ func copyPersistenceConfig(pConfig config.Persistence) (config.Persistence, erro return pConfig, nil } -func newMembershipMonitor(serviceName string, hosts map[string][]string) membership.Monitor { - return newSimpleMonitor(serviceName, hosts) +func newMembershipResolver(serviceName string, hosts map[string][]string) membership.Resolver { + return NewSimpleResolver(serviceName, hosts) } func newPProfInitializerImpl(logger log.Logger, port int) common.PProfInitializer { diff --git a/host/service.go b/host/service.go index a3deeadcd3c..b23e72fbbdb 100644 --- a/host/service.go +++ b/host/service.go @@ -59,7 +59,7 @@ type ( GetClientBean() client.Bean GetTimeSource() clock.TimeSource GetDispatcher() *yarpc.Dispatcher - GetMembershipMonitor() membership.Monitor + GetMembershipResolver() membership.Resolver GetHostInfo() *membership.HostInfo GetClusterMetadata() cluster.Metadata GetMessagingClient() messaging.Client @@ -76,7 +76,7 @@ type ( hostName string hostInfo *membership.HostInfo dispatcher *yarpc.Dispatcher - membershipMonitor membership.Monitor + membershipResolver membership.Resolver rpcFactory common.RPCFactory pprofInitializer common.PProfInitializer clientBean client.Bean @@ -110,7 +110,7 @@ func NewService(params *resource.Params) Service { logger: params.Logger, throttledLogger: params.ThrottledLogger, rpcFactory: params.RPCFactory, - membershipMonitor: params.MembershipMonitor, + membershipResolver: params.MembershipResolver, pprofInitializer: params.PProfInitializer, timeSource: clock.NewRealTimeSource(), metricsScope: params.MetricScope, @@ -168,16 +168,16 @@ func (h *serviceImpl) Start() { h.logger.WithTags(tag.Error(err)).Fatal("Failed to start yarpc dispatcher") } - h.membershipMonitor.Start() + h.membershipResolver.Start() - hostInfo, err := h.membershipMonitor.WhoAmI() + hostInfo, err := h.membershipResolver.WhoAmI() if err != nil { h.logger.WithTags(tag.Error(err)).Fatal("failed to get host info from membership monitor") } h.hostInfo = hostInfo h.clientBean, err = client.NewClientBean( - client.NewRPCClientFactory(h.rpcFactory, h.membershipMonitor, h.metricsClient, h.dynamicCollection, h.numberOfHistoryShards, h.logger), + client.NewRPCClientFactory(h.rpcFactory, h.membershipResolver, h.metricsClient, h.dynamicCollection, h.numberOfHistoryShards, h.logger), h.rpcFactory.GetDispatcher(), h.clusterMetadata, ) @@ -197,8 +197,8 @@ func (h *serviceImpl) Stop() { return } - if h.membershipMonitor != nil { - h.membershipMonitor.Stop() + if h.membershipResolver != nil { + h.membershipResolver.Stop() } if h.dispatcher != nil { @@ -228,8 +228,8 @@ func (h *serviceImpl) GetTimeSource() clock.TimeSource { return h.timeSource } -func (h *serviceImpl) GetMembershipMonitor() membership.Monitor { - return h.membershipMonitor +func (h *serviceImpl) GetMembershipResolver() membership.Resolver { + return h.membershipResolver } func (h *serviceImpl) GetHostInfo() *membership.HostInfo { diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 0fccf1ce340..b5edf37a00c 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -272,7 +272,7 @@ func (adh *adminHandlerImpl) DescribeWorkflowExecution( shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing shardIDForOutput := strconv.Itoa(shardID) - historyHost, err := adh.GetHistoryServiceResolver().Lookup(shardIDstr) + historyHost, err := adh.GetMembershipResolver().Lookup(service.History, shardIDstr) if err != nil { return nil, adh.error(err, scope) } @@ -395,7 +395,7 @@ func (adh *adminHandlerImpl) DescribeShardDistribution( offset := int(request.PageID * request.PageSize) nextPageStart := offset + int(request.PageSize) for shardID := offset; shardID < numShards && shardID < nextPageStart; shardID++ { - info, err := adh.GetHistoryServiceResolver().Lookup(string(rune(shardID))) + info, err := adh.GetMembershipResolver().Lookup(service.History, string(rune(shardID))) if err != nil { resp.Shards[int32(shardID)] = "unknown" } else { @@ -585,7 +585,7 @@ func (adh *adminHandlerImpl) DescribeCluster( } membershipInfo := types.MembershipInfo{} - if monitor := adh.GetMembershipMonitor(); monitor != nil { + if monitor := adh.GetMembershipResolver(); monitor != nil { currentHost, err := monitor.WhoAmI() if err != nil { return nil, adh.error(err, scope) @@ -597,13 +597,13 @@ func (adh *adminHandlerImpl) DescribeCluster( var rings []*types.RingInfo for _, role := range service.List { - resolver, err := monitor.GetResolver(role) + var servers []*types.HostInfo + members, err := monitor.Members(role) if err != nil { return nil, adh.error(err, scope) } - var servers []*types.HostInfo - for _, server := range resolver.Members() { + for _, server := range members { servers = append(servers, &types.HostInfo{ Identity: server.Identity(), }) @@ -612,7 +612,7 @@ func (adh *adminHandlerImpl) DescribeCluster( rings = append(rings, &types.RingInfo{ Role: role, - MemberCount: int32(resolver.MemberCount()), + MemberCount: int32(len(servers)), Members: servers, }) } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index caac95c564e..66435767e0c 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -166,11 +166,10 @@ func NewWorkflowHandler( return float64(config.RPS()) }, func(domain string) float64 { - if resolver := resource.GetFrontendServiceResolver(); resolver != nil && config.GlobalDomainRPS(domain) > 0 { - if resolver.MemberCount() > 0 { - avgQuota := common.MaxInt(config.GlobalDomainRPS(domain)/resolver.MemberCount(), 1) - return float64(common.MinInt(avgQuota, config.MaxDomainRPSPerInstance(domain))) - } + memberCount, err := resource.GetMembershipResolver().MemberCount(service.Frontend) + if err == nil && memberCount > 0 && config.GlobalDomainRPS(domain) > 0 { + avgQuota := common.MaxInt(config.GlobalDomainRPS(domain)/memberCount, 1) + return float64(common.MinInt(avgQuota, config.MaxDomainRPSPerInstance(domain))) } return float64(config.MaxDomainRPSPerInstance(domain)) }, diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 6fee20154be..e48a592d8c5 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -122,8 +122,8 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockVisibilityArchiver = &archiver.VisibilityArchiverMock{} s.mockVersionChecker = client.NewMockVersionChecker(s.controller) - mockMonitor := s.mockResource.MembershipMonitor - mockMonitor.EXPECT().GetMemberCount(service.Frontend).Return(5, nil).AnyTimes() + mockMonitor := s.mockResource.MembershipResolver + mockMonitor.EXPECT().MemberCount(service.Frontend).Return(5, nil).AnyTimes() s.mockVersionChecker.EXPECT().ClientSupported(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() } diff --git a/service/history/handler.go b/service/history/handler.go index 3762507c5b1..ba23e72e9e0 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/engine" @@ -1907,7 +1908,7 @@ func (h *handlerImpl) GetCrossClusterTasks( if err != nil { logger.Error("History engine not found for shard", tag.Error(err)) var owner string - if info, err := h.GetHistoryServiceResolver().Lookup(strconv.Itoa(int(shardID))); err == nil { + if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil { owner = info.GetAddress() } settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo().GetAddress(), owner)) @@ -2007,7 +2008,7 @@ func (h *handlerImpl) convertError(err error) error { switch err.(type) { case *persistence.ShardOwnershipLostError: shardID := err.(*persistence.ShardOwnershipLostError).ShardID - info, err := h.GetHistoryServiceResolver().Lookup(strconv.Itoa(shardID)) + info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(shardID)) if err == nil { return shard.CreateShardOwnershipLostError(h.GetHostInfo().GetAddress(), info.GetAddress()) } diff --git a/service/history/service.go b/service/history/service.go index e186d307c3d..36b2b60d0c1 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -129,7 +129,7 @@ func (s *Service) Stop() { remainingTime := s.config.ShutdownDrainDuration() s.GetLogger().Info("ShutdownHandler: Evicting self from membership ring") - s.GetMembershipMonitor().EvictSelf() + s.GetMembershipResolver().EvictSelf() s.GetLogger().Info("ShutdownHandler: Waiting for others to discover I am unhealthy") remainingTime = common.SleepWithMinDuration(gossipPropagationDelay, remainingTime) diff --git a/service/history/shard/controller.go b/service/history/shard/controller.go index 54f4a3286eb..6f48a37ef83 100644 --- a/service/history/shard/controller.go +++ b/service/history/shard/controller.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/service" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" @@ -163,9 +165,9 @@ func (c *controller) Start() { c.shutdownWG.Add(1) go c.shardManagementPump() - err := c.GetHistoryServiceResolver().AddListener(shardControllerMembershipUpdateListenerName, c.membershipUpdateCh) + err := c.GetMembershipResolver().Subscribe(service.History, shardControllerMembershipUpdateListenerName, c.membershipUpdateCh) if err != nil { - c.logger.Error("Error adding listener", tag.Error(err)) + c.logger.Error("subscribing to membership resolver", tag.Error(err)) } c.logger.Info("Shard controller state changed", tag.LifeCycleStarted) @@ -178,8 +180,8 @@ func (c *controller) Stop() { c.PrepareToStop() - if err := c.GetHistoryServiceResolver().RemoveListener(shardControllerMembershipUpdateListenerName); err != nil { - c.logger.Error("Error removing membership update listener", tag.Error(err), tag.OperationFailed) + if err := c.GetMembershipResolver().Unsubscribe(service.History, shardControllerMembershipUpdateListenerName); err != nil { + c.logger.Error("unsubscribing from membership resolver", tag.Error(err), tag.OperationFailed) } close(c.shutdownCh) @@ -292,7 +294,7 @@ func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsIte if c.isShuttingDown() || atomic.LoadInt32(&c.status) == common.DaemonStatusStopped { return nil, fmt.Errorf("controller for host '%v' shutting down", c.GetHostInfo().Identity()) } - info, err := c.GetHistoryServiceResolver().Lookup(string(rune(shardID))) + info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID))) if err != nil { return nil, err } @@ -392,7 +394,7 @@ func (c *controller) acquireShards() { if c.isShuttingDown() { return } - info, err := c.GetHistoryServiceResolver().Lookup(string(rune(shardID))) + info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID))) if err != nil { c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID)) } else { diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 8558a803569..2cdcf9ea72c 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/uber/cadence/common/service" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -51,11 +53,11 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockResource *resource.Test - mockHistoryEngine *engine.MockEngine - mockClusterMetadata *cluster.MockMetadata - mockServiceResolver *membership.MockServiceResolver + controller *gomock.Controller + mockResource *resource.Test + mockHistoryEngine *engine.MockEngine + mockClusterMetadata *cluster.MockMetadata + mockMembershipResolver *membership.MockResolver hostInfo *membership.HostInfo mockShardManager *mmocks.ShardManager @@ -82,7 +84,7 @@ func (s *controllerSuite) SetupTest() { s.mockHistoryEngine = engine.NewMockEngine(s.controller) s.mockShardManager = s.mockResource.ShardMgr - s.mockServiceResolver = s.mockResource.HistoryServiceResolver + s.mockMembershipResolver = s.mockResource.MembershipResolver s.mockClusterMetadata = s.mockResource.ClusterMetadata s.hostInfo = s.mockResource.GetHostInfo() @@ -113,7 +115,7 @@ func (s *controllerSuite) TestAcquireShardSuccess() { if hostID == 0 { myShards = append(myShards, shardID) s.mockHistoryEngine.EXPECT().Start().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(2) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine).Times(1) s.mockShardManager.On("GetShard", mock.Anything, &persistence.GetShardRequest{ShardID: shardID}).Return( &persistence.GetShardResponse{ @@ -168,7 +170,7 @@ func (s *controllerSuite) TestAcquireShardSuccess() { }).Return(nil).Once() } else { ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1) } } @@ -203,7 +205,7 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() { if hostID == 0 { myShards = append(myShards, shardID) s.mockHistoryEngine.EXPECT().Start().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(2) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine).Times(1) s.mockShardManager.On("GetShard", mock.Anything, &persistence.GetShardRequest{ShardID: shardID}).Return( &persistence.GetShardResponse{ @@ -258,7 +260,7 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() { }).Return(nil).Once() } else { ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1) } } @@ -278,12 +280,12 @@ func (s *controllerSuite) TestAcquireShardLookupFailure() { numShards := 2 s.config.NumberOfShards = numShards for shardID := 0; shardID < numShards; shardID++ { - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) } s.shardController.acquireShards() for shardID := 0; shardID < numShards; shardID++ { - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) s.Nil(s.shardController.GetEngineForShard(shardID)) } } @@ -300,7 +302,7 @@ func (s *controllerSuite) TestAcquireShardRenewSuccess() { for shardID := 0; shardID < numShards; shardID++ { s.mockHistoryEngine.EXPECT().Start().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(2) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine).Times(1) s.mockShardManager.On("GetShard", mock.Anything, &persistence.GetShardRequest{ShardID: shardID}).Return( &persistence.GetShardResponse{ @@ -361,7 +363,7 @@ func (s *controllerSuite) TestAcquireShardRenewSuccess() { s.shardController.acquireShards() for shardID := 0; shardID < numShards; shardID++ { - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(1) } s.shardController.acquireShards() @@ -382,7 +384,7 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { for shardID := 0; shardID < numShards; shardID++ { s.mockHistoryEngine.EXPECT().Start().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(2) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine).Times(1) s.mockShardManager.On("GetShard", mock.Anything, &persistence.GetShardRequest{ShardID: shardID}).Return( &persistence.GetShardResponse{ @@ -443,7 +445,7 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { s.shardController.acquireShards() for shardID := 0; shardID < numShards; shardID++ { - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(nil, errors.New("ring failure")).Times(1) } s.shardController.acquireShards() @@ -463,7 +465,7 @@ func (s *controllerSuite) TestHistoryEngineClosed() { s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6) } - s.mockServiceResolver.EXPECT().AddListener(shardControllerMembershipUpdateListenerName, + s.mockMembershipResolver.EXPECT().Subscribe(service.History, shardControllerMembershipUpdateListenerName, gomock.Any()).Return(nil).AnyTimes() // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() @@ -490,7 +492,7 @@ func (s *controllerSuite) TestHistoryEngineClosed() { for shardID := 0; shardID < 2; shardID++ { mockEngine := historyEngines[shardID] mockEngine.EXPECT().Stop().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(differentHostInfo, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(differentHostInfo, nil).AnyTimes() s.shardController.shardClosedCallback(shardID, nil) } @@ -531,11 +533,11 @@ func (s *controllerSuite) TestHistoryEngineClosed() { workerWG.Wait() - s.mockServiceResolver.EXPECT().RemoveListener(shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Unsubscribe(service.History, shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes() for shardID := 2; shardID < numShards; shardID++ { mockEngine := historyEngines[shardID] mockEngine.EXPECT().Stop().Return().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).AnyTimes() } s.shardController.Stop() } @@ -551,7 +553,7 @@ func (s *controllerSuite) TestShardControllerClosed() { s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6) } - s.mockServiceResolver.EXPECT().AddListener(shardControllerMembershipUpdateListenerName, gomock.Any()).Return(nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Subscribe(service.History, shardControllerMembershipUpdateListenerName, gomock.Any()).Return(nil).AnyTimes() // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes() @@ -578,11 +580,11 @@ func (s *controllerSuite) TestShardControllerClosed() { }() } - s.mockServiceResolver.EXPECT().RemoveListener(shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Unsubscribe(service.History, shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes() for shardID := 0; shardID < numShards; shardID++ { mockEngine := historyEngines[shardID] mockEngine.EXPECT().Stop().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).AnyTimes() } s.shardController.Stop() workerWG.Wait() @@ -612,7 +614,7 @@ func (s *controllerSuite) setupMocksForAcquireShard(shardID int, mockEngine *eng // s.mockResource.ExecutionMgr.On("Close").Return() mockEngine.EXPECT().Start().Times(1) - s.mockServiceResolver.EXPECT().Lookup(string(rune(shardID))).Return(s.hostInfo, nil).Times(2) + s.mockMembershipResolver.EXPECT().Lookup(service.History, string(rune(shardID))).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(mockEngine).Times(1) s.mockShardManager.On("GetShard", mock.Anything, &persistence.GetShardRequest{ShardID: shardID}).Return( &persistence.GetShardResponse{ diff --git a/service/matching/handler.go b/service/matching/handler.go index 100d65820c6..e18d781a8d1 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -101,7 +101,7 @@ func NewHandler( resource.GetLogger(), resource.GetMetricsClient(), resource.GetDomainCache(), - resource.GetMatchingServiceResolver(), + resource.GetMembershipResolver(), ), } // prevent us from trying to serve requests before matching engine is started and ready diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 30d324f8c35..bfb3f50540e 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -29,6 +29,8 @@ import ( "sync" "time" + "github.com/uber/cadence/common/service" + "github.com/pborman/uuid" "github.com/uber/cadence/client/history" @@ -79,7 +81,7 @@ type ( lockableQueryTaskMap lockableQueryTaskMap domainCache cache.DomainCache versionChecker client.VersionChecker - keyResolver membership.ServiceResolver + membershipResolver membership.Resolver } ) @@ -109,7 +111,7 @@ func NewEngine(taskManager persistence.TaskManager, logger log.Logger, metricsClient metrics.Client, domainCache cache.DomainCache, - resolver membership.ServiceResolver, + resolver membership.Resolver, ) Engine { return &matchingEngineImpl{ @@ -124,7 +126,7 @@ func NewEngine(taskManager persistence.TaskManager, lockableQueryTaskMap: lockableQueryTaskMap{queryTaskMap: make(map[string]chan *queryResult)}, domainCache: domainCache, versionChecker: client.NewVersionChecker(), - keyResolver: resolver, + membershipResolver: resolver, } } @@ -717,7 +719,7 @@ func (e *matchingEngineImpl) GetTaskListsByDomain( } func (e *matchingEngineImpl) getHostInfo(partitionKey string) (string, error) { - host, err := e.keyResolver.Lookup(partitionKey) + host, err := e.membershipResolver.Lookup(service.Matching, partitionKey) if err != nil { return "", err } diff --git a/service/matching/service.go b/service/matching/service.go index 4fa75ee4fdb..d0779db95d7 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -109,7 +109,7 @@ func (s *Service) Stop() { // remove self from membership ring and wait for traffic to drain s.GetLogger().Info("ShutdownHandler: Evicting self from membership ring") - s.GetMembershipMonitor().EvictSelf() + s.GetMembershipResolver().EvictSelf() s.GetLogger().Info("ShutdownHandler: Waiting for others to discover I am unhealthy") time.Sleep(s.config.ShutdownDrainDuration()) diff --git a/service/worker/replicator/domain_replication_processor.go b/service/worker/replicator/domain_replication_processor.go index d5ea4db2dce..d9ad0f0226e 100644 --- a/service/worker/replicator/domain_replication_processor.go +++ b/service/worker/replicator/domain_replication_processor.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/client/admin" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" @@ -47,6 +49,25 @@ const ( taskProcessorErrorRetryBackoffCoefficient = 1 ) +type ( + domainReplicationProcessor struct { + hostInfo *membership.HostInfo + membershipResolver membership.Resolver + status int32 + sourceCluster string + currentCluster string + logger log.Logger + remotePeer admin.Client + taskExecutor domain.ReplicationTaskExecutor + metricsClient metrics.Client + throttleRetry *backoff.ThrottleRetry + lastProcessedMessageID int64 + lastRetrievedMessageID int64 + done chan struct{} + domainReplicationQueue domain.ReplicationQueue + } +) + func newDomainReplicationProcessor( sourceCluster string, currentCluster string, @@ -55,7 +76,7 @@ func newDomainReplicationProcessor( metricsClient metrics.Client, taskExecutor domain.ReplicationTaskExecutor, hostInfo *membership.HostInfo, - serviceResolver membership.ServiceResolver, + resolver membership.Resolver, domainReplicationQueue domain.ReplicationQueue, replicationMaxRetry time.Duration, ) *domainReplicationProcessor { @@ -69,7 +90,7 @@ func newDomainReplicationProcessor( return &domainReplicationProcessor{ hostInfo: hostInfo, - serviceResolver: serviceResolver, + membershipResolver: resolver, status: common.DaemonStatusInitialized, sourceCluster: sourceCluster, currentCluster: currentCluster, @@ -85,25 +106,6 @@ func newDomainReplicationProcessor( } } -type ( - domainReplicationProcessor struct { - hostInfo *membership.HostInfo - serviceResolver membership.ServiceResolver - status int32 - sourceCluster string - currentCluster string - logger log.Logger - remotePeer admin.Client - taskExecutor domain.ReplicationTaskExecutor - metricsClient metrics.Client - throttleRetry *backoff.ThrottleRetry - lastProcessedMessageID int64 - lastRetrievedMessageID int64 - done chan struct{} - domainReplicationQueue domain.ReplicationQueue - } -) - func (p *domainReplicationProcessor) Start() { if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return @@ -133,7 +135,7 @@ func (p *domainReplicationProcessor) fetchDomainReplicationTasks() { // for a small period of time two or more workers think they are the owner and try to execute // the processing logic. This will not result in correctness issue as domain replication task // processing will be protected by version check. - info, err := p.serviceResolver.Lookup(p.sourceCluster) + info, err := p.membershipResolver.Lookup(service.Worker, p.sourceCluster) if err != nil { p.logger.Info("Failed to lookup host info. Skip current run.") return diff --git a/service/worker/replicator/domain_replication_processor_test.go b/service/worker/replicator/domain_replication_processor_test.go index fa753196ea5..b7e98db9728 100644 --- a/service/worker/replicator/domain_replication_processor_test.go +++ b/service/worker/replicator/domain_replication_processor_test.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common/domain" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" ) @@ -67,8 +68,8 @@ func (s *domainReplicationSuite) SetupTest() { s.taskExecutor = domain.NewMockReplicationTaskExecutor(s.controller) s.domainReplicationQueue = domain.NewMockReplicationQueue(s.controller) s.remoteClient = resource.RemoteAdminClient - serviceResolver := resource.WorkerServiceResolver - serviceResolver.EXPECT().Lookup(s.sourceCluster).Return(resource.GetHostInfo(), nil).AnyTimes() + serviceResolver := resource.MembershipResolver + serviceResolver.EXPECT().Lookup(service.Worker, s.sourceCluster).Return(resource.GetHostInfo(), nil).AnyTimes() s.replicationProcessor = newDomainReplicationProcessor( s.sourceCluster, s.currentCluster, diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index f95faec6de9..aacf3487f9f 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -44,7 +44,7 @@ type ( logger log.Logger metricsClient metrics.Client hostInfo *membership.HostInfo - serviceResolver membership.ServiceResolver + membershipResolver membership.Resolver domainReplicationQueue domain.ReplicationQueue replicationMaxRetry time.Duration } @@ -57,7 +57,7 @@ func NewReplicator( logger log.Logger, metricsClient metrics.Client, hostInfo *membership.HostInfo, - serviceResolver membership.ServiceResolver, + membership membership.Resolver, domainReplicationQueue domain.ReplicationQueue, domainReplicationTaskExecutor domain.ReplicationTaskExecutor, replicationMaxRetry time.Duration, @@ -66,7 +66,7 @@ func NewReplicator( logger = logger.WithTags(tag.ComponentReplicator) return &Replicator{ hostInfo: hostInfo, - serviceResolver: serviceResolver, + membershipResolver: membership, clusterMetadata: clusterMetadata, domainReplicationTaskExecutor: domainReplicationTaskExecutor, clientBean: clientBean, @@ -94,7 +94,7 @@ func (r *Replicator) Start() error { r.metricsClient, r.domainReplicationTaskExecutor, r.hostInfo, - r.serviceResolver, + r.membershipResolver, r.domainReplicationQueue, r.replicationMaxRetry, ) diff --git a/service/worker/service.go b/service/worker/service.go index 42390d8d4ab..0ef35a6878f 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -336,7 +336,7 @@ func (s *Service) startReplicator() { s.GetLogger(), s.GetMetricsClient(), s.GetHostInfo(), - s.GetWorkerServiceResolver(), + s.GetMembershipResolver(), s.GetDomainReplicationQueue(), domainReplicationTaskExecutor, s.config.DomainReplicationMaxRetryDuration(),