diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 4a86bc04162..0557ca3b845 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -491,7 +491,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { dPtr := _defaultTaskDispatchRPS mgr := newTaskListManagerWithRateLimiter( s.matchingEngine, tlID, s.matchingEngine.config, - newRateLimiter(&dPtr, dispatchTTL), + newRateLimiter(&dPtr, dispatchTTL, _minBurst), ) s.matchingEngine.updateTaskList(tlID, mgr) s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID @@ -648,7 +648,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( dPtr := _defaultTaskDispatchRPS mgr := newTaskListManagerWithRateLimiter( s.matchingEngine, tlID, s.matchingEngine.config, - newRateLimiter(&dPtr, dispatchTTL), + newRateLimiter(&dPtr, dispatchTTL, _minBurst), ) s.matchingEngine.updateTaskList(tlID, mgr) s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID diff --git a/service/matching/service.go b/service/matching/service.go index cfa8fa4a273..fb6958ebe0d 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -35,9 +35,10 @@ type Config struct { LongPollExpirationInterval time.Duration // taskListManager configuration - RangeSize int64 - GetTasksBatchSize int - UpdateAckInterval time.Duration + RangeSize int64 + GetTasksBatchSize int + UpdateAckInterval time.Duration + MinTaskThrottlingBurstSize int // taskWriter configuration OutstandingTaskAppendsThreshold int @@ -54,6 +55,7 @@ func NewConfig() *Config { UpdateAckInterval: 10 * time.Second, OutstandingTaskAppendsThreshold: 250, MaxTaskBatchSize: 100, + MinTaskThrottlingBurstSize: 5, } } diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index 7b725e71f34..c953ba25b7c 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -74,16 +74,19 @@ type rateLimiter struct { ttl time.Duration } -func newRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration) rateLimiter { +func newRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration, minBurst int) rateLimiter { rl := rateLimiter{ maxDispatchPerSecond: maxDispatchPerSecond, ttl: ttl, ttlTimer: time.NewTimer(ttl), } // Note: Potentially expose burst config in future - limiter := rate.NewLimiter( - rate.Limit(*maxDispatchPerSecond), int(*maxDispatchPerSecond), - ) + // Set burst to be a minimum of 5 when maxDispatch is set to low numbers + burst := int(*maxDispatchPerSecond) + if burst <= minBurst { + burst = minBurst + } + limiter := rate.NewLimiter(rate.Limit(*maxDispatchPerSecond), burst) rl.globalLimiter.Store(limiter) return rl } @@ -130,7 +133,7 @@ func newTaskListManager( e *matchingEngineImpl, taskList *taskListID, config *Config, ) taskListManager { dPtr := _defaultTaskDispatchRPS - rl := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL) + rl := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize) return newTaskListManagerWithRateLimiter(e, taskList, config, rl) } @@ -626,7 +629,10 @@ deliverBufferTasksLoop: c.logger.Info("Tasklist manager context is cancelled, shutting down") break deliverBufferTasksLoop } - c.logger.Warnf("Unable to send tasks for poll, rate limit failed: %s", err.Error()) + c.logger.Debugf( + "Unable to send tasks for poll, rate limit failed, domain: %s, tasklist: %s, error: %s", + c.taskListID.domainID, c.taskListID.String(), err.Error(), + ) c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.BufferThrottleCounter) continue } diff --git a/service/matching/taskListManager_test.go b/service/matching/taskListManager_test.go index 3a17b32ee6c..19a43e0d4a4 100644 --- a/service/matching/taskListManager_test.go +++ b/service/matching/taskListManager_test.go @@ -23,6 +23,10 @@ package matching import ( "sync" "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/time/rate" "github.com/uber/cadence/common/mocks" @@ -31,6 +35,8 @@ import ( "github.com/uber/cadence/common/persistence" ) +const _minBurst = 5 + func TestDeliverBufferTasks(t *testing.T) { tests := []func(tlm *taskListManagerImpl){ func(tlm *taskListManagerImpl) { close(tlm.taskBuffer) }, @@ -51,6 +57,13 @@ func TestDeliverBufferTasks(t *testing.T) { } } +func TestNewRateLimiter(t *testing.T) { + maxDispatch := float64(0.01) + rl := newRateLimiter(&maxDispatch, time.Second, _minBurst) + limiter := rl.globalLimiter.Load().(*rate.Limiter) + assert.Equal(t, _minBurst, limiter.Burst()) +} + func createTestTaskListManager() *taskListManagerImpl { logger := bark.NewLoggerFromLogrus(log.New()) tm := newTestTaskManager(logger)