From cb7e56fe87139053e8497634b8f00211069e631c Mon Sep 17 00:00:00 2001 From: shreyassrivatsan Date: Tue, 2 Jul 2019 12:41:44 -0700 Subject: [PATCH] Move frontend token bucket under quota policy (#2152) --- common/quotas/interfaces.go | 30 +++++++++++++++ common/quotas/policy.go | 45 ++++++++++++++++++++++ service/frontend/workflowHandler.go | 59 +++++++++++++++-------------- 3 files changed, 105 insertions(+), 29 deletions(-) create mode 100644 common/quotas/interfaces.go create mode 100644 common/quotas/policy.go diff --git a/common/quotas/interfaces.go b/common/quotas/interfaces.go new file mode 100644 index 00000000000..0bc21bd8201 --- /dev/null +++ b/common/quotas/interfaces.go @@ -0,0 +1,30 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas + +// Policy corresponds to a quota policy. A policy allows implementing layered +// and more complex rate limiting functionality. +type Policy interface { + // Allow attempts to allow a request to go through. The method returns + // immediately with a true or false indicating if the request can make + // progress + Allow() bool +} diff --git a/common/quotas/policy.go b/common/quotas/policy.go new file mode 100644 index 00000000000..8e655cea2e6 --- /dev/null +++ b/common/quotas/policy.go @@ -0,0 +1,45 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package quotas + +import ( + "time" + + "github.com/uber/cadence/common/tokenbucket" +) + +type simpleRateLimitPolicy struct { + tb tokenbucket.TokenBucket +} + +// NewSimpleRateLimiter returns a new simple rate limiter +func NewSimpleRateLimiter(tb tokenbucket.TokenBucket) Policy { + return &simpleRateLimitPolicy{tb} +} + +func (s *simpleRateLimitPolicy) Allow() bool { + ok, _ := s.tb.TryConsume(1) + return ok +} + +func (s *simpleRateLimitPolicy) Wait(d time.Duration) bool { + return s.tb.Consume(1, d) +} diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index f8fb856b663..d0e40e8cdb5 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -50,6 +50,7 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/tokenbucket" "github.com/uber/cadence/service/worker/archiver" @@ -72,7 +73,7 @@ type ( tokenSerializer common.TaskTokenSerializer metricsClient metrics.Client startWG sync.WaitGroup - rateLimiter tokenbucket.TokenBucket + rateLimiter quotas.Policy config *Config blobstoreClient blobstore.Client versionChecker *versionChecker @@ -165,7 +166,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi tokenSerializer: common.NewJSONTaskTokenSerializer(), metricsClient: sVice.GetMetricsClient(), domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()), - rateLimiter: tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource()), + rateLimiter: quotas.NewSimpleRateLimiter(tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource())), blobstoreClient: blobstoreClient, versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()}, domainHandler: newDomainHandler( @@ -341,7 +342,7 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -420,7 +421,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -553,7 +554,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeat") if heartbeatRequest.TaskToken == nil { @@ -634,7 +635,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID") domainID, err := wh.domainCache.GetDomainID(heartbeatRequest.GetDomain()) @@ -740,7 +741,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if completeRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) @@ -822,7 +823,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() domainID, err := wh.domainCache.GetDomainID(completeRequest.GetDomain()) if err != nil { @@ -930,7 +931,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if failedRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) @@ -1001,7 +1002,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() domainID, err := wh.domainCache.GetDomainID(failedRequest.GetDomain()) if err != nil { @@ -1097,7 +1098,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if cancelRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) @@ -1180,7 +1181,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() domainID, err := wh.domainCache.GetDomainID(cancelRequest.GetDomain()) if err != nil { @@ -1287,7 +1288,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if completeRequest.TaskToken == nil { return nil, wh.error(errTaskTokenNotSet, scope) @@ -1365,7 +1366,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if failedRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) @@ -1435,7 +1436,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( } // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.rateLimiter.TryConsume(1) + wh.rateLimiter.Allow() if completeRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) @@ -1487,7 +1488,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -1630,7 +1631,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -1832,7 +1833,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, return wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return wh.error(createServiceBusyError(), scope) } @@ -1911,7 +1912,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2069,7 +2070,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, return wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return wh.error(createServiceBusyError(), scope) } @@ -2114,7 +2115,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2159,7 +2160,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( return wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return wh.error(createServiceBusyError(), scope) } @@ -2203,7 +2204,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2310,7 +2311,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2435,7 +2436,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, listReque return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2495,7 +2496,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, listReque return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2555,7 +2556,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, countReq return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2766,7 +2767,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) } @@ -2811,7 +2812,7 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De return nil, wh.error(errRequestNotSet, scope) } - if ok, _ := wh.rateLimiter.TryConsume(1); !ok { + if ok := wh.rateLimiter.Allow(); !ok { return nil, wh.error(createServiceBusyError(), scope) }