Skip to content

Commit

Permalink
Reduce API scope for membership.Monitor (cadence-workflow#4644)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Nov 16, 2021
1 parent 7e14102 commit 97f1690
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 108 deletions.
64 changes: 3 additions & 61 deletions common/membership/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
)

// ErrInsufficientHosts is returned when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}

type (

// ChangedEvent describes a change in membership
Expand All @@ -45,31 +41,15 @@ type (
}

// Monitor provides membership information for all cadence services.
// It can be used to query which member host of a service is responsible for serving a given key.
// Per-service operations are keyed by the service name; GetResolver exposes
// the underlying per-service ServiceResolver directly.
Monitor interface {
common.Daemon

// WhoAmI returns the HostInfo describing this host (self address).
WhoAmI() (*HostInfo, error)
// EvictSelf evicts this member from the membership ring. After this method is
// called, other members will discover that this node is no longer part of the
// ring. This primitive is useful to carry out graceful host shutdown during deployments.
EvictSelf() error
// Lookup returns the host responsible for the given key within the given service.
// @service: The service whose members are consulted
// @key: The key to find an owner for
Lookup(service string, key string) (*HostInfo, error)
// GetResolver returns the ServiceResolver for the given service,
// or an error when no resolver can be provided for it.
GetResolver(service string) (ServiceResolver, error)
// AddListener adds a listener for this service.
// The listener will get notified on the given
// channel, whenever there is a membership change.
// @service: The service to be listened on
// @name: The name for identifying the listener
// @notifyChannel: The channel on which the caller receives notifications
AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
// @service: The service the listener was registered on
// @name: The name the listener was registered under
RemoveListener(service string, name string) error
// GetReachableMembers returns addresses of all members of the ring
GetReachableMembers() ([]string, error)
// GetMemberCount returns the number of reachable members
// currently in this node's membership list for the given role
GetMemberCount(role string) (int, error)
}

// ServiceResolver provides membership information for a specific cadence service.
Expand All @@ -78,14 +58,12 @@ type (
Lookup(key string) (*HostInfo, error)
// AddListener adds a listener which will get notified on the given
// channel, whenever membership changes.
// @name: The name for identifying the listener
// @notifyChannel: The channel on which the caller receives notifications
AddListener(name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
RemoveListener(name string) error
// MemberCount returns host count in hashring for any particular role
// MemberCount returns host count in a hashring
MemberCount() int
// Members returns all host addresses in hashring for any particular role
// Members returns all host addresses in a hashring
Members() []*HostInfo
}
)
Expand Down Expand Up @@ -175,39 +153,3 @@ func (rpo *RingpopMonitor) GetResolver(service string) (ServiceResolver, error)
}
return ring, nil
}

// Lookup finds the host owning key in the ring of the given service.
// Any error from resolving the service's ring is returned unchanged.
func (rpo *RingpopMonitor) Lookup(service string, key string) (*HostInfo, error) {
	resolver, resolveErr := rpo.GetResolver(service)
	if resolveErr == nil {
		return resolver.Lookup(key)
	}
	return nil, resolveErr
}

// AddListener registers notifyChannel under name on the resolver of the
// given service. If the resolver cannot be obtained, that error is returned
// and nothing is registered.
func (rpo *RingpopMonitor) AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error {
	resolver, resolveErr := rpo.GetResolver(service)
	if resolveErr == nil {
		return resolver.AddListener(name, notifyChannel)
	}
	return resolveErr
}

// RemoveListener unregisters the listener called name from the resolver of
// the given service. If the resolver cannot be obtained, that error is returned.
func (rpo *RingpopMonitor) RemoveListener(service string, name string) error {
	resolver, resolveErr := rpo.GetResolver(service)
	if resolveErr == nil {
		return resolver.RemoveListener(name)
	}
	return resolveErr
}

// GetReachableMembers forwards to the ringpop wrapper and returns its
// result and error as-is.
func (rpo *RingpopMonitor) GetReachableMembers() ([]string, error) {
	members, err := rpo.ringpopWrapper.GetReachableMembers()
	return members, err
}

// GetMemberCount reports the member count of the given service's resolver.
// It returns 0 together with the error when the resolver cannot be obtained.
func (rpo *RingpopMonitor) GetMemberCount(service string) (int, error) {
	resolver, resolveErr := rpo.GetResolver(service)
	if resolveErr == nil {
		return resolver.MemberCount(), nil
	}
	return 0, resolveErr
}
4 changes: 4 additions & 0 deletions common/membership/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
)

// ErrInsufficientHosts is returned when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}

const (
// RoleKey label is set by every single service as soon as it bootstraps its
// ringpop instance. The data for this key is the service name
Expand Down
17 changes: 11 additions & 6 deletions common/membership/rpMonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *RpoSuite) SetupTest() {
}

func (s *RpoSuite) TestRingpopMonitor() {
testService := NewTestRingpopCluster("rpm-test", 3, "127.0.0.1", "", "rpm-test")
testService := NewTestRingpopCluster("rpm-ring-app", 3, "127.0.0.1", "", "rpm-service-name")
s.NotNil(testService, "Failed to create test service")

logger := loggerimpl.NewNopLogger()
Expand All @@ -54,10 +54,15 @@ func (s *RpoSuite) TestRingpopMonitor() {
time.Sleep(time.Second)

listenCh := make(chan *ChangedEvent, 5)
err := rpm.AddListener("rpm-test", "test-listener", listenCh)
s.Nil(err, "AddListener failed")
resolver, err := rpm.GetResolver("rpm-test")
s.Error(err, "GetResolver should not exist")
s.Nil(resolver, "should return nil on non-existing resolver")

host, err := rpm.Lookup("rpm-test", "key")
resolver, err = rpm.GetResolver("rpm-service-name")
err = resolver.AddListener("test-listener", listenCh)
s.Nil(err, "AddListerener failed")

host, err := resolver.Lookup("key")
s.Nil(err, "Ringpop monitor failed to find host for key")
s.NotNil(host, "Ringpop monitor returned a nil host")

Expand All @@ -74,11 +79,11 @@ func (s *RpoSuite) TestRingpopMonitor() {
s.Fail("Timed out waiting for failure to be detected by ringpop")
}

host, err = rpm.Lookup("rpm-test", "key")
host, err = resolver.Lookup("key")
s.Nil(err, "Ringpop monitor failed to find host for key")
s.NotEqual(testService.hostAddrs[1], host.GetAddress(), "Ringpop monitor assigned key to dead host")

err = rpm.RemoveListener("rpm-test", "test-listener")
err = resolver.RemoveListener("test-listener")
s.Nil(err, "RemoveListener() failed")

rpm.Stop()
Expand Down
6 changes: 3 additions & 3 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func New(
&params.PersistenceConfig,
func(...dynamicconfig.FilterOption) int {
if serviceConfig.PersistenceGlobalMaxQPS() > 0 {
ringSize, err := membershipMonitor.GetMemberCount(serviceName)
if err == nil && ringSize > 0 {
avgQuota := common.MaxInt(serviceConfig.PersistenceGlobalMaxQPS()/ringSize, 1)
resolver, err := membershipMonitor.GetResolver(serviceName)
if err == nil && resolver.MemberCount() > 0 {
avgQuota := common.MaxInt(serviceConfig.PersistenceGlobalMaxQPS()/resolver.MemberCount(), 1)
return common.MinInt(avgQuota, serviceConfig.PersistenceMaxQPS())
}
}
Expand Down
26 changes: 0 additions & 26 deletions host/simpleMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package host

import (
"fmt"

"github.com/uber/cadence/common/membership"
)

Expand Down Expand Up @@ -58,27 +56,3 @@ func (s *simpleMonitor) WhoAmI() (*membership.HostInfo, error) {
// GetResolver returns the resolver registered for the given service.
//
// An unregistered service is reported as an error rather than as a nil
// resolver with a nil error: callers commonly check only err before invoking
// methods on the result, which would otherwise panic on a nil resolver.
// This mirrors how Lookup on this type treats an unknown service.
func (s *simpleMonitor) GetResolver(service string) (membership.ServiceResolver, error) {
	resolver, ok := s.resolvers[service]
	if !ok {
		return nil, fmt.Errorf("cannot find resolver for service %v", service)
	}
	return resolver, nil
}

// Lookup delegates the key lookup to the resolver registered for service.
// An error is returned when no resolver is registered under that name.
func (s *simpleMonitor) Lookup(service string, key string) (*membership.HostInfo, error) {
	if resolver, found := s.resolvers[service]; found {
		return resolver.Lookup(key)
	}
	return nil, fmt.Errorf("cannot lookup host for service %v", service)
}

// AddListener is a no-op: all arguments are ignored, nothing is registered,
// and nil is always returned.
func (s *simpleMonitor) AddListener(service string, name string, notifyChannel chan<- *membership.ChangedEvent) error {
return nil
}

// RemoveListener is a no-op: both arguments are ignored and nil is always returned.
func (s *simpleMonitor) RemoveListener(service string, name string) error {
return nil
}

// GetReachableMembers is a stub: it always returns a nil slice and a nil error.
func (s *simpleMonitor) GetReachableMembers() ([]string, error) {
return nil, nil
}

// GetMemberCount is a stub: it ignores service and always returns 0 with a nil error.
func (s *simpleMonitor) GetMemberCount(service string) (int, error) {
return 0, nil
}
10 changes: 2 additions & 8 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (adh *adminHandlerImpl) DescribeWorkflowExecution(
shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing
shardIDForOutput := strconv.Itoa(shardID)

historyHost, err := adh.GetMembershipMonitor().Lookup(service.History, shardIDstr)
historyHost, err := adh.GetHistoryServiceResolver().Lookup(shardIDstr)
if err != nil {
return nil, adh.error(err, scope)
}
Expand Down Expand Up @@ -595,13 +595,6 @@ func (adh *adminHandlerImpl) DescribeCluster(
Identity: currentHost.Identity(),
}

members, err := monitor.GetReachableMembers()
if err != nil {
return nil, adh.error(err, scope)
}

membershipInfo.ReachableMembers = members

var rings []*types.RingInfo
for _, role := range service.List {
resolver, err := monitor.GetResolver(role)
Expand All @@ -614,6 +607,7 @@ func (adh *adminHandlerImpl) DescribeCluster(
servers = append(servers, &types.HostInfo{
Identity: server.Identity(),
})
membershipInfo.ReachableMembers = append(membershipInfo.ReachableMembers, server.Identity())
}

rings = append(rings, &types.RingInfo{
Expand Down
7 changes: 3 additions & 4 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ func NewWorkflowHandler(
return float64(config.RPS())
},
func(domain string) float64 {
if monitor := resource.GetMembershipMonitor(); monitor != nil && config.GlobalDomainRPS(domain) > 0 {
ringSize, err := monitor.GetMemberCount(service.Frontend)
if err == nil && ringSize > 0 {
avgQuota := common.MaxInt(config.GlobalDomainRPS(domain)/ringSize, 1)
if resolver := resource.GetFrontendServiceResolver(); resolver != nil && config.GlobalDomainRPS(domain) > 0 {
if resolver.MemberCount() > 0 {
avgQuota := common.MaxInt(config.GlobalDomainRPS(domain)/resolver.MemberCount(), 1)
return float64(common.MinInt(avgQuota, config.MaxDomainRPSPerInstance(domain)))
}
}
Expand Down

0 comments on commit 97f1690

Please sign in to comment.