Skip to content

Commit

Permalink
Matching engine fix for flaky unit test (cadence-workflow#349)
Browse files Browse the repository at this point in the history
TaskListManager needs range to updated before starting task writer so it
can initialize max read level correctly using the initialized
taskSequenceNumber on TasklistMgr.  If ordering of reversed then
MaxReadLevel is initialized incorrectly to -1 and will rely on a task to
be written before it can be set to correct value.  This results in
GetTasks to use incorrectly initialized MaxReadLevel and will never read
tasks from persistence store.
  • Loading branch information
samarabbas authored Sep 12, 2017
1 parent f358b88 commit a4b6171
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
4 changes: 2 additions & 2 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func (s *matchingEngineSuite) TestMultipleEnginesActivitiesRangeStealing() {
workflowExecution := workflow.WorkflowExecution{RunId: &runID, WorkflowId: &workflowID}

const engineCount = 2
const taskCount = 200
const taskCount = 400
const iterations = 2
const initialRangeID = 0
const rangeSize = 10
Expand Down Expand Up @@ -985,7 +985,7 @@ func (s *matchingEngineSuite) TestMultipleEnginesDecisionsRangeStealing() {
workflowExecution := workflow.WorkflowExecution{RunId: &runID, WorkflowId: &workflowID}

const engineCount = 2
const taskCount = 200
const taskCount = 400
const iterations = 2
const initialRangeID = 0
const rangeSize = 10
Expand Down
9 changes: 6 additions & 3 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,17 @@ type syncMatchResponse struct {
func (c *taskListManagerImpl) Start() error {
defer c.startWG.Done()

c.taskWriter.Start()
c.signalNewTask()
go c.getTasksPump()
// Make sure to grab the range first before starting task writer, as it needs the range to initialize maxReadLevel
err := c.updateRangeIfNeeded() // Grabs a new range and updates read and ackLevels
if err != nil {
c.Stop()
return err
}

c.taskWriter.Start()
c.signalNewTask()
go c.getTasksPump()

return nil
}

Expand Down

0 comments on commit a4b6171

Please sign in to comment.