Skip to content

Commit

Permalink
Fix workflow retry policy for childWorkflow (cadence-workflow#3780)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 25, 2020
1 parent a201ed7 commit 127a97a
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 70 deletions.
6 changes: 4 additions & 2 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,20 +430,22 @@ func ValidateRetryPolicy(policy *workflow.RetryPolicy) error {
func CreateHistoryStartWorkflowRequest(
domainID string,
startRequest *workflow.StartWorkflowExecutionRequest,
now time.Time,
) *h.StartWorkflowExecutionRequest {
now := time.Now()
histRequest := &h.StartWorkflowExecutionRequest{
DomainUUID: StringPtr(domainID),
StartRequest: startRequest,
}
firstDecisionTaskBackoffSeconds := backoff.GetBackoffForNextScheduleInSeconds(startRequest.GetCronSchedule(), now, now)
histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)

if startRequest.RetryPolicy != nil && startRequest.RetryPolicy.GetExpirationIntervalInSeconds() > 0 {
expirationInSeconds := startRequest.RetryPolicy.GetExpirationIntervalInSeconds() + firstDecisionTaskBackoffSeconds
// expirationTime calculates from first decision task schedule to the end of the workflow
deadline := now.Add(time.Duration(expirationInSeconds) * time.Second)
histRequest.ExpirationTimestamp = Int64Ptr(deadline.Round(time.Millisecond).UnixNano())
}
histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)

return histRequest
}

Expand Down
4 changes: 2 additions & 2 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestCreateHistoryStartWorkflowRequest_ExpirationTimeWithCron(t *testing.T)
CronSchedule: StringPtr("@every 300s"),
}
now := time.Now()
startRequest := CreateHistoryStartWorkflowRequest(domainID, request)
startRequest := CreateHistoryStartWorkflowRequest(domainID, request, now)

expirationTime := startRequest.GetExpirationTimestamp()
require.NotNil(t, expirationTime)
Expand All @@ -113,7 +113,7 @@ func TestCreateHistoryStartWorkflowRequest_ExpirationTimeWithoutCron(t *testing.
},
}
now := time.Now()
startRequest := CreateHistoryStartWorkflowRequest(domainID, request)
startRequest := CreateHistoryStartWorkflowRequest(domainID, request, now)

expirationTime := startRequest.GetExpirationTimestamp()
require.NotNil(t, expirationTime)
Expand Down
6 changes: 5 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
}

wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
clientResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, thrift.ToHistoryStartWorkflowExecutionRequest(common.CreateHistoryStartWorkflowRequest(domainID, startRequest)))
clientResp, err := wh.GetHistoryClient().
StartWorkflowExecution(
ctx, thrift.ToHistoryStartWorkflowExecutionRequest(
common.CreateHistoryStartWorkflowRequest(
domainID, startRequest, time.Now())))
resp = thrift.FromStartWorkflowExecutionResponse(clientResp)
if err != nil {
return nil, wh.error(err, scope)
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2971,7 +2971,7 @@ func getStartRequest(
Header: request.Header,
}

startRequest := common.CreateHistoryStartWorkflowRequest(domainID, req)
startRequest := common.CreateHistoryStartWorkflowRequest(domainID, req, time.Now())
return startRequest
}

Expand Down
63 changes: 28 additions & 35 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,50 +1267,43 @@ func (t *transferActiveTaskExecutor) startWorkflowWithRetry(
attributes *workflow.StartChildWorkflowExecutionInitiatedEventAttributes,
) (string, error) {

frontendStartReq := &workflow.StartWorkflowExecutionRequest{
Domain: common.StringPtr(targetDomain),
WorkflowId: attributes.WorkflowId,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Input: attributes.Input,
Header: attributes.Header,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
RequestId: common.StringPtr(childInfo.CreateRequestID),
WorkflowIdReusePolicy: attributes.WorkflowIdReusePolicy,
RetryPolicy: attributes.RetryPolicy,
CronSchedule: attributes.CronSchedule,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
}

now := t.shard.GetTimeSource().Now()
request := &h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(task.TargetDomainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
Domain: common.StringPtr(targetDomain),
WorkflowId: attributes.WorkflowId,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Input: attributes.Input,
Header: attributes.Header,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
RequestId: common.StringPtr(childInfo.CreateRequestID),
WorkflowIdReusePolicy: attributes.WorkflowIdReusePolicy,
RetryPolicy: attributes.RetryPolicy,
CronSchedule: attributes.CronSchedule,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
},
ParentExecutionInfo: &h.ParentExecutionInfo{
DomainUUID: common.StringPtr(task.DomainID),
Domain: common.StringPtr(domain),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
},
InitiatedId: common.Int64Ptr(task.ScheduleID),
historyStartReq := common.CreateHistoryStartWorkflowRequest(task.TargetDomainID, frontendStartReq, now)

historyStartReq.ParentExecutionInfo = &h.ParentExecutionInfo{
DomainUUID: common.StringPtr(task.DomainID),
Domain: common.StringPtr(domain),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
},
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(
backoff.GetBackoffForNextScheduleInSeconds(
attributes.GetCronSchedule(),
now,
now,
),
),
InitiatedId: common.Int64Ptr(task.ScheduleID),
}

startWorkflowCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout)
defer cancel()
var response *workflow.StartWorkflowExecutionResponse
var err error
op := func() error {
clientResp, err := t.historyClient.StartWorkflowExecution(startWorkflowCtx, thrift.ToHistoryStartWorkflowExecutionRequest(request))
clientResp, err := t.historyClient.StartWorkflowExecution(startWorkflowCtx, thrift.ToHistoryStartWorkflowExecutionRequest(historyStartReq))
response = thrift.FromStartWorkflowExecutionResponse(clientResp)
return err
}
Expand Down
154 changes: 127 additions & 27 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
Expand Down Expand Up @@ -1558,7 +1557,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
taskID := int64(59)

event, ci := test.AddStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
s.childDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1)
s.childDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1, nil)

transferTask := &persistence.TransferTaskInfo{
Version: s.version,
Expand All @@ -1582,6 +1581,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
transferTask,
mutableState,
ci,
time.Now(),
)).Return(&types.StartWorkflowExecutionResponse{RunID: common.StringPtr(childRunID)}, nil).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()
Expand All @@ -1599,6 +1599,101 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_WithRetry_Success() {

workflowExecution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("some random workflow ID"),
RunId: common.StringPtr(uuid.New()),
}
workflowType := "some random workflow type"
taskListName := "some random task list"

childWorkflowID := "some random child workflow ID"
childRunID := uuid.New()
childWorkflowType := "some random child workflow type"
childTaskListName := "some random child task list"

mutableState := execution.NewMutableStateBuilderWithVersionHistoriesWithEventV2(
s.mockShard,
s.logger,
s.version,
workflowExecution.GetRunId(),
constants.TestGlobalDomainEntry,
)
_, err := mutableState.AddWorkflowExecutionStartedEvent(
workflowExecution,
&history.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(s.domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskListName)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
},
},
)
s.Nil(err)

di := test.AddDecisionTaskScheduledEvent(mutableState)
event := test.AddDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New())
di.StartedID = event.GetEventId()
event = test.AddDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity")

taskID := int64(59)

retryPolicy := &workflow.RetryPolicy{
ExpirationIntervalInSeconds: common.Int32Ptr(100),
MaximumAttempts: common.Int32Ptr(3),
InitialIntervalInSeconds: common.Int32Ptr(1),
MaximumIntervalInSeconds: common.Int32Ptr(2),
BackoffCoefficient: common.Float64Ptr(1),
}

event, ci := test.AddStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
s.childDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1, retryPolicy)

transferTask := &persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowId(),
RunID: workflowExecution.GetRunId(),
TargetDomainID: constants.TestChildDomainID,
TargetWorkflowID: childWorkflowID,
TargetRunID: "",
TaskID: taskID,
TaskList: taskListName,
TaskType: persistence.TransferTaskTypeStartChildExecution,
ScheduleID: event.GetEventId(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(
gomock.Any(),
gomock.Eq(s.createChildWorkflowExecutionRequest(
s.domainName,
s.childDomainName,
transferTask,
mutableState,
ci,
s.mockShard.GetTimeSource().Now(),
))).Return(&types.StartWorkflowExecutionResponse{RunID: common.StringPtr(childRunID)}, nil).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.version).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockHistoryClient.EXPECT().ScheduleDecisionTask(gomock.Any(), &types.ScheduleDecisionTaskRequest{
DomainUUID: common.StringPtr(constants.TestChildDomainID),
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: common.StringPtr(childWorkflowID),
RunID: common.StringPtr(childRunID),
},
IsFirstDecision: common.BoolPtr(true),
}).Return(nil).Times(1)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure() {

workflowExecution := workflow.WorkflowExecution{
Expand Down Expand Up @@ -1651,6 +1746,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure
nil,
1,
1,
nil,
)

transferTask := &persistence.TransferTaskInfo{
Expand All @@ -1675,6 +1771,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Failure
transferTask,
mutableState,
ci,
time.Now(),
)).Return(nil, &types.WorkflowExecutionAlreadyStartedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()
Expand Down Expand Up @@ -1737,6 +1834,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Success
nil,
1,
1,
nil,
)

transferTask := &persistence.TransferTaskInfo{
Expand Down Expand Up @@ -1827,6 +1925,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessStartChildExecution_Duplica
nil,
1,
1,
nil,
)

transferTask := &persistence.TransferTaskInfo{
Expand Down Expand Up @@ -2125,6 +2224,7 @@ func (s *transferActiveTaskExecutorSuite) createChildWorkflowExecutionRequest(
task *persistence.TransferTaskInfo,
mutableState execution.MutableState,
ci *persistence.ChildExecutionInfo,
now time.Time,
) *types.HistoryStartWorkflowExecutionRequest {

event, err := mutableState.GetChildExecutionInitiatedEvent(context.Background(), task.ScheduleID)
Expand All @@ -2134,31 +2234,31 @@ func (s *transferActiveTaskExecutorSuite) createChildWorkflowExecutionRequest(
WorkflowID: common.StringPtr(task.WorkflowID),
RunID: common.StringPtr(task.RunID),
}
now := time.Now()
return &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(task.TargetDomainID),
StartRequest: &types.StartWorkflowExecutionRequest{
Domain: common.StringPtr(childDomainName),
WorkflowID: attributes.WorkflowID,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Input: attributes.Input,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
RequestID: common.StringPtr(ci.CreateRequestID),
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
},
ParentExecutionInfo: &types.ParentExecutionInfo{
DomainUUID: common.StringPtr(task.DomainID),
Domain: common.StringPtr(constants.TestDomainName),
Execution: &workflowExecution,
InitiatedID: common.Int64Ptr(task.ScheduleID),
},
FirstDecisionTaskBackoffSeconds: common.Int32Ptr(
backoff.GetBackoffForNextScheduleInSeconds(attributes.GetCronSchedule(), now, now),
),
}
frontendStartReq := &types.StartWorkflowExecutionRequest{
Domain: common.StringPtr(childDomainName),
WorkflowID: attributes.WorkflowID,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Input: attributes.Input,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
RequestID: common.StringPtr(ci.CreateRequestID),
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
RetryPolicy: attributes.RetryPolicy,
}

parentInfo := &types.ParentExecutionInfo{
DomainUUID: common.StringPtr(task.DomainID),
Domain: common.StringPtr(domainName),
Execution: &workflowExecution,
InitiatedID: common.Int64Ptr(task.ScheduleID),
}

historyStartReq := common.CreateHistoryStartWorkflowRequest(
task.TargetDomainID, thrift.FromStartWorkflowExecutionRequest(frontendStartReq), now)
historyStartReq.ParentExecutionInfo = thrift.FromParentExecutionInfo(parentInfo)
return thrift.ToHistoryStartWorkflowExecutionRequest(historyStartReq)
}

func (s *transferActiveTaskExecutorSuite) createUpsertWorkflowSearchAttributesRequest(
Expand Down
4 changes: 2 additions & 2 deletions service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessStartChildExecution_Pendin

taskID := int64(59)
event, _ = test.AddStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
constants.TestChildDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1)
constants.TestChildDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1, nil)
nextEventID := event.GetEventId()

now := time.Now()
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessStartChildExecution_Succes

taskID := int64(59)
event, childInfo := test.AddStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
constants.TestChildDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1)
constants.TestChildDomainName, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1, nil)

now := time.Now()
transferTask := &persistence.TransferTaskInfo{
Expand Down
Loading

0 comments on commit 127a97a

Please sign in to comment.