Skip to content

Commit

Permalink
matching: fix bug that can causes tasks to be dropped (cadence-workfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored May 1, 2019
1 parent 5971e43 commit 6d2cbad
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
5 changes: 4 additions & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,9 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
const rangeSize = 10
s.matchingEngine.config.RangeSize = rangeSize
s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(2)
// set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking
// on enqueuing a task to task buffer
s.matchingEngine.config.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Microsecond)

testCases := []struct {
batchSize int
Expand All @@ -1560,7 +1563,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
ScheduleToStartTimeoutSeconds: common.Int32Ptr(5),
}
if i%2 == 0 {
// simulates creating a task whos scheduledToStartTimeout is already expired
// simulates creating a task whose scheduledToStartTimeout is already expired
addRequest.ScheduleToStartTimeoutSeconds = common.Int32Ptr(-5)
}
_, err := s.matchingEngine.AddActivityTask(&addRequest)
Expand Down
15 changes: 12 additions & 3 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,19 @@ func (c *taskListManagerImpl) addTasksToBuffer(
c.domainScope.IncCounter(metrics.ExpiredTasksCounter)
continue
}
c.taskAckManager.addTask(t.TaskID)
if !c.addSingleTaskToBuffer(t, lastWriteTime, idleTimer) {
return false // we are shutting down the task list
}
}
return true
}

func (c *taskListManagerImpl) addSingleTaskToBuffer(
task *persistence.TaskInfo, lastWriteTime time.Time, idleTimer *time.Timer) bool {
c.taskAckManager.addTask(task.TaskID)
for {
select {
case c.taskBuffer <- t:
case c.taskBuffer <- task:
return true
case <-idleTimer.C:
if c.isIdle(lastWriteTime) {
Expand All @@ -210,5 +220,4 @@ func (c *taskListManagerImpl) addTasksToBuffer(
return false
}
}
return true
}

0 comments on commit 6d2cbad

Please sign in to comment.