Fix throttling burst and add debug logs (cadence-workflow#529)
madhuravi authored Jan 26, 2018
1 parent 11a308a commit 99f4899
Showing 3 changed files with 29 additions and 15 deletions.
5 changes: 3 additions & 2 deletions service/matching/matchingEngine_test.go
@@ -627,9 +627,10 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivitiesWithZeroDisp
    const taskCount = 100
    s.matchingEngine.metricsClient = metrics.NewClient(tally.NewTestScope("test", nil), metrics.Matching)
    throttleCt := s.concurrentPublishConsumeActivities(workerCount, taskCount, dispatchLimitFn)
+   s.logger.Infof("Number of tasks throttled: %d", throttleCt)
    // at least once from 0 dispatch poll, and until TTL is hit at which time throttle limit is reset
    // hard to predict exactly how many times, since the atomic.Value load might not have updated.
-   s.True(throttleCt >= 1 && throttleCt < int64(workerCount*int(taskCount)))
+   s.True(throttleCt >= 1)
}
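
Why only the lower bound survives is easier to see with the limiter semantics spelled out. A minimal sketch, assuming the golang.org/x/time/rate package the matching service uses (this is not the repo's test code):

    package main

    import (
        "fmt"

        "golang.org/x/time/rate"
    )

    func main() {
        // With the fixed burst rule, a dispatch rate of 0 yields burst 0,
        // so every reservation fails until the TTL swaps in a new limiter.
        zero := rate.NewLimiter(rate.Limit(0), 0)
        throttled := 0
        for i := 0; i < 5; i++ {
            if rsv := zero.Reserve(); !rsv.OK() {
                throttled++
            }
        }
        fmt.Println(throttled) // 5: the lower bound is guaranteed, the exact total is not
    }

Once the TTL fires and a fresh limiter is stored, adds start succeeding again, and the buffer retry loop can throttle the same task several times, so any exact upper bound is timing-dependent.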

func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
@@ -660,7 +661,6 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(

    taskList := &workflow.TaskList{}
    taskList.Name = &tl
-
    var wg sync.WaitGroup
    wg.Add(2 * workerCount)
@@ -716,6 +716,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
            Identity: &identity,
        })}
    }, nil)
+
    for p := 0; p < workerCount; p++ {
        go func(wNum int) {
            defer wg.Done()
2 changes: 1 addition & 1 deletion service/matching/service.go
@@ -57,7 +57,7 @@ func NewConfig() *Config {
        IdleTasklistCheckInterval:       5 * time.Minute,
        OutstandingTaskAppendsThreshold: 250,
        MaxTaskBatchSize:                100,
-       MinTaskThrottlingBurstSize:      10000,
+       MinTaskThrottlingBurstSize:      1,
    }
}

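This one-line config change is the heart of the fix: in golang.org/x/time/rate, burst is the token-bucket size, so a minimum burst of 10000 let even a tightly throttled task list dispatch thousands of tasks instantly. A hedged illustration (not code from the repo):

    package main

    import (
        "fmt"

        "golang.org/x/time/rate"
    )

    func main() {
        oldFloor := rate.NewLimiter(rate.Limit(5), 10000) // old floor: burst forced up to 10000
        newFloor := rate.NewLimiter(rate.Limit(5), 5)     // new floor: burst tracks the rate

        oldAllowed, newAllowed := 0, 0
        for i := 0; i < 1000; i++ {
            if oldFloor.Allow() {
                oldAllowed++
            }
            if newFloor.Allow() {
                newAllowed++
            }
        }
        // oldAllowed ~= 1000 (the burst swallows everything),
        // newAllowed ~= 5 (the configured rate actually throttles)
        fmt.Println(oldAllowed, newAllowed)
    }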
37 changes: 25 additions & 12 deletions service/matching/taskListManager.go
@@ -24,6 +24,7 @@ import (
    "context"
    "errors"
    "fmt"
+   "runtime"
    "sync"
    "sync/atomic"
    "time"
@@ -72,36 +73,40 @@ type rateLimiter struct {
    // lower(existing TTL, input TTL). After TTL, pick input TTL if different from existing TTL
    ttlTimer *time.Timer
    ttl      time.Duration
+   minBurst int
}

func newRateLimiter(maxDispatchPerSecond *float64, ttl time.Duration, minBurst int) rateLimiter {
    rl := rateLimiter{
        maxDispatchPerSecond: maxDispatchPerSecond,
        ttl:                  ttl,
        ttlTimer:             time.NewTimer(ttl),
+       // Note: Potentially expose burst config to users in future
+       minBurst: minBurst,
    }
-   // Note: Potentially expose burst config in future
-   // Set burst to be a minimum of 5 when maxDispatch is set to low numbers
-   burst := int(*maxDispatchPerSecond)
-   if burst <= minBurst {
-       burst = minBurst
-   }
-   limiter := rate.NewLimiter(rate.Limit(*maxDispatchPerSecond), burst)
-   rl.globalLimiter.Store(limiter)
+   rl.storeLimiter(maxDispatchPerSecond)
    return rl
}
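
For context, globalLimiter is an atomic.Value: pollers read the current limiter without taking the rateLimiter mutex, and writers swap in a whole new *rate.Limiter. A minimal sketch of that pattern with illustrative names (swappableLimiter is not a type in this repo):

    package main

    import (
        "fmt"
        "sync/atomic"

        "golang.org/x/time/rate"
    )

    // swappableLimiter stands in for the globalLimiter field.
    type swappableLimiter struct {
        v atomic.Value // always holds a *rate.Limiter
    }

    func (s *swappableLimiter) Allow() bool {
        return s.v.Load().(*rate.Limiter).Allow() // lock-free hot path
    }

    func (s *swappableLimiter) Swap(rps float64, burst int) {
        s.v.Store(rate.NewLimiter(rate.Limit(rps), burst)) // one atomic swap
    }

    func main() {
        s := &swappableLimiter{}
        s.Swap(0, 0)           // throttle everything
        fmt.Println(s.Allow()) // false
        s.Swap(100, 100)       // pollers pick this up on their next Load
        fmt.Println(s.Allow()) // true
    }

The trade-off is that each Store discards the previous bucket's state, which is one reason the test above cannot predict an exact throttle count.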

func (rl *rateLimiter) UpdateMaxDispatch(maxDispatchPerSecond *float64) {
    if rl.shouldUpdate(maxDispatchPerSecond) {
        rl.Lock()
        rl.maxDispatchPerSecond = maxDispatchPerSecond
-       rl.globalLimiter.Store(
-           rate.NewLimiter(rate.Limit(*maxDispatchPerSecond), int(*maxDispatchPerSecond)),
-       )
+       rl.storeLimiter(maxDispatchPerSecond)
        rl.Unlock()
    }
}
+
+func (rl *rateLimiter) storeLimiter(maxDispatchPerSecond *float64) {
+   burst := int(*maxDispatchPerSecond)
+   // If throttling is zero, burst also has to be 0
+   if *maxDispatchPerSecond != 0 && burst <= rl.minBurst {
+       burst = rl.minBurst
+   }
+   limiter := rate.NewLimiter(rate.Limit(*maxDispatchPerSecond), burst)
+   rl.globalLimiter.Store(limiter)
+}
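
The new storeLimiter keeps three cases apart that the old inline code conflated. A sketch of the rule it applies (burstFor is an illustrative name, not part of the commit):

    // burstFor mirrors storeLimiter's burst computation: burst tracks the
    // integer rate, is raised to minBurst for small non-zero rates, and
    // stays 0 when the rate is 0 so a zero setting blocks dispatch entirely.
    func burstFor(maxDispatchPerSecond float64, minBurst int) int {
        burst := int(maxDispatchPerSecond)
        if maxDispatchPerSecond != 0 && burst <= minBurst {
            burst = minBurst
        }
        return burst
    }

    // burstFor(0, 1)   == 0   // rate 0, burst 0: every reservation fails
    // burstFor(0.5, 1) == 1   // fractional rates still admit one task per refill
    // burstFor(200, 1) == 200 // large rates are untouched by the floor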

func (rl *rateLimiter) shouldUpdate(maxDispatchPerSecond *float64) bool {
if maxDispatchPerSecond == nil {
return false
@@ -619,10 +624,12 @@ func (c *taskListManagerImpl) trySyncMatch(task *persistence.TaskInfo) (*persist
    request := &getTaskResult{task: task, C: make(chan *syncMatchResponse, 1), syncMatch: true}

    rsv := c.rateLimiter.Reserve()
-   if !rsv.OK() {
+   // If we have to wait too long for reservation, better to store in task buffer and handle later.
+   if !rsv.OK() || rsv.Delay() > time.Second {
        c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.SyncThrottleCounter)
        return nil, errAddTasklistThrottled
    }
+   time.Sleep(rsv.Delay())
    select {
    case c.tasksForPoll <- request: // poller goroutine picked up the task
        r := <-request.C
@@ -642,7 +649,13 @@ deliverBufferTasksLoop:
            c.logger.Info("Tasklist manager context is cancelled, shutting down")
            break deliverBufferTasksLoop
        }
+       c.logger.Debugf(
+           "Unable to add buffer task, rate limit failed, domainId: %s, tasklist: %s, error: %s",
+           c.taskListID.domainID, c.taskListID.taskListName, err.Error(),
+       )
        c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.BufferThrottleCounter)
+       // This is to prevent busy looping when throttling is set to 0
+       runtime.Gosched()
        continue
    }
    select {
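
Taken together, the two taskListManager changes form one pattern: pay small reservation delays inline, and back off without spinning when the limiter says no. A hedged sketch assuming golang.org/x/time/rate; the rsv.Cancel() call is an addition not present in the commit (it returns the unused token), and all names here are illustrative:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "runtime"
        "time"

        "golang.org/x/time/rate"
    )

    var errThrottled = errors.New("throttled: push the task to the buffer instead")

    // trySync mirrors the sync-match path: wait only for short reservation
    // delays, otherwise fall back to the task buffer.
    func trySync(limiter *rate.Limiter) error {
        rsv := limiter.Reserve()
        if !rsv.OK() || rsv.Delay() > time.Second {
            if rsv.OK() {
                rsv.Cancel() // not in the commit: hand the unused token back
            }
            return errThrottled
        }
        time.Sleep(rsv.Delay()) // a short wait beats a round trip through the buffer
        return nil
    }

    // deliverLoop mirrors the buffer loop: with a rate of 0, Wait fails
    // immediately, so yield the processor instead of spinning.
    func deliverLoop(ctx context.Context, limiter *rate.Limiter) {
        for ctx.Err() == nil {
            if err := limiter.Wait(ctx); err != nil {
                runtime.Gosched() // prevent busy looping when throttling is set to 0
                continue
            }
            // ... pull a task from the buffer and hand it to a poller ...
        }
    }

    func main() {
        zero := rate.NewLimiter(rate.Limit(0), 0)
        fmt.Println(trySync(zero)) // throttled: push the task to the buffer instead
    }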
