From c145ab8614393618035277db223964f6131eb3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mantas=20=C5=A0idlauskas?= Date: Thu, 11 Nov 2021 13:09:23 +0200 Subject: [PATCH] Remove Membership Factory (#4627) --- cmd/server/cadence/server.go | 4 +- common/config/ringpop.go | 90 ++++++-------------------- common/config/ringpop_test.go | 24 ++----- common/membership/interfaces.go | 8 --- common/membership/rpMonitor.go | 31 ++++----- common/membership/rpServiceResolver.go | 3 +- common/resource/params.go | 8 +-- common/resource/resourceImpl.go | 9 +-- host/onebox.go | 24 ++----- host/service.go | 8 +-- 10 files changed, 55 insertions(+), 154 deletions(-) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 80e8f60cd3f..01acaad7630 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -155,13 +155,13 @@ func (s *server) startService() common.Daemon { ) rpcFactory := rpc.NewFactory(params.Logger, rpcParams) params.RPCFactory = rpcFactory - params.MembershipFactory, err = s.cfg.Ringpop.NewFactory( + params.MembershipMonitor, err = s.cfg.Ringpop.NewMonitor( rpcFactory.GetChannel(), params.Name, params.Logger, ) if err != nil { - log.Fatalf("error creating ringpop factory: %v", err) + log.Fatalf("error creating ringpop monitor: %v", err) } params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger) diff --git a/common/config/ringpop.go b/common/config/ringpop.go index d89623508b0..1cb3bf3d14c 100644 --- a/common/config/ringpop.go +++ b/common/config/ringpop.go @@ -26,7 +26,6 @@ import ( "fmt" "net" "strings" - "sync" "time" "go.uber.org/yarpc/transport/tchannel" @@ -72,21 +71,31 @@ type RingpopFactory struct { channel tchannel.Channel serviceName string logger log.Logger - - sync.Mutex - ringPop *membership.RingPop - membershipMonitor membership.Monitor } -// NewFactory builds a ringpop factory conforming +// NewMonitor builds a ringpop monitor conforming // to the underlying configuration -func (rpConfig *Ringpop) NewFactory( +func (rpConfig *Ringpop) NewMonitor( channel tchannel.Channel, serviceName string, logger log.Logger, -) (*RingpopFactory, error) { +) (*membership.RingpopMonitor, error) { + + if err := rpConfig.validate(); err != nil { + return nil, err + } + if rpConfig.MaxJoinDuration == 0 { + rpConfig.MaxJoinDuration = defaultMaxJoinDuration + } + factory := &RingpopFactory{ + config: rpConfig, + channel: channel, + serviceName: serviceName, + logger: logger, + } + + return factory.createMembership() - return newRingpopFactory(rpConfig, channel, serviceName, logger) } func (rpConfig *Ringpop) validate() error { @@ -154,70 +163,13 @@ func validateBootstrapMode( return nil } -func newRingpopFactory( - rpConfig *Ringpop, - channel tchannel.Channel, - serviceName string, - logger log.Logger, -) (*RingpopFactory, error) { - - if err := rpConfig.validate(); err != nil { - return nil, err - } - if rpConfig.MaxJoinDuration == 0 { - rpConfig.MaxJoinDuration = defaultMaxJoinDuration - } - return &RingpopFactory{ - config: rpConfig, - channel: channel, - serviceName: serviceName, - logger: logger, - }, nil -} - -// GetMembershipMonitor return a membership monitor -func (factory *RingpopFactory) GetMembershipMonitor() (membership.Monitor, error) { - factory.Lock() - defer factory.Unlock() - - return factory.getMembership() -} - -func (factory *RingpopFactory) getMembership() (membership.Monitor, error) { - if factory.membershipMonitor != nil { - return factory.membershipMonitor, nil - } - - membershipMonitor, err := factory.createMembership() - if err != nil { - return nil, err - } - factory.membershipMonitor = membershipMonitor - return membershipMonitor, nil -} - -func (factory *RingpopFactory) createMembership() (membership.Monitor, error) { - // use actual listen port (in case service is bound to :0 or 0.0.0.0:0) - rp, err := factory.getRingpop() +func (factory *RingpopFactory) createMembership() (*membership.RingpopMonitor, error) { + rp, err := factory.createRingpop() if err != nil { return nil, fmt.Errorf("ringpop creation failed: %v", err) } - membershipMonitor := membership.NewRingpopMonitor(factory.serviceName, service.List, rp, factory.logger) - return membershipMonitor, nil -} - -func (factory *RingpopFactory) getRingpop() (*membership.RingPop, error) { - if factory.ringPop != nil { - return factory.ringPop, nil - } - - ringPop, err := factory.createRingpop() - if err != nil { - return nil, err - } - factory.ringPop = ringPop - return ringPop, nil + return membership.NewRingpopMonitor(factory.serviceName, service.List, rp, factory.logger), nil } func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) { diff --git a/common/config/ringpop_test.go b/common/config/ringpop_test.go index ea0ae50a1fb..c85e470576d 100644 --- a/common/config/ringpop_test.go +++ b/common/config/ringpop_test.go @@ -23,9 +23,9 @@ package config import ( "context" "fmt" + "net" "testing" "time" - "net" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -58,9 +58,6 @@ func (s *RingpopSuite) TestHostsMode() { s.Equal(time.Second*30, cfg.MaxJoinDuration) err = cfg.validate() s.Nil(err) - f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger()) - s.Nil(err) - s.NotNil(f) } func (s *RingpopSuite) TestFileMode() { @@ -73,9 +70,6 @@ func (s *RingpopSuite) TestFileMode() { s.Equal(time.Second*30, cfg.MaxJoinDuration) err = cfg.validate() s.Nil(err) - f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger()) - s.Nil(err) - s.NotNil(f) } func (s *RingpopSuite) TestCustomMode() { @@ -87,14 +81,11 @@ func (s *RingpopSuite) TestCustomMode() { s.NotNil(cfg.validate()) cfg.DiscoveryProvider = statichosts.New("127.0.0.1") s.Nil(cfg.validate()) - f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger()) - s.Nil(err) - s.NotNil(f) } type mockResolver struct { Hosts map[string][]string - SRV map[string][]net.SRV + SRV map[string][]net.SRV suite *RingpopSuite } @@ -130,9 +121,6 @@ func (s *RingpopSuite) TestDNSMode() { s.Equal(BootstrapModeDNS, cfg.BootstrapMode) s.Nil(cfg.validate()) logger := loggerimpl.NewNopLogger() - f, err := cfg.NewFactory(nil, "test", logger) - s.Nil(err) - s.NotNil(f) s.ElementsMatch( []string{ @@ -192,9 +180,6 @@ func (s *RingpopSuite) TestDNSSRVMode() { s.Equal(BootstrapModeDNSSRV, cfg.BootstrapMode) s.Nil(cfg.validate()) logger := loggerimpl.NewNopLogger() - f, err := cfg.NewFactory(nil, "test", logger) - s.Nil(err) - s.NotNil(f) s.ElementsMatch( []string{ @@ -211,8 +196,8 @@ func (s *RingpopSuite) TestDNSSRVMode() { cfg.BootstrapHosts, &mockResolver{ SRV: map[string][]net.SRV{ - "service-a": []net.SRV{{ Target:"az1-service-a.addr.example.net", Port: 7755}, {Target: "az2-service-a.addr.example.net", Port: 7566}}, - "service-b": []net.SRV{{ Target:"az1-service-b.addr.example.net", Port: 7788}, {Target: "az2-service-b.addr.example.net", Port: 7896}}, + "service-a": []net.SRV{{Target: "az1-service-a.addr.example.net", Port: 7755}, {Target: "az2-service-a.addr.example.net", Port: 7566}}, + "service-b": []net.SRV{{Target: "az1-service-b.addr.example.net", Port: 7788}, {Target: "az2-service-b.addr.example.net", Port: 7896}}, }, Hosts: map[string][]string{ "az1-service-a.addr.example.net": []string{"10.0.0.1"}, @@ -251,7 +236,6 @@ func (s *RingpopSuite) TestDNSSRVMode() { _, err = cfg.DiscoveryProvider.Hosts() s.NotNil(err) - //Remove known bad hosts from Unresolved list provider.UnresolvedHosts = []string{ "service-a.example.net", diff --git a/common/membership/interfaces.go b/common/membership/interfaces.go index dc73f5ec428..0c43363676d 100644 --- a/common/membership/interfaces.go +++ b/common/membership/interfaces.go @@ -23,21 +23,13 @@ package membership import ( - "errors" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/types" ) -// ErrUnknownService is thrown for a service that is not tracked by this instance -var ErrUnknownService = errors.New("Service not tracked by Monitor") - // ErrInsufficientHosts is thrown when there are not enough hosts to serve the request var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"} -// ErrListenerAlreadyExist is thrown on a duplicate AddListener call from the same listener -var ErrListenerAlreadyExist = errors.New("Listener already exist for the service") - type ( // ChangedEvent describes a change in membership diff --git a/common/membership/rpMonitor.go b/common/membership/rpMonitor.go index 8e40ee98bfb..b8ce3d6d8c6 100644 --- a/common/membership/rpMonitor.go +++ b/common/membership/rpMonitor.go @@ -21,6 +21,7 @@ package membership import ( + "fmt" "sync/atomic" "github.com/uber/cadence/common" @@ -28,7 +29,7 @@ import ( "github.com/uber/cadence/common/log/tag" ) -type ringpopMonitor struct { +type RingpopMonitor struct { status int32 serviceName string @@ -38,7 +39,7 @@ type ringpopMonitor struct { logger log.Logger } -var _ Monitor = (*ringpopMonitor)(nil) +var _ Monitor = (*RingpopMonitor)(nil) // NewRingpopMonitor returns a ringpop-based membership monitor func NewRingpopMonitor( @@ -46,9 +47,9 @@ func NewRingpopMonitor( services []string, rp *RingPop, logger log.Logger, -) Monitor { +) *RingpopMonitor { - rpo := &ringpopMonitor{ + rpo := &RingpopMonitor{ status: common.DaemonStatusInitialized, serviceName: serviceName, services: services, @@ -62,7 +63,7 @@ func NewRingpopMonitor( return rpo } -func (rpo *ringpopMonitor) Start() { +func (rpo *RingpopMonitor) Start() { if !atomic.CompareAndSwapInt32( &rpo.status, common.DaemonStatusInitialized, @@ -87,7 +88,7 @@ func (rpo *ringpopMonitor) Start() { } } -func (rpo *ringpopMonitor) Stop() { +func (rpo *RingpopMonitor) Stop() { if !atomic.CompareAndSwapInt32( &rpo.status, common.DaemonStatusStarted, @@ -103,7 +104,7 @@ func (rpo *ringpopMonitor) Stop() { rpo.rp.Stop() } -func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) { +func (rpo *RingpopMonitor) WhoAmI() (*HostInfo, error) { address, err := rpo.rp.WhoAmI() if err != nil { return nil, err @@ -115,19 +116,19 @@ func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) { return NewHostInfo(address, labels.AsMap()), nil } -func (rpo *ringpopMonitor) EvictSelf() error { +func (rpo *RingpopMonitor) EvictSelf() error { return rpo.rp.SelfEvict() } -func (rpo *ringpopMonitor) GetResolver(service string) (ServiceResolver, error) { +func (rpo *RingpopMonitor) GetResolver(service string) (ServiceResolver, error) { ring, found := rpo.rings[service] if !found { - return nil, ErrUnknownService + return nil, fmt.Errorf("service %q is not tracked by Monitor", service) } return ring, nil } -func (rpo *ringpopMonitor) Lookup(service string, key string) (*HostInfo, error) { +func (rpo *RingpopMonitor) Lookup(service string, key string) (*HostInfo, error) { ring, err := rpo.GetResolver(service) if err != nil { return nil, err @@ -135,7 +136,7 @@ func (rpo *ringpopMonitor) Lookup(service string, key string) (*HostInfo, error) return ring.Lookup(key) } -func (rpo *ringpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error { +func (rpo *RingpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error { ring, err := rpo.GetResolver(service) if err != nil { return err @@ -143,7 +144,7 @@ func (rpo *ringpopMonitor) AddListener(service string, name string, notifyChanne return ring.AddListener(name, notifyChannel) } -func (rpo *ringpopMonitor) RemoveListener(service string, name string) error { +func (rpo *RingpopMonitor) RemoveListener(service string, name string) error { ring, err := rpo.GetResolver(service) if err != nil { return err @@ -151,11 +152,11 @@ func (rpo *ringpopMonitor) RemoveListener(service string, name string) error { return ring.RemoveListener(name) } -func (rpo *ringpopMonitor) GetReachableMembers() ([]string, error) { +func (rpo *RingpopMonitor) GetReachableMembers() ([]string, error) { return rpo.rp.GetReachableMembers() } -func (rpo *ringpopMonitor) GetMemberCount(service string) (int, error) { +func (rpo *RingpopMonitor) GetMemberCount(service string) (int, error) { ring, err := rpo.GetResolver(service) if err != nil { return 0, err diff --git a/common/membership/rpServiceResolver.go b/common/membership/rpServiceResolver.go index 66e0298f84e..b9119732d61 100644 --- a/common/membership/rpServiceResolver.go +++ b/common/membership/rpServiceResolver.go @@ -21,6 +21,7 @@ package membership import ( + "fmt" "sync" "sync/atomic" "time" @@ -155,7 +156,7 @@ func (r *ringpopServiceResolver) AddListener( defer r.listenerLock.Unlock() _, ok := r.listeners[name] if ok { - return ErrListenerAlreadyExist + return fmt.Errorf("listener already exist for service %q", name) } r.listeners[name] = notifyChannel return nil diff --git a/common/resource/params.go b/common/resource/params.go index 0a0ce69c9f8..59d732d8290 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -49,7 +49,7 @@ type ( ThrottledLogger log.Logger MetricScope tally.Scope - MembershipFactory MembershipMonitorFactory + MembershipMonitor membership.Monitor RPCFactory common.RPCFactory PProfInitializer common.PProfInitializer PersistenceConfig config.Persistence @@ -68,12 +68,6 @@ type ( Authorizer authorization.Authorizer // NOTE: this can be nil. If nil, AccessControlledHandlerImpl will initiate one with config.Authorization AuthorizationConfig config.Authorization // NOTE: empty(default) struct will get a authorization.NoopAuthorizer } - - // MembershipMonitorFactory provides a bootstrapped membership monitor - MembershipMonitorFactory interface { - // GetMembershipMonitor return a membership monitor - GetMembershipMonitor() (membership.Monitor, error) - } ) // UpdateLoggerWithServiceName tag logging with service name from the top level diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index fa13e16754f..2e01959f8fc 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -123,7 +123,6 @@ type ( pprofInitializer common.PProfInitializer runtimeMetricsReporter *metrics.RuntimeMetricsReporter - membershipFactory MembershipMonitorFactory rpcFactory common.RPCFactory } ) @@ -148,10 +147,7 @@ func New( dispatcher := params.RPCFactory.GetDispatcher() - membershipMonitor, err := params.MembershipFactory.GetMembershipMonitor() - if err != nil { - return nil, err - } + membershipMonitor := params.MembershipMonitor dynamicCollection := dynamicconfig.NewCollection( params.DynamicConfig, @@ -342,8 +338,7 @@ func New( logger, params.InstanceID, ), - membershipFactory: params.MembershipFactory, - rpcFactory: params.RPCFactory, + rpcFactory: params.RPCFactory, } return impl, nil } diff --git a/host/onebox.go b/host/onebox.go index 85a988a3a2f..6627a2b5cca 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -146,11 +146,6 @@ type ( DomainReplicationTaskExecutor domain.ReplicationTaskExecutor AuthorizationConfig config.Authorization } - - membershipFactoryImpl struct { - serviceName string - hosts map[string][]string - } ) // NewCadence returns an instance that hosts full cadence in one process @@ -397,7 +392,7 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]string, startWG *sync.Wai params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort()) params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendAddress()) params.MetricScope = tally.NewTestScope(service.Frontend, make(map[string]string)) - params.MembershipFactory = newMembershipFactory(params.Name, hosts) + params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) @@ -463,7 +458,7 @@ func (c *cadenceImpl) startHistory( params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i]) params.RPCFactory = c.newRPCFactory(service.History, hostport) params.MetricScope = tally.NewTestScope(service.History, make(map[string]string)) - params.MembershipFactory = newMembershipFactory(params.Name, hosts) + params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) @@ -529,7 +524,7 @@ func (c *cadenceImpl) startMatching(hosts map[string][]string, startWG *sync.Wai params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort()) params.RPCFactory = c.newRPCFactory(service.Matching, c.MatchingServiceAddress()) params.MetricScope = tally.NewTestScope(service.Matching, make(map[string]string)) - params.MembershipFactory = newMembershipFactory(params.Name, hosts) + params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient()) @@ -571,7 +566,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort()) params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceAddress()) params.MetricScope = tally.NewTestScope(service.Worker, make(map[string]string)) - params.MembershipFactory = newMembershipFactory(params.Name, hosts) + params.MembershipMonitor = newMembershipMonitor(params.Name, hosts) params.ClusterMetadata = c.clusterMetadata params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient()) @@ -757,15 +752,8 @@ func copyPersistenceConfig(pConfig config.Persistence) (config.Persistence, erro return pConfig, nil } -func newMembershipFactory(serviceName string, hosts map[string][]string) resource.MembershipMonitorFactory { - return &membershipFactoryImpl{ - serviceName: serviceName, - hosts: hosts, - } -} - -func (p *membershipFactoryImpl) GetMembershipMonitor() (membership.Monitor, error) { - return newSimpleMonitor(p.serviceName, p.hosts), nil +func newMembershipMonitor(serviceName string, hosts map[string][]string) membership.Monitor { + return newSimpleMonitor(serviceName, hosts) } func newPProfInitializerImpl(logger log.Logger, port int) common.PProfInitializer { diff --git a/host/service.go b/host/service.go index cc3c49e80c9..a3deeadcd3c 100644 --- a/host/service.go +++ b/host/service.go @@ -76,7 +76,6 @@ type ( hostName string hostInfo *membership.HostInfo dispatcher *yarpc.Dispatcher - membershipFactory resource.MembershipMonitorFactory membershipMonitor membership.Monitor rpcFactory common.RPCFactory pprofInitializer common.PProfInitializer @@ -111,7 +110,7 @@ func NewService(params *resource.Params) Service { logger: params.Logger, throttledLogger: params.ThrottledLogger, rpcFactory: params.RPCFactory, - membershipFactory: params.MembershipFactory, + membershipMonitor: params.MembershipMonitor, pprofInitializer: params.PProfInitializer, timeSource: clock.NewRealTimeSource(), metricsScope: params.MetricScope, @@ -169,11 +168,6 @@ func (h *serviceImpl) Start() { h.logger.WithTags(tag.Error(err)).Fatal("Failed to start yarpc dispatcher") } - h.membershipMonitor, err = h.membershipFactory.GetMembershipMonitor() - if err != nil { - h.logger.WithTags(tag.Error(err)).Fatal("Membership monitor creation failed") - } - h.membershipMonitor.Start() hostInfo, err := h.membershipMonitor.WhoAmI()