Skip to content

Commit

Permalink
Add timeout timer for renewed decision task (cadence-workflow#1023)
Browse files Browse the repository at this point in the history
* Add timeout timer for renewed decision task

* add integration test to verify relay decision task has proper timeout timer

* update comments for test
  • Loading branch information
yiminc authored Aug 1, 2018
1 parent 79798a6 commit 2d99fa3
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
95 changes: 95 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6297,6 +6297,101 @@ func (s *integrationSuite) TestTransientDecisionTimeout() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestRelayDecisionTimeout() {
id := "integration-relay-decision-timeout-test"
wt := "integration-relay-decision-timeout-test-type"
tl := "integration-relay-decision-timeout-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)
s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

workflowExecution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
}

workflowComplete, isFirst := false, true
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if isFirst {
isFirst = false
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeRecordMarker),
RecordMarkerDecisionAttributes: &workflow.RecordMarkerDecisionAttributes{
MarkerName: common.StringPtr("test-marker"),
},
}}, nil
}
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{},
}}, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: nil,
logger: s.logger,
suite: s,
}

// First decision task complete with a marker decision, and request to relay decision (immediately return a new decision task)
_, newTask, err := poller.pollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(false, false, false, false, 0, 3, true)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.NotNil(newTask)
s.NotNil(newTask.DecisionTask)

time.Sleep(time.Second * 2) // wait 2s for relay decision to timeout
decisionTaskTimeout := false
for i := 0; i < 3; i++ {
events := s.getHistory(s.domainName, workflowExecution)
if len(events) >= 8 {
s.Equal(workflow.EventTypeDecisionTaskTimedOut, events[7].GetEventType())
s.Equal(workflow.TimeoutTypeStartToClose, events[7].DecisionTaskTimedOutEventAttributes.GetTimeoutType())
decisionTaskTimeout = true
break
}
time.Sleep(time.Second)
}
// verify relay decision task timeout
s.True(decisionTaskTimeout)
s.printWorkflowHistory(s.domainName, workflowExecution)

// Now complete workflow
_, err = poller.pollAndProcessDecisionTaskWithAttempt(true, false, false, false, int64(1))
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

s.True(workflowComplete)
}

func (s *integrationSuite) TestTaskProcessingProtectionForRateLimitError() {
id := "integration-task-processing-protection-for-rate-limit-error-test"
wt := "integration-task-processing-protection-for-rate-limit-error-test-type"
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,8 @@ Update_History_Loop:
TaskList: &workflow.TaskList{Name: common.StringPtr(di.TaskList)},
Identity: request.Identity,
})
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(di.ScheduleID, di.Attempt, di.DecisionTimeout)
timerTasks = append(timerTasks, timeOutTask)
}
}

Expand Down

0 comments on commit 2d99fa3

Please sign in to comment.