Skip to content

Commit

Permalink
Fix create workflow error handling issue (cadence-workflow#2823)
Browse files Browse the repository at this point in the history
* Fix persistence layer create zombie workflow dedup error handling
* Fix nDC transaction manager creating zombie workflow error handling
* NDC integration test will duplicate events for better testing
* Improve replication fetcher test reliability
* Fix error message in MySQL
* Add more log to NDC history replicator
  • Loading branch information
wxing1292 authored Nov 14, 2019
1 parent 3a27d38 commit eedf89b
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ THRIFTRW_SRCS = \
idl/github.com/uber/cadence/sqlblobs.thrift \

PROGS = cadence
TEST_TIMEOUT = 15m
TEST_TIMEOUT = 20m
TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT)
BUILD := ./build
TOOLS_CMD_ROOT=./cmd/tools
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,22 @@ func (d *cassandraPersistence) CreateWorkflowExecution(
msg := fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, CurrentRunID: %v, columns: (%v)",
executionInfo.WorkflowID, executionInfo.RunID, strings.Join(columns, ","))
return nil, &p.CurrentWorkflowConditionFailedError{Msg: msg}
} else if rowType == rowTypeExecution && runID == executionInfo.RunID {
msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v",
executionInfo.WorkflowID, executionInfo.RunID, request.RangeID)
replicationState := createReplicationState(previous["replication_state"].(map[string]interface{}))
lastWriteVersion = common.EmptyVersion
if replicationState != nil {
lastWriteVersion = replicationState.LastWriteVersion
}
return nil, &p.WorkflowExecutionAlreadyStartedError{
Msg: msg,
StartRequestID: executionInfo.CreateRequestID,
RunID: executionInfo.RunID,
State: executionInfo.State,
CloseStatus: executionInfo.CloseStatus,
LastWriteVersion: lastWriteVersion,
}
}

previous = make(map[string]interface{})
Expand Down
66 changes: 66 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,72 @@ func (s *ExecutionManagerSuite) SetupTest() {
s.ClearTasks()
}

// TestCreateWorkflowExecutionDeDup test
func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionDeDup() {
domainID := uuid.New()
workflowID := "create-workflow-test-dedup"
runID := "3969fae6-6b75-4c2a-b74b-4054edd296a6"
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
}
tasklist := "some random tasklist"
workflowType := "some random workflow type"
workflowTimeout := int32(10)
decisionTimeout := int32(14)
lastProcessedEventID := int64(0)
nextEventID := int64(3)

req := &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
CreateRequestID: uuid.New(),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
DecisionStartToCloseTimeout: decisionTimeout,
LastFirstEventID: common.FirstEventID,
NextEventID: nextEventID,
LastProcessedEvent: lastProcessedEventID,
State: p.WorkflowStateCreated,
CloseStatus: p.WorkflowCloseStatusNone,
},
ExecutionStats: &p.ExecutionStats{},
},
RangeID: s.ShardInfo.RangeID,
Mode: p.CreateWorkflowModeBrandNew,
}

_, err := s.ExecutionManager.CreateWorkflowExecution(req)
s.Nil(err)
info, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
updatedInfo := copyWorkflowExecutionInfo(info.ExecutionInfo)
updatedStats := copyExecutionStats(info.ExecutionStats)
updatedInfo.State = p.WorkflowStateCompleted
updatedInfo.CloseStatus = p.WorkflowCloseStatusCompleted
_, err = s.ExecutionManager.UpdateWorkflowExecution(&p.UpdateWorkflowExecutionRequest{
UpdateWorkflowMutation: p.WorkflowMutation{
ExecutionInfo: updatedInfo,
ExecutionStats: updatedStats,
Condition: nextEventID,
},
RangeID: s.ShardInfo.RangeID,
Mode: p.UpdateWorkflowModeUpdateCurrent,
})
s.NoError(err)

req.Mode = p.CreateWorkflowModeWorkflowIDReuse
req.PreviousRunID = runID
req.PreviousLastWriteVersion = common.EmptyVersion
_, err = s.ExecutionManager.CreateWorkflowExecution(req)
s.Error(err)
s.IsType(&p.WorkflowExecutionAlreadyStartedError{}, err)
}

// TestCreateWorkflowExecutionStateCloseStatus test
func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() {
domainID := uuid.New()
Expand Down
Loading

0 comments on commit eedf89b

Please sign in to comment.