diff --git a/Makefile b/Makefile index 4a50db68855..9349dd1026c 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ THRIFTRW_SRCS = \ idl/github.com/uber/cadence/sqlblobs.thrift \ PROGS = cadence -TEST_TIMEOUT = 15m +TEST_TIMEOUT = 20m TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT) BUILD := ./build TOOLS_CMD_ROOT=./cmd/tools diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index 9685b219eec..58259723298 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -1206,6 +1206,22 @@ func (d *cassandraPersistence) CreateWorkflowExecution( msg := fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, CurrentRunID: %v, columns: (%v)", executionInfo.WorkflowID, executionInfo.RunID, strings.Join(columns, ",")) return nil, &p.CurrentWorkflowConditionFailedError{Msg: msg} + } else if rowType == rowTypeExecution && runID == executionInfo.RunID { + msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v", + executionInfo.WorkflowID, executionInfo.RunID, request.RangeID) + replicationState := createReplicationState(previous["replication_state"].(map[string]interface{})) + lastWriteVersion = common.EmptyVersion + if replicationState != nil { + lastWriteVersion = replicationState.LastWriteVersion + } + return nil, &p.WorkflowExecutionAlreadyStartedError{ + Msg: msg, + StartRequestID: executionInfo.CreateRequestID, + RunID: executionInfo.RunID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, + LastWriteVersion: lastWriteVersion, + } } previous = make(map[string]interface{}) diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index a317fa8344c..a7446bc27f9 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -70,6 +70,72 @@ func (s *ExecutionManagerSuite) SetupTest() { s.ClearTasks() } +// TestCreateWorkflowExecutionDeDup test +func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionDeDup() { + domainID := uuid.New() + workflowID := "create-workflow-test-dedup" + runID := "3969fae6-6b75-4c2a-b74b-4054edd296a6" + workflowExecution := gen.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), + } + tasklist := "some random tasklist" + workflowType := "some random workflow type" + workflowTimeout := int32(10) + decisionTimeout := int32(14) + lastProcessedEventID := int64(0) + nextEventID := int64(3) + + req := &p.CreateWorkflowExecutionRequest{ + NewWorkflowSnapshot: p.WorkflowSnapshot{ + ExecutionInfo: &p.WorkflowExecutionInfo{ + CreateRequestID: uuid.New(), + DomainID: domainID, + WorkflowID: workflowID, + RunID: runID, + TaskList: tasklist, + WorkflowTypeName: workflowType, + WorkflowTimeout: workflowTimeout, + DecisionStartToCloseTimeout: decisionTimeout, + LastFirstEventID: common.FirstEventID, + NextEventID: nextEventID, + LastProcessedEvent: lastProcessedEventID, + State: p.WorkflowStateCreated, + CloseStatus: p.WorkflowCloseStatusNone, + }, + ExecutionStats: &p.ExecutionStats{}, + }, + RangeID: s.ShardInfo.RangeID, + Mode: p.CreateWorkflowModeBrandNew, + } + + _, err := s.ExecutionManager.CreateWorkflowExecution(req) + s.Nil(err) + info, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err) + updatedInfo := copyWorkflowExecutionInfo(info.ExecutionInfo) + updatedStats := copyExecutionStats(info.ExecutionStats) + updatedInfo.State = p.WorkflowStateCompleted + updatedInfo.CloseStatus = p.WorkflowCloseStatusCompleted + _, err = s.ExecutionManager.UpdateWorkflowExecution(&p.UpdateWorkflowExecutionRequest{ + UpdateWorkflowMutation: p.WorkflowMutation{ + ExecutionInfo: updatedInfo, + ExecutionStats: updatedStats, + Condition: nextEventID, + }, + RangeID: s.ShardInfo.RangeID, + Mode: p.UpdateWorkflowModeUpdateCurrent, + }) + s.NoError(err) + + req.Mode = p.CreateWorkflowModeWorkflowIDReuse + req.PreviousRunID = runID + req.PreviousLastWriteVersion = common.EmptyVersion + _, err = s.ExecutionManager.CreateWorkflowExecution(req) + s.Error(err) + s.IsType(&p.WorkflowExecutionAlreadyStartedError{}, err) +} + // TestCreateWorkflowExecutionStateCloseStatus test func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() { domainID := uuid.New() diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 82626400250..07614d6a5a0 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -26,6 +26,8 @@ import ( "fmt" "time" + "github.com/go-sql-driver/mysql" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -67,7 +69,7 @@ func applyWorkflowMutationTx( return err default: return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to lock executions row. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Failed to lock executions row. Error: %v", err), } } } @@ -81,7 +83,7 @@ func applyWorkflowMutationTx( currentVersion, shardID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to update executions row. Erorr: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Failed to update executions row. Erorr: %v", err), } } @@ -104,7 +106,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -116,7 +118,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -128,7 +130,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -140,7 +142,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -152,7 +154,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -164,7 +166,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } @@ -175,7 +177,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } } @@ -187,7 +189,7 @@ func applyWorkflowMutationTx( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } } return nil @@ -227,7 +229,7 @@ func applyWorkflowSnapshotTxAsReset( return err default: return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to lock executions row. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to lock executions row. Error: %v", err), } } } @@ -241,7 +243,7 @@ func applyWorkflowSnapshotTxAsReset( currentVersion, shardID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to update executions row. Erorr: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to update executions row. Erorr: %v", err), } } @@ -262,7 +264,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear activity info map. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear activity info map. Error: %v", err), } } @@ -274,7 +276,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into activity info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into activity info map after clearing. Error: %v", err), } } @@ -284,7 +286,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear timer info map. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear timer info map. Error: %v", err), } } @@ -296,7 +298,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into timer info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into timer info map after clearing. Error: %v", err), } } @@ -306,7 +308,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear child execution info map. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear child execution info map. Error: %v", err), } } @@ -318,7 +320,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into activity info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into activity info map after clearing. Error: %v", err), } } @@ -328,7 +330,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear request cancel info map. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear request cancel info map. Error: %v", err), } } @@ -340,7 +342,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into request cancel info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into request cancel info map after clearing. Error: %v", err), } } @@ -350,7 +352,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear signal info map. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear signal info map. Error: %v", err), } } @@ -362,7 +364,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into signal info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into signal info map after clearing. Error: %v", err), } } @@ -372,7 +374,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear signals requested set. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear signals requested set. Error: %v", err), } } @@ -384,7 +386,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into signals requested set after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into signals requested set after clearing. Error: %v", err), } } @@ -394,7 +396,7 @@ func applyWorkflowSnapshotTxAsReset( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to clear buffered events. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to clear buffered events. Error: %v", err), } } return nil @@ -430,9 +432,7 @@ func applyWorkflowSnapshotTxAsNew( lastWriteVersion, currentVersion, shardID); err != nil { - return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to update executions row. Erorr: %v", err), - } + return err } if err := applyTasks(tx, @@ -454,7 +454,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into activity info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into activity info map after clearing. Error: %v", err), } } @@ -466,7 +466,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into timer info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into timer info map after clearing. Error: %v", err), } } @@ -478,7 +478,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into activity info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into activity info map after clearing. Error: %v", err), } } @@ -490,7 +490,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into request cancel info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into request cancel info map after clearing. Error: %v", err), } } @@ -502,7 +502,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into signal info map after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into signal info map after clearing. Error: %v", err), } } @@ -514,7 +514,7 @@ func applyWorkflowSnapshotTxAsNew( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Failed to insert into signals requested set after clearing. Error: %v", err), + Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into signals requested set after clearing. Error: %v", err), } } @@ -539,7 +539,7 @@ func applyTasks( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to create transfer tasks. Error: %v", err), + Message: fmt.Sprintf("applyTasks failed. Failed to create transfer tasks. Error: %v", err), } } @@ -550,7 +550,7 @@ func applyTasks( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to create replication tasks. Error: %v", err), + Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err), } } @@ -561,7 +561,7 @@ func applyTasks( workflowID, runID); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Failed to create timer tasks. Error: %v", err), + Message: fmt.Sprintf("applyTasks failed. Failed to create timer tasks. Error: %v", err), } } @@ -582,14 +582,14 @@ func lockCurrentExecutionIfExists( if err != nil { if err != sql.ErrNoRows { return nil, &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to get current_executions row for (shard,domain,workflow) = (%v, %v, %v). Error: %v", shardID, domainID, workflowID, err), + Message: fmt.Sprintf("lockCurrentExecutionIfExists failed. Failed to get current_executions row for (shard,domain,workflow) = (%v, %v, %v). Error: %v", shardID, domainID, workflowID, err), } } } size := len(rows) if size > 1 { return nil, &workflow.InternalServiceError{ - Message: fmt.Sprintf("Multiple current_executions rows for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID), + Message: fmt.Sprintf("lockCurrentExecutionIfExists failed. Multiple current_executions rows for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID), } } if size == 0 { @@ -637,7 +637,7 @@ func createOrUpdateCurrentExecution( row.StartVersion, row.LastWriteVersion); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Failed to continue as new. Error: %v", err), + Message: fmt.Sprintf("createOrUpdateCurrentExecution failed. Failed to continue as new. Error: %v", err), } } case p.CreateWorkflowModeWorkflowIDReuse: @@ -652,19 +652,19 @@ func createOrUpdateCurrentExecution( row.StartVersion, row.LastWriteVersion); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Failed to reuse workflow ID. Error: %v", err), + Message: fmt.Sprintf("createOrUpdateCurrentExecution failed. Failed to reuse workflow ID. Error: %v", err), } } case p.CreateWorkflowModeBrandNew: if _, err := tx.InsertIntoCurrentExecutions(&row); err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Failed to insert into current_executions table. Error: %v", err), + Message: fmt.Sprintf("createOrUpdateCurrentExecution failed. Failed to insert into current_executions table. Error: %v", err), } } case p.CreateWorkflowModeZombie: // noop default: - return fmt.Errorf("Unknown workflow creation mode: %v", createMode) + return fmt.Errorf("createOrUpdateCurrentExecution failed. Unknown workflow creation mode: %v", createMode) } return nil @@ -685,7 +685,7 @@ func lockAndCheckNextEventID( } if *nextEventID != condition { return &p.ConditionFailedError{ - Msg: fmt.Sprintf("next_event_id was %v when it should have been %v.", nextEventID, condition), + Msg: fmt.Sprintf("lockAndCheckNextEventID failed. Next_event_id was %v when it should have been %v.", nextEventID, condition), } } return nil @@ -709,7 +709,7 @@ func lockNextEventID( if err == sql.ErrNoRows { return nil, &workflow.EntityNotExistsError{ Message: fmt.Sprintf( - "Failed to lock executions row with (shard, domain, workflow, run) = (%v,%v,%v,%v) which does not exist.", + "lockNextEventID failed. Unable to lock executions row with (shard, domain, workflow, run) = (%v,%v,%v,%v) which does not exist.", shardID, domainID, workflowID, @@ -718,7 +718,7 @@ func lockNextEventID( } } return nil, &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to lock executions row. Error: %v", err), + Message: fmt.Sprintf("lockNextEventID failed. Error: %v", err), } } result := int64(nextEventID) @@ -794,7 +794,7 @@ func createTransferTasks( default: return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Unknow transfer type: %v", task.GetType()), + Message: fmt.Sprintf("createTransferTasks failed. Unknow transfer type: %v", task.GetType()), } } @@ -813,20 +813,20 @@ func createTransferTasks( result, err := tx.InsertIntoTransferTasks(transferTasksRows) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create transfer tasks. Error: %v", err), + Message: fmt.Sprintf("createTransferTasks failed. Error: %v", err), } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create transfer tasks. Could not verify number of rows inserted. Error: %v", err), + Message: fmt.Sprintf("createTransferTasks failed. Could not verify number of rows inserted. Error: %v", err), } } if int(rowsAffected) != len(transferTasks) { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create transfer tasks. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(transferTasks), err), + Message: fmt.Sprintf("createTransferTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(transferTasks), err), } } @@ -863,7 +863,7 @@ func createReplicationTasks( historyReplicationTask, ok := task.(*p.HistoryReplicationTask) if !ok { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to cast %v to HistoryReplicationTask", task), + Message: fmt.Sprintf("createReplicationTasks failed. Failed to cast %v to HistoryReplicationTask", task), } } firstEventID = historyReplicationTask.FirstEventID @@ -916,20 +916,20 @@ func createReplicationTasks( result, err := tx.InsertIntoReplicationTasks(replicationTasksRows) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create replication tasks. Error: %v", err), + Message: fmt.Sprintf("createReplicationTasks failed. Error: %v", err), } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create replication tasks. Could not verify number of rows inserted. Error: %v", err), + Message: fmt.Sprintf("createReplicationTasks failed. Could not verify number of rows inserted. Error: %v", err), } } if int(rowsAffected) != len(replicationTasks) { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create replication tasks. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(replicationTasks), err), + Message: fmt.Sprintf("createReplicationTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(replicationTasks), err), } } @@ -980,7 +980,7 @@ func createTimerTasks( default: return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Unknown timer task: %v", task.GetType()), + Message: fmt.Sprintf("createTimerTasks failed. Unknown timer task: %v", task.GetType()), } } @@ -1005,19 +1005,19 @@ func createTimerTasks( result, err := tx.InsertIntoTimerTasks(timerTasksRows) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create timer tasks. Error: %v", err), + Message: fmt.Sprintf("createTimerTasks failed. Error: %v", err), } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create timer tasks. Could not verify number of rows inserted. Error: %v", err), + Message: fmt.Sprintf("createTimerTasks failed. Could not verify number of rows inserted. Error: %v", err), } } if int(rowsAffected) != len(timerTasks) { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to create timer tasks. Inserted %v instead of %v rows into timer_tasks. Error: %v", rowsAffected, len(timerTasks), err), + Message: fmt.Sprintf("createTimerTasks failed. Inserted %v instead of %v rows into timer_tasks. Error: %v", rowsAffected, len(timerTasks), err), } } } @@ -1056,7 +1056,7 @@ func assertRunIDAndUpdateCurrentExecution( assertFn := func(currentRow *sqldb.CurrentExecutionsRow) error { if !bytes.Equal(currentRow.RunID, previousRunID) { return &p.ConditionFailedError{Msg: fmt.Sprintf( - "Update current record failed failed. Current run ID was %v, expected %v", + "assertRunIDAndUpdateCurrentExecution failed. Current run ID was %v, expected %v", currentRow.RunID, previousRunID, )} @@ -1089,21 +1089,21 @@ func assertAndUpdateCurrentExecution( assertFn := func(currentRow *sqldb.CurrentExecutionsRow) error { if !bytes.Equal(currentRow.RunID, previousRunID) { return &p.ConditionFailedError{Msg: fmt.Sprintf( - "Update current record failed failed. Current run ID was %v, expected %v", + "assertAndUpdateCurrentExecution failed. Current run ID was %v, expected %v", currentRow.RunID, previousRunID, )} } if currentRow.LastWriteVersion != previousLastWriteVersion { return &p.ConditionFailedError{Msg: fmt.Sprintf( - "Update current record failed failed. Current last write version was %v, expected %v", + "assertAndUpdateCurrentExecution failed. Current last write version was %v, expected %v", currentRow.LastWriteVersion, previousLastWriteVersion, )} } if currentRow.State != previousState { return &p.ConditionFailedError{Msg: fmt.Sprintf( - "Update current record failed failed. Current state %v, expected %v", + "assertAndUpdateCurrentExecution failed. Current state %v, expected %v", currentRow.State, previousState, )} @@ -1132,7 +1132,7 @@ func assertCurrentExecution( }) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Unable to load current record. Error: %v", err), + Message: fmt.Sprintf("assertCurrentExecution failed. Unable to load current record. Error: %v", err), } } return assertFn(currentRow) @@ -1142,7 +1142,7 @@ func assertRunIDMismatch(runID sqldb.UUID, currentRunID sqldb.UUID) error { // zombie workflow creation with existence of current record, this is a noop if bytes.Equal(currentRunID, runID) { return &p.ConditionFailedError{Msg: fmt.Sprintf( - "Assert not current record failed failed. Current run ID was %v, input %v", + "assertRunIDMismatch failed. Current run ID was %v, input %v", currentRunID, runID, )} @@ -1176,18 +1176,18 @@ func updateCurrentExecution( }) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ContinueAsNew failed. Failed to update current_executions table. Error: %v", err), + Message: fmt.Sprintf("updateCurrentExecution failed. Error: %v", err), } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ContinueAsNew failed. Failed to check number of rows updated in current_executions table. Error: %v", err), + Message: fmt.Sprintf("updateCurrentExecution failed. Failed to check number of rows updated in current_executions table. Error: %v", err), } } if rowsAffected != 1 { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("ContinueAsNew failed. %v rows of current_executions updated instead of 1.", rowsAffected), + Message: fmt.Sprintf("updateCurrentExecution failed. %v rows of current_executions updated instead of 1.", rowsAffected), } } return nil @@ -1343,19 +1343,39 @@ func createExecution( } result, err := tx.InsertIntoExecutions(row) if err != nil { - return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to insert executions row. Erorr: %v", err), + switch mysqlErr := err.(type) { + case *mysql.MySQLError: + switch mysqlErr.Number { + case uint16(1062): + // mysql error `Duplicate entry` + return &p.WorkflowExecutionAlreadyStartedError{ + Msg: fmt.Sprintf("Workflow execution already running. WorkflowId: %v", executionInfo.WorkflowID), + StartRequestID: executionInfo.CreateRequestID, + RunID: executionInfo.RunID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, + LastWriteVersion: row.LastWriteVersion, + } + default: + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("createExecution failed. Erorr: %v, Code: %v", err, mysqlErr.Number), + } + } + default: + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("createExecution failed. Erorr: %v", err), + } } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to insert executions row. Failed to verify number of rows affected. Erorr: %v", err), + Message: fmt.Sprintf("createExecution failed. Failed to verify number of rows affected. Erorr: %v", err), } } if rowsAffected != 1 { return &workflow.EntityNotExistsError{ - Message: fmt.Sprintf("Failed to insert executions row. Affected %v rows updated instead of 1.", rowsAffected), + Message: fmt.Sprintf("createExecution failed. Affected %v rows updated instead of 1.", rowsAffected), } } @@ -1398,18 +1418,18 @@ func updateExecution( result, err := tx.UpdateExecutions(row) if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to update executions row. Erorr: %v", err), + Message: fmt.Sprintf("updateExecution failed. Erorr: %v", err), } } rowsAffected, err := result.RowsAffected() if err != nil { return &workflow.InternalServiceError{ - Message: fmt.Sprintf("Failed to update executions row. Failed to verify number of rows affected. Erorr: %v", err), + Message: fmt.Sprintf("updateExecution failed. Failed to verify number of rows affected. Erorr: %v", err), } } if rowsAffected != 1 { return &workflow.EntityNotExistsError{ - Message: fmt.Sprintf("Failed to update executions row. Affected %v rows updated instead of 1.", rowsAffected), + Message: fmt.Sprintf("updateExecution failed. Affected %v rows updated instead of 1.", rowsAffected), } } diff --git a/host/ndc/nDC_integration_test.go b/host/ndc/nDC_integration_test.go index a7d4f00031e..a17af08a643 100644 --- a/host/ndc/nDC_integration_test.go +++ b/host/ndc/nDC_integration_test.go @@ -1444,8 +1444,7 @@ func (s *nDCIntegrationTestSuite) applyEvents( ) { for _, batch := range eventBatches { eventBlob, newRunEventBlob := s.generateEventBlobs(workflowID, runID, workflowType, tasklist, batch) - - err := historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{ + req := &history.ReplicateEventsV2Request{ DomainUUID: common.StringPtr(s.domainID), WorkflowExecution: &shared.WorkflowExecution{ WorkflowId: common.StringPtr(workflowID), @@ -1454,8 +1453,12 @@ func (s *nDCIntegrationTestSuite) applyEvents( VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory), Events: s.toThriftDataBlob(eventBlob), NewRunEvents: s.toThriftDataBlob(newRunEventBlob), - }) + } + + err := historyClient.ReplicateEventsV2(s.createContext(), req) s.Nil(err, "Failed to replicate history event") + err = historyClient.ReplicateEventsV2(s.createContext(), req) + s.Nil(err, "Failed to dedup replicate history event") } } @@ -1488,6 +1491,8 @@ func (s *nDCIntegrationTestSuite) applyEventsThroughFetcher( } s.standByReplicationTasksChan <- replicationTask + // this is to test whether dedup works + s.standByReplicationTasksChan <- replicationTask } } diff --git a/host/ndc/replication_integration_test.go b/host/ndc/replication_integration_test.go index e218536babf..a4cd9ae114a 100644 --- a/host/ndc/replication_integration_test.go +++ b/host/ndc/replication_integration_test.go @@ -22,6 +22,7 @@ package ndc import ( "math" + "reflect" "time" "github.com/uber/cadence/common/persistence" @@ -126,14 +127,37 @@ func (s *nDCIntegrationTestSuite) TestReplicationMessageDLQ() { executionManager, err := execMgrFactory.NewExecutionManager(0) s.NoError(err) + expectedDLQMsgs := map[int64]bool{} + for _, batch := range historyBatch { + firstEventID := batch.Events[0].GetEventId() + expectedDLQMsgs[firstEventID] = true + } + // Applying replication messages through fetcher is Async. // So we need to retry a couple of times. - for i := 0; i < 10; i++ { +Loop: + for i := 0; i < 60; i++ { time.Sleep(time.Second) + + actualDLQMsgs := map[int64]bool{} request := persistence.NewGetReplicationTasksFromDLQRequest( - "standby", -1, math.MaxInt64, math.MaxInt64, nil) - response, err := executionManager.GetReplicationTasksFromDLQ(request) - if err == nil && len(response.Tasks) == len(historyBatch) { + "standby", -1, math.MaxInt64, math.MaxInt64, nil, + ) + var token []byte + for doPaging := true; doPaging; doPaging = len(token) > 0 { + request.NextPageToken = token + response, err := executionManager.GetReplicationTasksFromDLQ(request) + if err != nil { + continue Loop + } + token = response.NextPageToken + + for _, task := range response.Tasks { + firstEventID := task.FirstEventID + actualDLQMsgs[firstEventID] = true + } + } + if reflect.DeepEqual(expectedDLQMsgs, actualDLQMsgs) { return } } diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go index b4e6a7ae3e4..adf6a124e5d 100644 --- a/service/history/nDCHistoryReplicator.go +++ b/service/history/nDCHistoryReplicator.go @@ -291,6 +291,10 @@ func (r *nDCHistoryReplicatorImpl) applyStartEvents( true, ) if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to apply events when applyStartEvents", + tag.Error(err), + ) return err } @@ -306,7 +310,12 @@ func (r *nDCHistoryReplicatorImpl) applyStartEvents( releaseFn, ), ) - if err == nil { + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to create workflow when applyStartEvents", + tag.Error(err), + ) + } else { r.notify(task.getSourceCluster(), task.getEventTime()) } return err @@ -327,11 +336,20 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsPrepareBranch( task.getFirstEvent().GetEventId(), task.getFirstEvent().GetVersion(), ) - if err != nil { + switch err.(type) { + case nil: + return doContinue, versionHistoryIndex, nil + case *shared.RetryTaskV2Error: + // replication message can arrive out of order + // do not log + return false, 0, err + default: + task.getLogger().Error( + "nDCHistoryReplicator unable to prepare version history when applyNonStartEventsPrepareBranch", + tag.Error(err), + ) return false, 0, err } - return doContinue, versionHistoryIndex, nil - } func (r *nDCHistoryReplicatorImpl) applyNonStartEventsPrepareMutableState( @@ -344,11 +362,18 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsPrepareMutableState( incomingVersion := task.getVersion() conflictResolver := r.newConflictResolver(context, mutableState, task.getLogger()) - return conflictResolver.prepareMutableState( + mutableState, isRebuilt, err := conflictResolver.prepareMutableState( ctx, branchIndex, incomingVersion, ) + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to prepare mutable state when applyNonStartEventsPrepareMutableState", + tag.Error(err), + ) + } + return mutableState, isRebuilt, err } func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToCurrentBranch( @@ -371,6 +396,10 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToCurrentBranch( true, ) if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to apply events when applyNonStartEventsToCurrentBranch", + tag.Error(err), + ) return err } @@ -414,7 +443,12 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToCurrentBranch( targetWorkflow, newWorkflow, ) - if err == nil { + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to update workflow when applyNonStartEventsToCurrentBranch", + tag.Error(err), + ) + } else { r.notify(task.getSourceCluster(), task.getEventTime()) } return err @@ -439,6 +473,10 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranch( return err } if err := r.applyEvents(ctx, newTask); err != nil { + newTask.getLogger().Error( + "nDCHistoryReplicator unable to create new workflow when applyNonStartEventsToNoneCurrentBranch", + tag.Error(err), + ) return err } } @@ -455,7 +493,7 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranch( return err } - return r.transactionMgr.backfillWorkflow( + err = r.transactionMgr.backfillWorkflow( ctx, task.getEventTime(), newNDCWorkflow( @@ -474,6 +512,14 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranch( Events: task.getEvents(), }, ) + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to backfill workflow when applyNonStartEventsToNoneCurrentBranch", + tag.Error(err), + ) + return err + } + return nil } func (r *nDCHistoryReplicatorImpl) applyNonStartEventsMissingMutableState( @@ -512,7 +558,7 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsMissingMutableState( task.getLogger(), ) - return workflowResetter.resetWorkflow( + resetMutableState, err := workflowResetter.resetWorkflow( ctx, task.getEventTime(), baseEventID, @@ -520,6 +566,14 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsMissingMutableState( task.getFirstEvent().GetEventId(), task.getVersion(), ) + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to reset workflow when applyNonStartEventsMissingMutableState", + tag.Error(err), + ) + return nil, err + } + return resetMutableState, nil } func (r *nDCHistoryReplicatorImpl) applyNonStartEventsResetWorkflow( @@ -540,6 +594,10 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsResetWorkflow( true, ) if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to apply events when applyNonStartEventsResetWorkflow", + tag.Error(err), + ) return err } @@ -557,7 +615,12 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsResetWorkflow( task.getEventTime(), targetWorkflow, ) - if err == nil { + if err != nil { + task.getLogger().Error( + "nDCHistoryReplicator unable to create workflow when applyNonStartEventsResetWorkflow", + tag.Error(err), + ) + } else { r.notify(task.getSourceCluster(), task.getEventTime()) } return err diff --git a/service/history/nDCTransactionMgrForNewWorkflow.go b/service/history/nDCTransactionMgrForNewWorkflow.go index 84163453d82..e7c2448b3db 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow.go +++ b/service/history/nDCTransactionMgrForNewWorkflow.go @@ -239,7 +239,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( createMode := persistence.CreateWorkflowModeZombie prevRunID := "" prevLastWriteVersion := int64(0) - return targetWorkflow.getContext().createWorkflowExecution( + err = targetWorkflow.getContext().createWorkflowExecution( targetWorkflowSnapshot, targetWorkflowHistorySize, now, @@ -247,6 +247,15 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( prevRunID, prevLastWriteVersion, ) + switch err.(type) { + case nil: + return nil + case *persistence.WorkflowExecutionAlreadyStartedError: + // workflow already created + return nil + default: + return err + } } func (r *nDCTransactionMgrForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent( diff --git a/service/history/nDCTransactionMgrForNewWorkflow_test.go b/service/history/nDCTransactionMgrForNewWorkflow_test.go index 96c3e9c898b..185c9ceca3c 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow_test.go +++ b/service/history/nDCTransactionMgrForNewWorkflow_test.go @@ -297,6 +297,72 @@ func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Create s.True(currentReleaseCalled) } +func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup() { + ctx := ctx.Background() + now := time.Now() + + domainID := "some random domain ID" + workflowID := "some random workflow ID" + targetRunID := "some random run ID" + currentRunID := "other random runID" + + targetReleaseCalled := false + currentReleaseCalled := false + + targetWorkflow := NewMocknDCWorkflow(s.controller) + targetContext := NewMockworkflowExecutionContext(s.controller) + targetMutableState := NewMockmutableState(s.controller) + var targetReleaseFn releaseWorkflowExecutionFunc = func(error) { targetReleaseCalled = true } + targetWorkflow.EXPECT().getContext().Return(targetContext).AnyTimes() + targetWorkflow.EXPECT().getMutableState().Return(targetMutableState).AnyTimes() + targetWorkflow.EXPECT().getReleaseFn().Return(targetReleaseFn).AnyTimes() + + currentWorkflow := NewMocknDCWorkflow(s.controller) + var currentReleaseFn releaseWorkflowExecutionFunc = func(error) { currentReleaseCalled = true } + currentWorkflow.EXPECT().getReleaseFn().Return(currentReleaseFn).AnyTimes() + + targetWorkflowSnapshot := &persistence.WorkflowSnapshot{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: workflowID, + }, + } + targetWorkflowEventsSeq := []*persistence.WorkflowEvents{&persistence.WorkflowEvents{}} + targetWorkflowHistorySize := int64(12345) + targetMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: workflowID, + RunID: targetRunID, + }).AnyTimes() + targetMutableState.EXPECT().CloseTransactionAsSnapshot(now, transactionPolicyPassive).Return( + targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, + ).Times(1) + + s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, domainID, workflowID).Return(currentRunID, nil).Times(1) + s.mockTransactionMgr.EXPECT().loadNDCWorkflow(ctx, domainID, workflowID, currentRunID).Return(currentWorkflow, nil).Times(1) + + targetWorkflow.EXPECT().happensAfter(currentWorkflow).Return(false, nil) + targetWorkflow.EXPECT().suppressBy(currentWorkflow).Return(transactionPolicyPassive, nil).Times(1) + + targetContext.EXPECT().persistFirstWorkflowEvents( + targetWorkflowEventsSeq[0], + ).Return(targetWorkflowHistorySize, nil).Times(1) + targetContext.EXPECT().createWorkflowExecution( + targetWorkflowSnapshot, + targetWorkflowHistorySize, + now, + persistence.CreateWorkflowModeZombie, + "", + int64(0), + ).Return(&persistence.WorkflowExecutionAlreadyStartedError{}).Times(1) + targetContext.EXPECT().reapplyEvents(targetWorkflowEventsSeq).Return(nil).Times(1) + + err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) + s.NoError(err) + s.True(targetReleaseCalled) + s.True(currentReleaseCalled) +} + func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_SuppressCurrentAndCreateAsCurrent() { ctx := ctx.Background() now := time.Now()