Skip to content

Commit

Permalink
Fix activity unit tests to check for mutable state.
Browse files Browse the repository at this point in the history
Summary:
Rename isActivityHeartBeatRunning() -> isActivityRunning()

History engine to retry on add AddActivityTaskStartedEvent()

Reviewers: maxim, samar

Reviewed By: samar

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D727447
  • Loading branch information
sivakku committed Feb 6, 2017
1 parent 26cf646 commit 0072289
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 49 deletions.
16 changes: 10 additions & 6 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ Update_History_Loop:

event := builder.AddDecisionTaskStartedEvent(scheduleID, request.PollRequest)
if event == nil {
return nil, &workflow.InternalServiceError{Message: "Unable to add decision started event to history"}
// Let's retry and see if the decision still exist.
continue Update_History_Loop
}

// Start a timer for the decision task.
Expand Down Expand Up @@ -249,7 +250,7 @@ Update_History_Loop:

// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
// task is not outstanding than it is most probably a duplicate and complete the task.
isRunning, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
isRunning, ai := msBuilder.isActivityRunning(scheduleID)
if !isRunning {
logDuplicateTaskEvent(context.logger, persistence.TaskTypeActivity, request.GetTaskId(), scheduleID, emptyEventID,
isRunning)
Expand All @@ -268,7 +269,8 @@ Update_History_Loop:

event := builder.AddActivityTaskStartedEvent(scheduleID, request.PollRequest)
if event == nil {
return nil, &workflow.InternalServiceError{Message: "Unable to add started event to history"}
// Let's retry and see if the activity still exist.
continue Update_History_Loop
}

// Start a timer for the activity task.
Expand Down Expand Up @@ -475,7 +477,8 @@ Update_History_Loop:
}

if builder.AddActivityTaskCompletedEvent(scheduleID, startedID, request) == nil {
return &workflow.InternalServiceError{Message: "Unable to add completed event to history"}
// Let's retry and see if the activity still exist.
continue Update_History_Loop
}

msBuilder.DeletePendingActivity(scheduleID)
Expand Down Expand Up @@ -541,7 +544,8 @@ Update_History_Loop:
}

if builder.AddActivityTaskFailedEvent(scheduleID, startedID, request) == nil {
return &workflow.InternalServiceError{Message: "Unable to add failed event to history"}
// Let's retry and see if the activity still exist.
continue Update_History_Loop
}

msBuilder.DeletePendingActivity(scheduleID)
Expand Down Expand Up @@ -600,7 +604,7 @@ Update_History_Loop:
}

scheduleID := token.ScheduleID
isRunning, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
isRunning, ai := msBuilder.isActivityRunning(scheduleID)
if !isRunning || ai.StartedID == emptyEventID {
e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v",
scheduleID, ai, isRunning)
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (e *mutableStateBuilder) Load(activityInfos map[int64]*persistence.Activity
e.pendingTimerInfoIDs = timerInfos
}

func (e *mutableStateBuilder) isActivityHeartBeatRunning(scheduleEventID int64) (bool, *persistence.ActivityInfo) {
func (e *mutableStateBuilder) isActivityRunning(scheduleEventID int64) (bool, *persistence.ActivityInfo) {
a, ok := e.pendingActivityInfoIDs[scheduleEventID]
return ok, a
}
Expand Down
6 changes: 3 additions & 3 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (tb *timerBuilder) AddScheduleToStartActivityTimeout(scheduleID int64, sche

func (tb *timerBuilder) AddScheduleToCloseActivityTimeout(scheduleID int64,
msBuilder *mutableStateBuilder) (*persistence.ActivityTimeoutTask, error) {
ok, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
ok, ai := msBuilder.isActivityRunning(scheduleID)
if !ok {
return nil, fmt.Errorf("ScheduleToClose: Unable to find activity Info in mutable state for event id: %d", scheduleID)
}
Expand All @@ -180,7 +180,7 @@ func (tb *timerBuilder) AddScheduleToCloseActivityTimeout(scheduleID int64,

func (tb *timerBuilder) AddStartToCloseActivityTimeout(scheduleID int64,
msBuilder *mutableStateBuilder) (*persistence.ActivityTimeoutTask, error) {
ok, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
ok, ai := msBuilder.isActivityRunning(scheduleID)
if !ok {
return nil, fmt.Errorf("StartToClose: Unable to find activity Info in mutable state for event id: %d", scheduleID)
}
Expand All @@ -189,7 +189,7 @@ func (tb *timerBuilder) AddStartToCloseActivityTimeout(scheduleID int64,

func (tb *timerBuilder) AddHeartBeatActivityTimeout(scheduleID int64,
msBuilder *mutableStateBuilder) (*persistence.ActivityTimeoutTask, error) {
ok, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
ok, ai := msBuilder.isActivityRunning(scheduleID)
if !ok {
return nil, fmt.Errorf("HeartBeat: Unable to find activity Info in mutable state for event id: %d", scheduleID)
}
Expand Down
23 changes: 13 additions & 10 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,37 +362,40 @@ Update_History_Loop:

scheduleID := timerTask.EventID

if isRunning, startedID := builder.isActivityTaskRunning(scheduleID); isRunning {
if isRunning, ai := msBuilder.isActivityRunning(scheduleID); isRunning {
timeoutType := workflow.TimeoutType(timerTask.TimeoutType)
t.logger.Debugf("Activity TimeoutType: %v, scheduledID: %v, startedId: %v. \n",
timeoutType, scheduleID, startedID)
timeoutType, scheduleID, ai.StartedID)

switch timeoutType {
case workflow.TimeoutType_SCHEDULE_TO_CLOSE:
builder.AddActivityTaskTimedOutEvent(scheduleID, startedID, timeoutType, nil)
builder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, nil)
msBuilder.DeletePendingActivity(scheduleID)
scheduleNewDecision = !builder.hasPendingDecisionTask()

case workflow.TimeoutType_START_TO_CLOSE:
if startedID != emptyEventID {
builder.AddActivityTaskTimedOutEvent(scheduleID, startedID, timeoutType, nil)
if ai.StartedID != emptyEventID {
builder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, nil)
msBuilder.DeletePendingActivity(scheduleID)
scheduleNewDecision = !builder.hasPendingDecisionTask()
}

case workflow.TimeoutType_HEARTBEAT:
if startedID != emptyEventID {
isTimerRunning, ai := msBuilder.isActivityHeartBeatRunning(scheduleID)
if ai.StartedID != emptyEventID {
isTimerRunning, ai := msBuilder.isActivityRunning(scheduleID)
if isTimerRunning {
t.logger.Debugf("Activity Heartbeat expired: %+v", *ai)
// The current heart beat expired.
builder.AddActivityTaskTimedOutEvent(scheduleID, startedID, timeoutType, ai.Details)
builder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, ai.Details)
msBuilder.DeletePendingActivity(scheduleID)
scheduleNewDecision = !builder.hasPendingDecisionTask()
}
}

case workflow.TimeoutType_SCHEDULE_TO_START:
if startedID == emptyEventID {
builder.AddActivityTaskTimedOutEvent(scheduleID, startedID, timeoutType, nil)
if ai.StartedID == emptyEventID {
builder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, nil)
msBuilder.DeletePendingActivity(scheduleID)
scheduleNewDecision = !builder.hasPendingDecisionTask()
}
}
Expand Down
74 changes: 45 additions & 29 deletions service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,21 @@ func (s *timerQueueProcessorSuite) waitForTimerTasksToProcess(p timerQueueProces
}
}

func (s *timerQueueProcessorSuite) checkTimedOutEventFor(workflowExecution workflow.WorkflowExecution, scheduleID int64) (bool, *historyBuilder) {
func (s *timerQueueProcessorSuite) checkTimedOutEventFor(workflowExecution workflow.WorkflowExecution,
scheduleID int64) (bool, bool, *historyBuilder) {
info, err1 := s.GetWorkflowExecutionInfo(workflowExecution)
s.Nil(err1)
builder := newHistoryBuilder(s.logger)
builder.loadExecutionInfo(info)
isRunning, _ := builder.isActivityTaskRunning(scheduleID)
return isRunning, builder

minfo, err1 := s.GetWorkflowMutableState(workflowExecution)
s.Nil(err1)
msBuilder := newMutableStateBuilder(s.logger)
msBuilder.Load(minfo.ActivitInfos, minfo.TimerInfos)
isRunningFromMutableState, _ := msBuilder.isActivityRunning(scheduleID)

return isRunning, isRunningFromMutableState, builder
}

func (s *timerQueueProcessorSuite) checkTimedOutEventForUserTimer(workflowExecution workflow.WorkflowExecution,
Expand Down Expand Up @@ -301,13 +309,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
s.NotNil(t)
timerTasks := []persistence.Task{t}

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
processor.NotifyNewTimer()

s.waitForTimerTasksToProcess(processor)
s.Equal(uint64(1), processor.timerFiredCount)
running, b := s.checkTimedOutEventFor(workflowExecution, activityScheduled.GetEventId())
running, isRunningFromMS, b := s.checkTimedOutEventFor(workflowExecution, activityScheduled.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_SCHEDULE_TO_START - With Start
p := newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -317,22 +326,24 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
&workflow.ScheduleActivityTaskDecisionAttributes{
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
})
b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})
aste := b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})
history, err = b.Serialize()
s.Nil(err)

msBuilder = newMutableStateBuilder(s.logger)
t = tBuilder.AddScheduleToStartActivityTimeout(ase.GetEventId(), ase, msBuilder)
s.NotNil(t)
timerTasks = []persistence.Task{t}
msBuilder.updateActivityInfos[0].StartedID = aste.GetEventId()

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.True(running)
s.True(isRunningFromMS)

// TimeoutType_START_TO_CLOSE - Just start.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -342,10 +353,11 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
&workflow.ScheduleActivityTaskDecisionAttributes{
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
})
b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})
aste = b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})

msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{StartToCloseTimeout: 1})
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{
ScheduleID: ase.GetEventId(), StartedID: aste.GetEventId(), StartToCloseTimeout: 1})
t, err = tBuilder.AddStartToCloseActivityTimeout(ase.GetEventId(), msBuilder)
s.Nil(err)
s.NotNil(t)
Expand All @@ -354,13 +366,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_START_TO_CLOSE - Start and Completed activity.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -370,7 +383,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
&workflow.ScheduleActivityTaskDecisionAttributes{
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
})
aste := b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})
aste = b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})

msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{StartToCloseTimeout: 1})
Expand All @@ -387,13 +400,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil /* since activity is completed */, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_SCHEDULE_TO_CLOSE - Just Scheduled.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -405,7 +419,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
})

msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{ScheduleToCloseTimeout: 1})
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{
ScheduleID: ase.GetEventId(), StartedID: emptyEventID, ScheduleToCloseTimeout: 1})
t, err = tBuilder.AddScheduleToCloseActivityTimeout(ase.GetEventId(), msBuilder)
s.Nil(err)
s.NotNil(t)
Expand All @@ -414,13 +429,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_SCHEDULE_TO_CLOSE - Scheduled and started.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -433,7 +449,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
aste = b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})

msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{ScheduleToCloseTimeout: 1})
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{
ScheduleID: ase.GetEventId(), StartedID: aste.GetEventId(), ScheduleToCloseTimeout: 1})
t, err = tBuilder.AddScheduleToCloseActivityTimeout(ase.GetEventId(), msBuilder)
s.Nil(err)
s.NotNil(t)
Expand All @@ -442,13 +459,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_SCHEDULE_TO_CLOSE - Scheduled, started, completed.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -475,13 +493,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil, nil)
s.updateHistoryAndTimers(workflowExecution, history, timerTasks, nil /* since it is completed */, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)

// TimeoutType_HEARTBEAT - Scheduled, started.
p = newTimerQueueProcessor(s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
Expand All @@ -494,7 +513,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
aste = b.AddActivityTaskStartedEvent(ase.GetEventId(), &workflow.PollForActivityTaskRequest{})

msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{HeartbeatTimeout: 1})
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{
ScheduleID: ase.GetEventId(), StartedID: aste.GetEventId(), HeartbeatTimeout: 1})

t, err = tBuilder.AddHeartBeatActivityTimeout(ase.GetEventId(), msBuilder)
s.Nil(err)
Expand All @@ -504,18 +524,14 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask() {
history, err = b.Serialize()
s.Nil(err)

// -- Update heart beat timer ID.
msBuilder = newMutableStateBuilder(s.logger)
msBuilder.UpdatePendingActivity(ase.GetEventId(), &persistence.ActivityInfo{
ScheduleID: ase.GetEventId(), HeartbeatTimeout: 1})

s.updateHistoryAndTimers(workflowExecution, history, timerTasks, msBuilder.updateActivityInfos, nil)
p.NotifyNewTimer()

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.timerFiredCount)
running, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
running, isRunningFromMS, b = s.checkTimedOutEventFor(workflowExecution, ase.GetEventId())
s.False(running)
s.False(isRunningFromMS)
}

func (s *timerQueueProcessorSuite) TestTimer_UserTimers() {
Expand Down

0 comments on commit 0072289

Please sign in to comment.