Skip to content

Commit

Permalink
Add global ratelimiter for frontend API (cadence-workflow#3161)
Browse files Browse the repository at this point in the history
* Add global ratelimiter for frontend API
  • Loading branch information
mkolodezny authored Apr 3, 2020
1 parent 0a7f74b commit b4c1a8b
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 6 deletions.
3 changes: 3 additions & 0 deletions common/membership/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type (
RemoveListener(service string, name string) error
// GetReachableMembers returns addresses of all members of the ring
GetReachableMembers() ([]string, error)
// GetMemberCount returns the number of reachable members
// currently in this node's membership list for the given role
GetMemberCount(role string) (int, error)
}

// ServiceResolver provides membership information for a specific cadence service.
Expand Down
15 changes: 15 additions & 0 deletions common/membership/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,11 @@ func (rpo *ringpopMonitor) RemoveListener(service string, name string) error {
func (rpo *ringpopMonitor) GetReachableMembers() ([]string, error) {
return rpo.rp.GetReachableMembers()
}

func (rpo *ringpopMonitor) GetMemberCount(service string) (int, error) {
ring, err := rpo.GetResolver(service)
if err != nil {
return 0, err
}
return ring.MemberCount(), nil
}
9 changes: 6 additions & 3 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ var keys = map[Key]string{
FrontendESIndexMaxResultWindow: "frontend.esIndexMaxResultWindow",
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendDomainRPS: "frontend.domainrps",
FrontendMaxDomainRPSPerInstance: "frontend.domainrps",
FrontendGlobalDomainRPS: "frontend.globalDomainrps",
FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns",
FrontendShutdownDrainDuration: "frontend.shutdownDrainDuration",
DisableListVisibilityByFilter: "frontend.disableListVisibilityByFilter",
Expand Down Expand Up @@ -352,8 +353,10 @@ const (
FrontendHistoryMaxPageSize
// FrontendRPS is workflow rate limit per second
FrontendRPS
// FrontendDomainRPS is workflow domain rate limit per second
FrontendDomainRPS
// FrontendMaxDomainRPSPerInstance is workflow domain rate limit per second
FrontendMaxDomainRPSPerInstance
// FrontendGlobalDomainRPS is workflow domain rate limit per second for the whole Cadence cluster
FrontendGlobalDomainRPS
// FrontendHistoryMgrNumConns is for persistence cluster.NumConns
FrontendHistoryMgrNumConns
// FrontendThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
Expand Down
4 changes: 4 additions & 0 deletions host/simpleMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ func (s *simpleMonitor) RemoveListener(service string, name string) error {
func (s *simpleMonitor) GetReachableMembers() ([]string, error) {
return nil, nil
}

func (s *simpleMonitor) GetMemberCount(service string) (int, error) {
return 0, nil
}
6 changes: 4 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type Config struct {
ESIndexMaxResultWindow dynamicconfig.IntPropertyFn
HistoryMaxPageSize dynamicconfig.IntPropertyFnWithDomainFilter
RPS dynamicconfig.IntPropertyFn
DomainRPS dynamicconfig.IntPropertyFnWithDomainFilter
MaxDomainRPSPerInstance dynamicconfig.IntPropertyFnWithDomainFilter
GlobalDomainRPS dynamicconfig.IntPropertyFnWithDomainFilter
MaxIDLengthLimit dynamicconfig.IntPropertyFn
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
MinRetentionDays dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -107,7 +108,8 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
DomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDomainRPS, 1200),
MaxDomainRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainRPSPerInstance, 1200),
GlobalDomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainRPS, 0),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries),
Expand Down
9 changes: 8 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ func NewWorkflowHandler(
return float64(config.RPS())
},
func(domain string) float64 {
return float64(config.DomainRPS(domain))
if monitor := resource.GetMembershipMonitor(); monitor != nil && config.GlobalDomainRPS(domain) > 0 {
ringSize, err := monitor.GetMemberCount(common.FrontendServiceName)
if err == nil && ringSize > 0 {
avgQuota := common.MaxInt(config.GlobalDomainRPS(domain)/ringSize, 1)
return float64(common.MinInt(avgQuota, config.MaxDomainRPSPerInstance(domain)))
}
}
return float64(config.MaxDomainRPSPerInstance(domain))
},
),
versionChecker: client.NewVersionChecker(),
Expand Down
4 changes: 4 additions & 0 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (s *workflowHandlerSuite) SetupTest() {
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)
s.mockHistoryArchiver = &archiver.HistoryArchiverMock{}
s.mockVisibilityArchiver = &archiver.VisibilityArchiverMock{}

mockMonitor := s.mockResource.MembershipMonitor
mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes()

}

func (s *workflowHandlerSuite) TearDownTest() {
Expand Down

0 comments on commit b4c1a8b

Please sign in to comment.