Skip to content

Commit

Permalink
Bugfix: Recreate activity heartbeat timeout after first timer fire (c…
Browse files Browse the repository at this point in the history
…adence-workflow#658)

When the heartbeat timeout is created for the very first time it
uses StartEventID as the dedupe event ID. But it is possible that
it is a buffered event and no ID is specified when the timer task
is created. This trips the recreation logic from recreating the
timer.
This change instead relies on the schedule ID for the event
for dedupe event ID of the heartbeat timer.
  • Loading branch information
samarabbas authored and Liang Mei committed Apr 10, 2018
1 parent 6ad47ff commit 8f14f94
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 12 deletions.
177 changes: 177 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,183 @@ func (s *integrationSuite) TestActivityTimeouts() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestActivityHeartbeatTimeouts() {
id := "integration-activity-heartbeat-timeout-test"
wt := "integration-activity-heartbeat-timeout-test-type"
tl := "integration-activity-heartbeat-timeout-test-tasklist"
identity := "worker1"
activityName := "timeout_activity"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

workflowComplete := false
activitiesScheduled := false
activitiesMap := map[int64]*workflow.HistoryEvent{}
failWorkflow := false
failReason := ""
var activityATimedout bool
var activityALastHeartbeat int
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if !activitiesScheduled {
activitiesScheduled = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("A"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: []byte("Heartbeat"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(20),
StartToCloseTimeoutSeconds: common.Int32Ptr(15),
HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using Heartbeat
},
}}, nil
} else if previousStartedEventID > 0 {
for _, event := range history.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeActivityTaskScheduled {
activitiesMap[event.GetEventId()] = event
}

if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut {
timeoutEvent := event.ActivityTaskTimedOutEventAttributes
scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()]
if !ok {
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr("ScheduledEvent not found."),
},
}}, nil
}

switch timeoutEvent.GetTimeoutType() {
case workflow.TimeoutTypeHeartbeat:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" {
activityATimedout = true
activityALastHeartbeat, _ = strconv.Atoi(string(timeoutEvent.Details))
} else {
failWorkflow = true
failReason = "ActivityID A is expected to timeout with Heartbeat"
}
}
}
}
}

if failWorkflow {
s.logger.Errorf("Failing workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr(failReason),
},
}}, nil
}

if activityATimedout {
s.logger.Info("Completing Workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

return nil, []*workflow.Decision{}, nil
}

atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {
s.Equal(id, *execution.WorkflowId)
s.Equal(activityName, *activityType.Name)
timeoutType := string(input)
switch timeoutType {
case "Heartbeat":
s.logger.Info("Starting hearbeat activity.")
go func() {
for i := 0; i < 6; i++ {
s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i)
_, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken, Details: []byte(strconv.Itoa(i))})
s.Nil(err)
time.Sleep(1 * time.Second)
}
s.logger.Info("End Heartbeating.")
}()
s.logger.Info("Sleeping hearbeat activity.")
time.Sleep(12 * time.Second)
}

return []byte("Activity Result."), false, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: atHandler,
logger: s.logger,
suite: s,
}

_, err := poller.pollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)

for i := 0; i < 1; i++ {
go func() {
err = poller.pollAndProcessActivityTask(false)
s.logger.Infof("Activity Processing Completed. Error: %v", err)
}()
}

s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId)
for i := 0; i < 10; i++ {
s.logger.Infof("Processing decision task: %v", i)
_, err := poller.pollAndProcessDecisionTask(false, false)
s.Nil(err, "Poll for decision task failed.")

if workflowComplete {
break
}
}

s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
})
s.True(workflowComplete)
s.True(activityATimedout)
s.Equal(5, activityALastHeartbeat)
}

func (s *integrationSuite) TestSequential_UserTimers() {
id := "interation-sequential-user-timers-test"
wt := "interation-sequential-user-timers-test-type"
Expand Down
24 changes: 12 additions & 12 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,12 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
if v.StartedID != emptyEventID {
startToCloseExpiry := v.StartedTime.Add(time.Duration(v.StartToCloseTimeout) * time.Second)
td := &timerDetails{
SequenceID: SequenceID{VisibilityTimestamp: startToCloseExpiry},
ActivityID: v.ScheduleID,
EventID: v.StartedID,
TimeoutType: w.TimeoutTypeStartToClose,
TimeoutSec: v.StartToCloseTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0}
SequenceID: SequenceID{VisibilityTimestamp: startToCloseExpiry},
ActivityID: v.ScheduleID,
EventID: v.ScheduleID,
TimeoutType: w.TimeoutTypeStartToClose,
TimeoutSec: v.StartToCloseTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0}
tb.activityTimers = append(tb.activityTimers, td)
if v.HeartbeatTimeout > 0 {
lastHeartBeatTS := v.LastHeartBeatUpdatedTime
Expand All @@ -298,12 +298,12 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
}
heartBeatExpiry := lastHeartBeatTS.Add(time.Duration(v.HeartbeatTimeout) * time.Second)
td := &timerDetails{
SequenceID: SequenceID{VisibilityTimestamp: heartBeatExpiry},
ActivityID: v.ScheduleID,
EventID: v.StartedID,
TimeoutType: w.TimeoutTypeHeartbeat,
TimeoutSec: v.HeartbeatTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0}
SequenceID: SequenceID{VisibilityTimestamp: heartBeatExpiry},
ActivityID: v.ScheduleID,
EventID: v.ScheduleID,
TimeoutType: w.TimeoutTypeHeartbeat,
TimeoutSec: v.HeartbeatTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0}
tb.activityTimers = append(tb.activityTimers, td)
}
} else {
Expand Down

0 comments on commit 8f14f94

Please sign in to comment.