Skip to content

Commit

Permalink
Add unit tests for sigWithStart (cadence-workflow#1520)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Mar 12, 2019
1 parent e98340c commit ccb5165
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 13 deletions.
26 changes: 13 additions & 13 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (e *historyEngineImpl) appendFirstBatchHistoryEvents(msBuilder mutableState
return
}

func fullfillExecutionInfo(msBuilder mutableState, domainID, taskList string, execution workflow.WorkflowExecution, lastFirstEventID int64) {
func fulfillExecutionInfo(msBuilder mutableState, domainID, taskList string, execution workflow.WorkflowExecution, lastFirstEventID int64) {
info := msBuilder.GetExecutionInfo()
info.DomainID = domainID
info.WorkflowID = *execution.WorkflowId
Expand Down Expand Up @@ -526,35 +526,35 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx context.Context, startReq
// set versions and timestamp for timer and transfer tasks
setTaskInfo(msBuilder.GetCurrentVersion(), time.Now(), transferTasks, timerTasks)

needDeleteHistory := true
historySize, retError := e.appendFirstBatchHistoryEvents(msBuilder, domainID, execution)
if retError != nil {
return
}
// delete the history if this API call is not successful, otherwise the history events will be zombie data
// delete history if createWorkflow failed, otherwise history will leak
shouldDeleteHistory := true
defer func() {
if needDeleteHistory {
if shouldDeleteHistory {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
}
}()

// prepare for execution persistence operation
msBuilder.IncrementHistorySize(historySize)
fullfillExecutionInfo(msBuilder, domainID, taskList, execution, startedEvent.GetEventId())
fulfillExecutionInfo(msBuilder, domainID, taskList, execution, startedEvent.GetEventId())

// create as brand new
createMode := persistence.CreateWorkflowModeBrandNew
prevRunID := ""
prevLastWriteVersion := int64(0)
retError = e.createWorkflow(startRequest, msBuilder, createMode, prevRunID, prevLastWriteVersion, firstDecisionTask, transferTasks, timerTasks, replicationTasks, clusterMetadata)

if retError != nil {
t, ok := retError.(*persistence.WorkflowExecutionAlreadyStartedError)
if ok {
if t.StartRequestID == *request.RequestId {
return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(t.RunID),
}, nil
// delete history is expected here because duplicate start request will create history with different rid
}

if msBuilder.GetCurrentVersion() < t.LastWriteVersion {
Expand All @@ -579,7 +579,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx context.Context, startReq
}

if retError == nil {
needDeleteHistory = false
shouldDeleteHistory = false
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)
return &workflow.StartWorkflowExecutionResponse{
RunId: execution.RunId,
Expand Down Expand Up @@ -2371,20 +2371,20 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context
// set versions and timestamp for timer and transfer tasks
setTaskInfo(msBuilder.GetCurrentVersion(), time.Now(), transferTasks, timerTasks)

needDeleteHistory := true
historySize, retError := e.appendFirstBatchHistoryEvents(msBuilder, domainID, execution)
if retError != nil {
return
}
// delete the history if this API call is not successful, otherwise the history events will be zombie data
// delete history if createWorkflow failed, otherwise history will leak
shouldDeleteHistory := true
defer func() {
if needDeleteHistory {
if shouldDeleteHistory {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
}
}()

msBuilder.IncrementHistorySize(historySize)
fullfillExecutionInfo(msBuilder, domainID, taskList, execution, startedEvent.GetEventId())
fulfillExecutionInfo(msBuilder, domainID, taskList, execution, startedEvent.GetEventId())

if prevMutableState != nil {
createMode := persistence.CreateWorkflowModeWorkflowIDReuse
Expand All @@ -2402,13 +2402,13 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context
return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(t.RunID),
}, nil
// delete history is expected here because duplicate start request will create history with different rid
}
}

if retError == nil {
needDeleteHistory = false
shouldDeleteHistory = false
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)

return &workflow.StartWorkflowExecutionResponse{
RunId: execution.RunId,
}, nil
Expand Down
133 changes: 133 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,139 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotRunning()
s.NotEqual(runID, resp.GetRunId())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateRequests() {
domainID := validDomainID
workflowID := "wId"
runID := validRunID
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
requestID := "testRequestID"
sRequest := &h.SignalWithStartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
SignalWithStartRequest: &workflow.SignalWithStartWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
WorkflowId: common.StringPtr(workflowID),
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
SignalName: common.StringPtr(signalName),
Input: input,
RequestId: common.StringPtr(requestID),
},
}

msBuilder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()))
ms := createMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
workflowAlreadyStartedErr := &p.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: requestID, // use same requestID
RunID: runID,
State: p.WorkflowStateRunning,
CloseStatus: p.WorkflowCloseStatusNone,
LastWriteVersion: common.EmptyVersion,
}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(&p.AppendHistoryEventsResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Config: &p.DomainConfig{Retention: 1},
ReplicationConfig: &p.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*p.ClusterReplicationConfig{
&p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: p.DomainTableVersionV1,
},
nil,
)

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(err)
s.NotNil(resp.GetRunId())
s.Equal(runID, resp.GetRunId())
}

func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlreadyStarted() {
domainID := validDomainID
workflowID := "wId"
runID := validRunID
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
requestID := "testRequestID"
sRequest := &h.SignalWithStartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
SignalWithStartRequest: &workflow.SignalWithStartWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
WorkflowId: common.StringPtr(workflowID),
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
SignalName: common.StringPtr(signalName),
Input: input,
RequestId: common.StringPtr(requestID),
},
}

msBuilder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()))
ms := createMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
workflowAlreadyStartedErr := &p.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: "new request ID",
RunID: runID,
State: p.WorkflowStateRunning,
CloseStatus: p.WorkflowCloseStatusNone,
LastWriteVersion: common.EmptyVersion,
}

s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(&p.AppendHistoryEventsResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Config: &p.DomainConfig{Retention: 1},
ReplicationConfig: &p.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*p.ClusterReplicationConfig{
&p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: p.DomainTableVersionV1,
},
nil,
)

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.Nil(resp)
s.NotNil(err)
}

func (s *engine2Suite) getBuilder(domainID string, we workflow.WorkflowExecution) mutableState {
context, release, err := s.historyEngine.historyCache.getOrCreateWorkflowExecution(domainID, we)
if err != nil {
Expand Down

0 comments on commit ccb5165

Please sign in to comment.