Skip to content

Commit

Permalink
Remove throttling logs and set min burst size (cadence-workflow#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
madhuravi authored Jan 23, 2018
1 parent 6ce4fdc commit 4725ace
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
4 changes: 2 additions & 2 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
dPtr := _defaultTaskDispatchRPS
mgr := newTaskListManagerWithRateLimiter(
s.matchingEngine, tlID, s.matchingEngine.config,
newRateLimiter(&dPtr, dispatchTTL),
newRateLimiter(&dPtr, dispatchTTL, _minBurst),
)
s.matchingEngine.updateTaskList(tlID, mgr)
s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID
Expand Down Expand Up @@ -648,7 +648,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
dPtr := _defaultTaskDispatchRPS
mgr := newTaskListManagerWithRateLimiter(
s.matchingEngine, tlID, s.matchingEngine.config,
newRateLimiter(&dPtr, dispatchTTL),
newRateLimiter(&dPtr, dispatchTTL, _minBurst),
)
s.matchingEngine.updateTaskList(tlID, mgr)
s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID
Expand Down
8 changes: 5 additions & 3 deletions service/matching/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ type Config struct {
LongPollExpirationInterval time.Duration

// taskListManager configuration
RangeSize int64
GetTasksBatchSize int
UpdateAckInterval time.Duration
RangeSize int64
GetTasksBatchSize int
UpdateAckInterval time.Duration
MinTaskThrottlingBurstSize int

// taskWriter configuration
OutstandingTaskAppendsThreshold int
Expand All @@ -54,6 +55,7 @@ func NewConfig() *Config {
UpdateAckInterval: 10 * time.Second,
OutstandingTaskAppendsThreshold: 250,
MaxTaskBatchSize: 100,
MinTaskThrottlingBurstSize: 5,
}
}

Expand Down
18 changes: 12 additions & 6 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ type rateLimiter struct {
ttl time.Duration
}

func newRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration) rateLimiter {
func newRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration, minBurst int) rateLimiter {
rl := rateLimiter{
maxDispatchPerSecond: maxDispatchPerSecond,
ttl: ttl,
ttlTimer: time.NewTimer(ttl),
}
// Note: Potentially expose burst config in future
limiter := rate.NewLimiter(
rate.Limit(*maxDispatchPerSecond), int(*maxDispatchPerSecond),
)
// Set burst to be a minimum of 5 when maxDispatch is set to low numbers
burst := int(*maxDispatchPerSecond)
if burst <= minBurst {
burst = minBurst
}
limiter := rate.NewLimiter(rate.Limit(*maxDispatchPerSecond), burst)
rl.globalLimiter.Store(limiter)
return rl
}
Expand Down Expand Up @@ -130,7 +133,7 @@ func newTaskListManager(
e *matchingEngineImpl, taskList *taskListID, config *Config,
) taskListManager {
dPtr := _defaultTaskDispatchRPS
rl := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL)
rl := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize)
return newTaskListManagerWithRateLimiter(e, taskList, config, rl)
}

Expand Down Expand Up @@ -626,7 +629,10 @@ deliverBufferTasksLoop:
c.logger.Info("Tasklist manager context is cancelled, shutting down")
break deliverBufferTasksLoop
}
c.logger.Warnf("Unable to send tasks for poll, rate limit failed: %s", err.Error())
c.logger.Debugf(
"Unable to send tasks for poll, rate limit failed, domain: %s, tasklist: %s, error: %s",
c.taskListID.domainID, c.taskListID.String(), err.Error(),
)
c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.BufferThrottleCounter)
continue
}
Expand Down
13 changes: 13 additions & 0 deletions service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ package matching
import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/mocks"

Expand All @@ -31,6 +35,8 @@ import (
"github.com/uber/cadence/common/persistence"
)

const _minBurst = 5

func TestDeliverBufferTasks(t *testing.T) {
tests := []func(tlm *taskListManagerImpl){
func(tlm *taskListManagerImpl) { close(tlm.taskBuffer) },
Expand All @@ -51,6 +57,13 @@ func TestDeliverBufferTasks(t *testing.T) {
}
}

func TestNewRateLimiter(t *testing.T) {
maxDispatch := float64(0.01)
rl := newRateLimiter(&maxDispatch, time.Second, _minBurst)
limiter := rl.globalLimiter.Load().(*rate.Limiter)
assert.Equal(t, _minBurst, limiter.Burst())
}

func createTestTaskListManager() *taskListManagerImpl {
logger := bark.NewLoggerFromLogrus(log.New())
tm := newTestTaskManager(logger)
Expand Down

0 comments on commit 4725ace

Please sign in to comment.