Skip to content

Commit

Permalink
Remove Membership Factory (cadence-workflow#4627)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Nov 11, 2021
1 parent e15f181 commit c145ab8
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 154 deletions.
4 changes: 2 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ func (s *server) startService() common.Daemon {
)
rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
params.RPCFactory = rpcFactory
params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
params.MembershipMonitor, err = s.cfg.Ringpop.NewMonitor(
rpcFactory.GetChannel(),
params.Name,
params.Logger,
)
if err != nil {
log.Fatalf("error creating ringpop factory: %v", err)
log.Fatalf("error creating ringpop monitor: %v", err)
}
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)

Expand Down
90 changes: 21 additions & 69 deletions common/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"

"go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -72,21 +71,31 @@ type RingpopFactory struct {
channel tchannel.Channel
serviceName string
logger log.Logger

sync.Mutex
ringPop *membership.RingPop
membershipMonitor membership.Monitor
}

// NewFactory builds a ringpop factory conforming
// NewMonitor builds a ringpop monitor conforming
// to the underlying configuration
func (rpConfig *Ringpop) NewFactory(
func (rpConfig *Ringpop) NewMonitor(
channel tchannel.Channel,
serviceName string,
logger log.Logger,
) (*RingpopFactory, error) {
) (*membership.RingpopMonitor, error) {

if err := rpConfig.validate(); err != nil {
return nil, err
}
if rpConfig.MaxJoinDuration == 0 {
rpConfig.MaxJoinDuration = defaultMaxJoinDuration
}
factory := &RingpopFactory{
config: rpConfig,
channel: channel,
serviceName: serviceName,
logger: logger,
}

return factory.createMembership()

return newRingpopFactory(rpConfig, channel, serviceName, logger)
}

func (rpConfig *Ringpop) validate() error {
Expand Down Expand Up @@ -154,70 +163,13 @@ func validateBootstrapMode(
return nil
}

func newRingpopFactory(
rpConfig *Ringpop,
channel tchannel.Channel,
serviceName string,
logger log.Logger,
) (*RingpopFactory, error) {

if err := rpConfig.validate(); err != nil {
return nil, err
}
if rpConfig.MaxJoinDuration == 0 {
rpConfig.MaxJoinDuration = defaultMaxJoinDuration
}
return &RingpopFactory{
config: rpConfig,
channel: channel,
serviceName: serviceName,
logger: logger,
}, nil
}

// GetMembershipMonitor return a membership monitor
func (factory *RingpopFactory) GetMembershipMonitor() (membership.Monitor, error) {
factory.Lock()
defer factory.Unlock()

return factory.getMembership()
}

func (factory *RingpopFactory) getMembership() (membership.Monitor, error) {
if factory.membershipMonitor != nil {
return factory.membershipMonitor, nil
}

membershipMonitor, err := factory.createMembership()
if err != nil {
return nil, err
}
factory.membershipMonitor = membershipMonitor
return membershipMonitor, nil
}

func (factory *RingpopFactory) createMembership() (membership.Monitor, error) {
// use actual listen port (in case service is bound to :0 or 0.0.0.0:0)
rp, err := factory.getRingpop()
func (factory *RingpopFactory) createMembership() (*membership.RingpopMonitor, error) {
rp, err := factory.createRingpop()
if err != nil {
return nil, fmt.Errorf("ringpop creation failed: %v", err)
}

membershipMonitor := membership.NewRingpopMonitor(factory.serviceName, service.List, rp, factory.logger)
return membershipMonitor, nil
}

func (factory *RingpopFactory) getRingpop() (*membership.RingPop, error) {
if factory.ringPop != nil {
return factory.ringPop, nil
}

ringPop, err := factory.createRingpop()
if err != nil {
return nil, err
}
factory.ringPop = ringPop
return ringPop, nil
return membership.NewRingpopMonitor(factory.serviceName, service.List, rp, factory.logger), nil
}

func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) {
Expand Down
24 changes: 4 additions & 20 deletions common/config/ringpop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ package config
import (
"context"
"fmt"
"net"
"testing"
"time"
"net"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -58,9 +58,6 @@ func (s *RingpopSuite) TestHostsMode() {
s.Equal(time.Second*30, cfg.MaxJoinDuration)
err = cfg.validate()
s.Nil(err)
f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger())
s.Nil(err)
s.NotNil(f)
}

func (s *RingpopSuite) TestFileMode() {
Expand All @@ -73,9 +70,6 @@ func (s *RingpopSuite) TestFileMode() {
s.Equal(time.Second*30, cfg.MaxJoinDuration)
err = cfg.validate()
s.Nil(err)
f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger())
s.Nil(err)
s.NotNil(f)
}

func (s *RingpopSuite) TestCustomMode() {
Expand All @@ -87,14 +81,11 @@ func (s *RingpopSuite) TestCustomMode() {
s.NotNil(cfg.validate())
cfg.DiscoveryProvider = statichosts.New("127.0.0.1")
s.Nil(cfg.validate())
f, err := cfg.NewFactory(nil, "test", loggerimpl.NewNopLogger())
s.Nil(err)
s.NotNil(f)
}

type mockResolver struct {
Hosts map[string][]string
SRV map[string][]net.SRV
SRV map[string][]net.SRV
suite *RingpopSuite
}

Expand Down Expand Up @@ -130,9 +121,6 @@ func (s *RingpopSuite) TestDNSMode() {
s.Equal(BootstrapModeDNS, cfg.BootstrapMode)
s.Nil(cfg.validate())
logger := loggerimpl.NewNopLogger()
f, err := cfg.NewFactory(nil, "test", logger)
s.Nil(err)
s.NotNil(f)

s.ElementsMatch(
[]string{
Expand Down Expand Up @@ -192,9 +180,6 @@ func (s *RingpopSuite) TestDNSSRVMode() {
s.Equal(BootstrapModeDNSSRV, cfg.BootstrapMode)
s.Nil(cfg.validate())
logger := loggerimpl.NewNopLogger()
f, err := cfg.NewFactory(nil, "test", logger)
s.Nil(err)
s.NotNil(f)

s.ElementsMatch(
[]string{
Expand All @@ -211,8 +196,8 @@ func (s *RingpopSuite) TestDNSSRVMode() {
cfg.BootstrapHosts,
&mockResolver{
SRV: map[string][]net.SRV{
"service-a": []net.SRV{{ Target:"az1-service-a.addr.example.net", Port: 7755}, {Target: "az2-service-a.addr.example.net", Port: 7566}},
"service-b": []net.SRV{{ Target:"az1-service-b.addr.example.net", Port: 7788}, {Target: "az2-service-b.addr.example.net", Port: 7896}},
"service-a": []net.SRV{{Target: "az1-service-a.addr.example.net", Port: 7755}, {Target: "az2-service-a.addr.example.net", Port: 7566}},
"service-b": []net.SRV{{Target: "az1-service-b.addr.example.net", Port: 7788}, {Target: "az2-service-b.addr.example.net", Port: 7896}},
},
Hosts: map[string][]string{
"az1-service-a.addr.example.net": []string{"10.0.0.1"},
Expand Down Expand Up @@ -251,7 +236,6 @@ func (s *RingpopSuite) TestDNSSRVMode() {
_, err = cfg.DiscoveryProvider.Hosts()
s.NotNil(err)


//Remove known bad hosts from Unresolved list
provider.UnresolvedHosts = []string{
"service-a.example.net",
Expand Down
8 changes: 0 additions & 8 deletions common/membership/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,13 @@
package membership

import (
"errors"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
)

// ErrUnknownService is thrown for a service that is not tracked by this instance
var ErrUnknownService = errors.New("Service not tracked by Monitor")

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}

// ErrListenerAlreadyExist is thrown on a duplicate AddListener call from the same listener
var ErrListenerAlreadyExist = errors.New("Listener already exist for the service")

type (

// ChangedEvent describes a change in membership
Expand Down
31 changes: 16 additions & 15 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
package membership

import (
"fmt"
"sync/atomic"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

type ringpopMonitor struct {
type RingpopMonitor struct {
status int32

serviceName string
Expand All @@ -38,17 +39,17 @@ type ringpopMonitor struct {
logger log.Logger
}

var _ Monitor = (*ringpopMonitor)(nil)
var _ Monitor = (*RingpopMonitor)(nil)

// NewRingpopMonitor returns a ringpop-based membership monitor
func NewRingpopMonitor(
serviceName string,
services []string,
rp *RingPop,
logger log.Logger,
) Monitor {
) *RingpopMonitor {

rpo := &ringpopMonitor{
rpo := &RingpopMonitor{
status: common.DaemonStatusInitialized,
serviceName: serviceName,
services: services,
Expand All @@ -62,7 +63,7 @@ func NewRingpopMonitor(
return rpo
}

func (rpo *ringpopMonitor) Start() {
func (rpo *RingpopMonitor) Start() {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusInitialized,
Expand All @@ -87,7 +88,7 @@ func (rpo *ringpopMonitor) Start() {
}
}

func (rpo *ringpopMonitor) Stop() {
func (rpo *RingpopMonitor) Stop() {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusStarted,
Expand All @@ -103,7 +104,7 @@ func (rpo *ringpopMonitor) Stop() {
rpo.rp.Stop()
}

func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) {
func (rpo *RingpopMonitor) WhoAmI() (*HostInfo, error) {
address, err := rpo.rp.WhoAmI()
if err != nil {
return nil, err
Expand All @@ -115,47 +116,47 @@ func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) {
return NewHostInfo(address, labels.AsMap()), nil
}

func (rpo *ringpopMonitor) EvictSelf() error {
func (rpo *RingpopMonitor) EvictSelf() error {
return rpo.rp.SelfEvict()
}

func (rpo *ringpopMonitor) GetResolver(service string) (ServiceResolver, error) {
func (rpo *RingpopMonitor) GetResolver(service string) (ServiceResolver, error) {
ring, found := rpo.rings[service]
if !found {
return nil, ErrUnknownService
return nil, fmt.Errorf("service %q is not tracked by Monitor", service)
}
return ring, nil
}

func (rpo *ringpopMonitor) Lookup(service string, key string) (*HostInfo, error) {
func (rpo *RingpopMonitor) Lookup(service string, key string) (*HostInfo, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return nil, err
}
return ring.Lookup(key)
}

func (rpo *ringpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error {
func (rpo *RingpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error {
ring, err := rpo.GetResolver(service)
if err != nil {
return err
}
return ring.AddListener(name, notifyChannel)
}

func (rpo *ringpopMonitor) RemoveListener(service string, name string) error {
func (rpo *RingpopMonitor) RemoveListener(service string, name string) error {
ring, err := rpo.GetResolver(service)
if err != nil {
return err
}
return ring.RemoveListener(name)
}

func (rpo *ringpopMonitor) GetReachableMembers() ([]string, error) {
func (rpo *RingpopMonitor) GetReachableMembers() ([]string, error) {
return rpo.rp.GetReachableMembers()
}

func (rpo *ringpopMonitor) GetMemberCount(service string) (int, error) {
func (rpo *RingpopMonitor) GetMemberCount(service string) (int, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return 0, err
Expand Down
3 changes: 2 additions & 1 deletion common/membership/rpServiceResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package membership

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -155,7 +156,7 @@ func (r *ringpopServiceResolver) AddListener(
defer r.listenerLock.Unlock()
_, ok := r.listeners[name]
if ok {
return ErrListenerAlreadyExist
return fmt.Errorf("listener already exist for service %q", name)
}
r.listeners[name] = notifyChannel
return nil
Expand Down
Loading

0 comments on commit c145ab8

Please sign in to comment.