Skip to content

Commit

Permalink
Fix: set backoff timer for cron childWorkflow when it's started (cade…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Apr 5, 2019
1 parent 6130d4b commit f880324
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 10 deletions.
4 changes: 2 additions & 2 deletions .gen/go/history/idl.go

Large diffs are not rendered by default.

48 changes: 46 additions & 2 deletions .gen/go/history/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

160 changes: 160 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,166 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution)
}

func (s *integrationSuite) TestCronChildWorkflowExecution() {
parentID := "integration-cron-child-workflow-test-parent"
childID := "integration-cron-child-workflow-test-child"
wtParent := "integration-cron-child-workflow-test-parent-type"
wtChild := "integration-cron-child-workflow-test-child-type"
tlParent := "integration-cron-child-workflow-test-parent-tasklist"
tlChild := "integration-cron-child-workflow-test-child-tasklist"
identity := "worker1"

cronSchedule := "@every 3s"
targetBackoffDuration := time.Second * 3
backoffDurationTolerance := time.Second

parentWorkflowType := &workflow.WorkflowType{}
parentWorkflowType.Name = common.StringPtr(wtParent)

childWorkflowType := &workflow.WorkflowType{}
childWorkflowType.Name = common.StringPtr(wtChild)

taskListParent := &workflow.TaskList{}
taskListParent.Name = common.StringPtr(tlParent)
taskListChild := &workflow.TaskList{}
taskListChild.Name = common.StringPtr(tlChild)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(parentID),
WorkflowType: parentWorkflowType,
TaskList: taskListParent,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyRequestCancel),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)
s.Logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

// decider logic
childExecutionStarted := false
var parentStartedEvent *workflow.HistoryEvent
var terminatedEvent *workflow.HistoryEvent
var startChildWorkflowTS time.Time
// Parent Decider Logic
dtHandlerParent := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
s.Logger.Infof("Processing decision task for WorkflowID: %v", *execution.WorkflowId)

if !childExecutionStarted {
s.Logger.Info("Starting child execution.")
childExecutionStarted = true
parentStartedEvent = history.Events[0]
startChildWorkflowTS = time.Now()
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution),
StartChildWorkflowExecutionDecisionAttributes: &workflow.StartChildWorkflowExecutionDecisionAttributes{
WorkflowId: common.StringPtr(childID),
WorkflowType: childWorkflowType,
TaskList: taskListChild,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyRequestCancel),
Control: nil,
CronSchedule: common.StringPtr(cronSchedule),
},
}}, nil
}
for _, event := range history.Events[previousStartedEventID:] {
if *event.EventType == workflow.EventTypeChildWorkflowExecutionTerminated {
terminatedEvent = event
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}
}
return nil, nil, nil
}

// Child Decider Logic
dtHandlerChild := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

s.Logger.Infof("Processing decision task for Child WorkflowID: %v", *execution.WorkflowId)
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{},
}}, nil
}

pollerParent := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskListParent,
Identity: identity,
DecisionHandler: dtHandlerParent,
Logger: s.Logger,
T: s.T(),
}

pollerChild := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskListChild,
Identity: identity,
DecisionHandler: dtHandlerChild,
Logger: s.Logger,
T: s.T(),
}

// Make first decision to start child execution
_, err := pollerParent.PollAndProcessDecisionTask(false, false)
s.Logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(childExecutionStarted)
s.Equal(workflow.ChildPolicyRequestCancel,
parentStartedEvent.WorkflowExecutionStartedEventAttributes.GetChildPolicy())

// Process ChildExecution Started event
_, err = pollerParent.PollAndProcessDecisionTask(false, false)
s.Logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)

for i := 0; i < 2; i++ {
_, err = pollerChild.PollAndProcessDecisionTask(false, false)
s.Logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)

backoffDuration := time.Now().Sub(startChildWorkflowTS)
s.True(backoffDuration > targetBackoffDuration)
s.True(backoffDuration < targetBackoffDuration+backoffDurationTolerance)
startChildWorkflowTS = time.Now()
}

// terminate the childworkflow
terminateErr := s.engine.TerminateWorkflowExecution(createContext(), &workflow.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(s.domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(childID),
},
})
s.Nil(terminateErr)

// Process ChildExecution terminated event and complete parent execution
_, err = pollerParent.PollAndProcessDecisionTask(false, false)
s.Logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.NotNil(terminatedEvent)
terminatedAttributes := terminatedEvent.ChildWorkflowExecutionTerminatedEventAttributes
s.Nil(terminatedAttributes.Domain)
s.Equal(childID, *terminatedAttributes.WorkflowExecution.WorkflowId)
s.Equal(wtChild, *terminatedAttributes.WorkflowType.Name)
}

func (s *integrationSuite) TestWorkflowTimeout() {
startTime := time.Now().UnixNano()

Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ struct RequestCancelWorkflowExecutionRequest {
struct ScheduleDecisionTaskRequest {
10: optional string domainUUID
20: optional shared.WorkflowExecution workflowExecution
30: optional bool isFirstDecision
}

struct DescribeWorkflowExecutionRequest {
Expand Down
25 changes: 19 additions & 6 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx context.Context, startReq

taskList := request.TaskList.GetName()
cronBackoffSeconds := startRequest.GetFirstDecisionTaskBackoffSeconds()
// Generate first decision task event if not child WF
// Generate first decision task event if not child WF and no first decision task backoff
transferTasks, firstDecisionTask, retError := e.generateFirstDecisionTask(domainID, msBuilder, startRequest.ParentExecutionInfo, taskList, cronBackoffSeconds)
if retError != nil {
return
Expand All @@ -550,7 +550,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx context.Context, startReq
timerTasks := []persistence.Task{&persistence.WorkflowTimeoutTask{
VisibilityTimestamp: e.shard.GetTimeSource().Now().Add(timeoutDuration),
}}
if cronBackoffSeconds != 0 {

// Only schedule the backoff timer task if not child WF and there's first decision task backoff
if cronBackoffSeconds != 0 && startRequest.ParentExecutionInfo == nil {
timerTasks = append(timerTasks, &persistence.WorkflowBackoffTimerTask{
VisibilityTimestamp: e.shard.GetTimeSource().Now().Add(cronBackoffDuration),
TimeoutType: persistence.WorkflowBackoffTimeoutTypeCron,
Expand Down Expand Up @@ -2567,15 +2569,26 @@ func (e *historyEngineImpl) ScheduleDecisionTask(ctx context.Context, scheduleRe
RunId: scheduleRequest.WorkflowExecution.RunId,
}

return e.updateWorkflowExecution(ctx, domainID, execution, false, true,
func(msBuilder mutableState, tBuilder *timerBuilder) ([]persistence.Task, error) {
return e.updateWorkflowExecutionWithAction(ctx, domainID, execution,
func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}

// Noop
postActions := &updateWorkflowAction{
createDecision: true,
}

cronBackoffDuration := msBuilder.GetCronBackoffDuration()
if scheduleRequest.GetIsFirstDecision() && cronBackoffDuration != cron.NoBackoff {
postActions.timerTasks = append(postActions.timerTasks, &persistence.WorkflowBackoffTimerTask{
VisibilityTimestamp: time.Now().Add(cronBackoffDuration),
TimeoutType: persistence.WorkflowBackoffTimeoutTypeCron,
})
postActions.createDecision = false
}

return nil, nil
return postActions, nil
})
}

Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ func (t *transferQueueActiveProcessorImpl) createFirstDecisionTask(domainID stri
err := t.historyClient.ScheduleDecisionTask(nil, &h.ScheduleDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
WorkflowExecution: execution,
IsFirstDecision: common.BoolPtr(true),
})

if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/history/transferQueueActiveProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe
WorkflowId: common.StringPtr(childWorkflowID),
RunId: common.StringPtr(childRunID),
},
IsFirstDecision: common.BoolPtr(true),
}).Return(nil).Once()
s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once()

Expand Down Expand Up @@ -1360,6 +1361,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe
WorkflowId: common.StringPtr(childWorkflowID),
RunId: common.StringPtr(childRunID),
},
IsFirstDecision: common.BoolPtr(true),
}).Return(nil).Once()

_, err := s.transferQueueActiveProcessor.process(transferTask, true)
Expand Down

0 comments on commit f880324

Please sign in to comment.