diff --git a/common/persistence/nosql/nosql_execution_store_test.go b/common/persistence/nosql/nosql_execution_store_test.go index 60e52035909..01162e8aeda 100644 --- a/common/persistence/nosql/nosql_execution_store_test.go +++ b/common/persistence/nosql/nosql_execution_store_test.go @@ -23,10 +23,12 @@ package nosql import ( "context" "errors" + "fmt" "testing" "time" "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/uber/cadence/common" @@ -37,197 +39,269 @@ import ( "github.com/uber/cadence/service/history/constants" ) -func TestNosqlExecutionStore(t *testing.T) { +func TestCreateWorkflowExecution(t *testing.T) { ctx := context.Background() - shardID := 1 - testCases := []struct { + + tests := []struct { name string - setupMock func(*gomock.Controller) *nosqlExecutionStore - testFunc func(*nosqlExecutionStore) error + setupMock func(*nosqlplugin.MockDB, int) // Now accepts shard ID as parameter + expectedResp *persistence.CreateWorkflowExecutionResponse expectedError error }{ { - name: "CreateWorkflowExecution success", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "success", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { mockDB.EXPECT(). InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil) - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) - }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) - return err }, + expectedResp: &persistence.CreateWorkflowExecutionResponse{}, expectedError: nil, }, { - name: "CreateWorkflowExecution failure - workflow already exists", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "ShardRangeIDNotMatch condition failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + err := &nosqlplugin.WorkflowOperationConditionFailure{ + ShardRangeIDNotMatch: common.Int64Ptr(456), + } mockDB.EXPECT(). InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&persistence.WorkflowExecutionAlreadyStartedError{}).Times(1) - mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + Return(err) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) - return err + expectedResp: nil, // No response expected on error + expectedError: &persistence.ShardOwnershipLostError{ + ShardID: 123, + Msg: "Failed to create workflow execution. Request RangeID: 123, Actual RangeID: 456", }, - expectedError: &persistence.WorkflowExecutionAlreadyStartedError{}, }, { - name: "CreateWorkflowExecution failure - shard ownership lost", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "WorkflowExecutionAlreadyExists condition failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + err := &nosqlplugin.WorkflowOperationConditionFailure{ + WorkflowExecutionAlreadyExists: &nosqlplugin.WorkflowExecutionAlreadyExists{ + OtherInfo: "Workflow with ID already exists", + CreateRequestID: "existing-request-id", + RunID: "existing-run-id", + State: persistence.WorkflowStateCompleted, + CloseStatus: persistence.WorkflowCloseStatusCompleted, + LastWriteVersion: 123, + }, + } mockDB.EXPECT(). InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&persistence.ShardOwnershipLostError{ShardID: shardID, Msg: "shard ownership lost"}).Times(1) - mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + Return(err) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) - return err + expectedResp: nil, // No response expected on error + expectedError: &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "Workflow with ID already exists", + StartRequestID: "existing-request-id", + RunID: "existing-run-id", + State: persistence.WorkflowStateCompleted, + CloseStatus: persistence.WorkflowCloseStatusCompleted, + LastWriteVersion: 123, }, - expectedError: &persistence.ShardOwnershipLostError{}, }, { - name: "CreateWorkflowExecution failure - current workflow condition failed", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "Unknown condition failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + err := &nosqlplugin.WorkflowOperationConditionFailure{ + UnknownConditionFailureDetails: common.StringPtr("Unknown error occurred during operation"), + } mockDB.EXPECT(). InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&persistence.CurrentWorkflowConditionFailedError{Msg: "current workflow condition failed"}).Times(1) - mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + Return(err) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) - return err + expectedResp: nil, + expectedError: &persistence.ShardOwnershipLostError{ + ShardID: 123, + Msg: "Unknown error occurred during operation", }, - expectedError: &persistence.CurrentWorkflowConditionFailedError{}, }, { - name: "CreateWorkflowExecution failure - generic internal service error", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "Current workflow condition failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + err := &nosqlplugin.WorkflowOperationConditionFailure{ + CurrentWorkflowConditionFailInfo: common.StringPtr("Current workflow condition failed"), + } mockDB.EXPECT(). InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&types.InternalServiceError{Message: "generic internal service error"}).Times(1) - mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes() - mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes() - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + Return(err) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) - return err + expectedResp: nil, + expectedError: &persistence.CurrentWorkflowConditionFailedError{ + Msg: "Current workflow condition failed", }, - expectedError: &types.InternalServiceError{}, }, { - name: "GetWorkflowExecution success", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "Unexpected error type leading to unsupported condition failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + // Simulate returning an unexpected error type from the mock + unexpectedErr := errors.New("unexpected error type") + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() mockDB.EXPECT(). - SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). - Return(&nosqlplugin.WorkflowExecution{}, nil).Times(1) - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(unexpectedErr) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.GetWorkflowExecution(ctx, newGetWorkflowExecutionRequest()) - return err + expectedResp: nil, + expectedError: fmt.Errorf("unsupported conditionFailureReason error"), // Expected generic error for unexpected conditions + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + mockDB := nosqlplugin.NewMockDB(controller) + store := newTestNosqlExecutionStore(mockDB, log.NewNoop()) + shardID := store.GetShardID() + + tc.setupMock(mockDB, shardID) + + resp, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest()) + + if diff := cmp.Diff(tc.expectedResp, resp); diff != "" { + t.Errorf("CreateWorkflowExecution() response mismatch (-want +got):\n%s", diff) + } + if tc.expectedError != nil { + require.ErrorAs(t, err, &tc.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestUpdateWorkflowExecution(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + setupMock func(*nosqlplugin.MockDB, int) + request func() *persistence.InternalUpdateWorkflowExecutionRequest + expectedError error // Ensure we are using `expectedError` consistently + }{ + + { + name: "success", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + mockDB.EXPECT(). + UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) }, + request: newUpdateWorkflowExecutionRequest, expectedError: nil, }, { - name: "GetWorkflowExecution failure - not found", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "Success - UpdateWorkflowModeIgnoreCurrent", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { mockDB.EXPECT(). - SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, &types.EntityNotExistsError{}).Times(1) - mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) }, - testFunc: func(store *nosqlExecutionStore) error { - _, err := store.GetWorkflowExecution(ctx, newGetWorkflowExecutionRequest()) - return err + request: func() *persistence.InternalUpdateWorkflowExecutionRequest { + req := newUpdateWorkflowExecutionRequest() + req.Mode = persistence.UpdateWorkflowModeIgnoreCurrent + return req }, - expectedError: &types.EntityNotExistsError{}, + expectedError: nil, }, { - name: "UpdateWorkflowExecution success", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) + name: "UpdateWorkflowModeBypassCurrent - assertNotCurrentExecution failure", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { mockDB.EXPECT(). - UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil).Times(1) - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Times(0) }, - testFunc: func(store *nosqlExecutionStore) error { - err := store.UpdateWorkflowExecution(ctx, newUpdateWorkflowExecutionRequest()) - return err + request: func() *persistence.InternalUpdateWorkflowExecutionRequest { + req := newUpdateWorkflowExecutionRequest() + req.Mode = persistence.UpdateWorkflowModeBypassCurrent + return req }, - expectedError: nil, + expectedError: &types.InternalServiceError{Message: "assertNotCurrentExecution failure"}, }, { - name: "UpdateWorkflowExecution failure - invalid update mode", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - mockDB := nosqlplugin.NewMockDB(ctrl) - // No operation expected on the DB due to invalid mode - return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + name: "Unknown update mode", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { }, - testFunc: func(store *nosqlExecutionStore) error { - request := newUpdateWorkflowExecutionRequest() - request.Mode = persistence.UpdateWorkflowMode(-1) - return store.UpdateWorkflowExecution(ctx, request) + request: func() *persistence.InternalUpdateWorkflowExecutionRequest { + req := newUpdateWorkflowExecutionRequest() + req.Mode = -1 + return req }, - expectedError: &types.InternalServiceError{}, + expectedError: &types.InternalServiceError{Message: "UpdateWorkflowExecution: unknown mode: -1"}, }, { - name: "UpdateWorkflowExecution failure - condition not met", + name: "Bypass_current_execution_failure_due_to_assertNotCurrentExecution", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {}, + request: func() *persistence.InternalUpdateWorkflowExecutionRequest { + req := newUpdateWorkflowExecutionRequest() + req.Mode = persistence.UpdateWorkflowModeBypassCurrent + return req + }, + expectedError: &types.InternalServiceError{Message: "assertNotCurrentExecution failure"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + mockDB := nosqlplugin.NewMockDB(controller) + store, _ := NewExecutionStore(1, mockDB, log.NewNoop()) + + tc.setupMock(mockDB, 1) + + req := tc.request() + err := store.UpdateWorkflowExecution(ctx, req) + if tc.expectedError != nil { + require.Error(t, err) + require.IsType(t, tc.expectedError, err, "Error type does not match the expected one.") + } else { + require.NoError(t, err) + } + }) + } +} + +func TestNosqlExecutionStore(t *testing.T) { + ctx := context.Background() + shardID := 1 + testCases := []struct { + name string + setupMock func(*gomock.Controller) *nosqlExecutionStore + testFunc func(*nosqlExecutionStore) error + expectedError error + }{ + { + name: "GetWorkflowExecution success", setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { mockDB := nosqlplugin.NewMockDB(ctrl) - conditionFailure := &nosqlplugin.WorkflowOperationConditionFailure{ - UnknownConditionFailureDetails: common.StringPtr("condition not met"), - } mockDB.EXPECT(). - UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(conditionFailure).Times(1) + SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(&nosqlplugin.WorkflowExecution{}, nil).Times(1) return newTestNosqlExecutionStore(mockDB, log.NewNoop()) }, testFunc: func(store *nosqlExecutionStore) error { - return store.UpdateWorkflowExecution(ctx, newUpdateWorkflowExecutionRequest()) + _, err := store.GetWorkflowExecution(ctx, newGetWorkflowExecutionRequest()) + return err }, - expectedError: &persistence.ConditionFailedError{}, + expectedError: nil, }, { - name: "UpdateWorkflowExecution failure - operational error", + name: "GetWorkflowExecution failure - not found", setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { mockDB := nosqlplugin.NewMockDB(ctrl) mockDB.EXPECT(). - UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(errors.New("database is unavailable")).Times(1) + SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, &types.EntityNotExistsError{}).Times(1) mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() return newTestNosqlExecutionStore(mockDB, log.NewNoop()) }, testFunc: func(store *nosqlExecutionStore) error { - return store.UpdateWorkflowExecution(ctx, newUpdateWorkflowExecutionRequest()) + _, err := store.GetWorkflowExecution(ctx, newGetWorkflowExecutionRequest()) + return err }, - expectedError: &types.InternalServiceError{Message: "database is unavailable"}, + expectedError: &types.EntityNotExistsError{}, }, { name: "DeleteWorkflowExecution success", @@ -1215,6 +1289,29 @@ func TestCreateFailoverMarkerTasks(t *testing.T) { }, expectedError: nil, }, + { + name: "CreateFailoverMarkerTasks failure - ShardOperationConditionFailure", + rangeID: 123, + markers: []*persistence.FailoverMarkerTask{ + { + TaskData: persistence.TaskData{}, + DomainID: "testDomainID", + }, + }, + setupMock: func(mockDB *nosqlplugin.MockDB) { + conditionFailureErr := &nosqlplugin.ShardOperationConditionFailure{ + RangeID: 123, // Use direct int64 value + Details: "Shard condition failed", // Use direct string value + } + mockDB.EXPECT(). + InsertReplicationTask(ctx, gomock.Any(), nosqlplugin.ShardCondition{ShardID: shardID, RangeID: 123}). + Return(conditionFailureErr) // Simulate ShardOperationConditionFailure + }, + expectedError: &persistence.ShardOwnershipLostError{ + ShardID: shardID, + Msg: "Failed to create workflow execution. Request RangeID: 123, columns: (Shard condition failed)", + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) {