Skip to content

Commit

Permalink
Move frontend token bucket under quota policy (cadence-workflow#2152)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Jul 2, 2019
1 parent f058b84 commit cb7e56f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 29 deletions.
30 changes: 30 additions & 0 deletions common/quotas/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

// Policy corresponds to a quota policy. A policy allows implementing layered
// and more complex rate limiting functionality.
type Policy interface {
// Allow attempts to allow a request to go through. The method returns
// immediately with a true or false indicating if the request can make
// progress
Allow() bool
}
45 changes: 45 additions & 0 deletions common/quotas/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"time"

"github.com/uber/cadence/common/tokenbucket"
)

type simpleRateLimitPolicy struct {
tb tokenbucket.TokenBucket
}

// NewSimpleRateLimiter returns a new simple rate limiter
func NewSimpleRateLimiter(tb tokenbucket.TokenBucket) Policy {
return &simpleRateLimitPolicy{tb}
}

func (s *simpleRateLimitPolicy) Allow() bool {
ok, _ := s.tb.TryConsume(1)
return ok
}

func (s *simpleRateLimitPolicy) Wait(d time.Duration) bool {
return s.tb.Consume(1, d)
}
59 changes: 30 additions & 29 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/tokenbucket"
"github.com/uber/cadence/service/worker/archiver"
Expand All @@ -72,7 +73,7 @@ type (
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
startWG sync.WaitGroup
rateLimiter tokenbucket.TokenBucket
rateLimiter quotas.Policy
config *Config
blobstoreClient blobstore.Client
versionChecker *versionChecker
Expand Down Expand Up @@ -165,7 +166,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
tokenSerializer: common.NewJSONTaskTokenSerializer(),
metricsClient: sVice.GetMetricsClient(),
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()),
rateLimiter: tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource()),
rateLimiter: quotas.NewSimpleRateLimiter(tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource())),
blobstoreClient: blobstoreClient,
versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()},
domainHandler: newDomainHandler(
Expand Down Expand Up @@ -341,7 +342,7 @@ func (wh *WorkflowHandler) PollForActivityTask(
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -420,7 +421,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -553,7 +554,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
if heartbeatRequest.TaskToken == nil {
Expand Down Expand Up @@ -634,7 +635,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
domainID, err := wh.domainCache.GetDomainID(heartbeatRequest.GetDomain())
Expand Down Expand Up @@ -740,7 +741,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if completeRequest.TaskToken == nil {
return wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -822,7 +823,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

domainID, err := wh.domainCache.GetDomainID(completeRequest.GetDomain())
if err != nil {
Expand Down Expand Up @@ -930,7 +931,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if failedRequest.TaskToken == nil {
return wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -1001,7 +1002,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

domainID, err := wh.domainCache.GetDomainID(failedRequest.GetDomain())
if err != nil {
Expand Down Expand Up @@ -1097,7 +1098,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if cancelRequest.TaskToken == nil {
return wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -1180,7 +1181,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

domainID, err := wh.domainCache.GetDomainID(cancelRequest.GetDomain())
if err != nil {
Expand Down Expand Up @@ -1287,7 +1288,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if completeRequest.TaskToken == nil {
return nil, wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -1365,7 +1366,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if failedRequest.TaskToken == nil {
return wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -1435,7 +1436,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
}

// Count the request in the RPS, but we still accept it even if RPS is exceeded
wh.rateLimiter.TryConsume(1)
wh.rateLimiter.Allow()

if completeRequest.TaskToken == nil {
return wh.error(errTaskTokenNotSet, scope)
Expand Down Expand Up @@ -1487,7 +1488,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -1630,7 +1631,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -1832,7 +1833,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context,
return wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -1911,7 +1912,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2069,7 +2070,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context,
return wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2114,7 +2115,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context,
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2159,7 +2160,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
return wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2203,7 +2204,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context,
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2310,7 +2311,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context,
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2435,7 +2436,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, listReque
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2495,7 +2496,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, listReque
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2555,7 +2556,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, countReq
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2766,7 +2767,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down Expand Up @@ -2811,7 +2812,7 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De
return nil, wh.error(errRequestNotSet, scope)
}

if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
if ok := wh.rateLimiter.Allow(); !ok {
return nil, wh.error(createServiceBusyError(), scope)
}

Expand Down

0 comments on commit cb7e56f

Please sign in to comment.