diff --git a/service/history/execution/context.go b/service/history/execution/context.go index e787946f1df..84c8def7f9f 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -187,6 +187,7 @@ type ( emitWorkflowCompletionStatsFn func(string, string, string, string, string, *types.HistoryEvent) mergeContinueAsNewReplicationTasksFn func(persistence.UpdateWorkflowMode, *persistence.WorkflowMutation, *persistence.WorkflowSnapshot) error updateWorkflowExecutionEventReapplyFn func(persistence.UpdateWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error + conflictResolveEventReapplyFn func(persistence.ConflictResolveWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error emitLargeWorkflowShardIDStatsFn func(int64, int64, int64, int64) } ) @@ -243,6 +244,7 @@ func NewContext( ctx.persistStartWorkflowBatchEventsFn = ctx.PersistStartWorkflowBatchEvents ctx.persistNonStartWorkflowBatchEventsFn = ctx.PersistNonStartWorkflowBatchEvents ctx.updateWorkflowExecutionEventReapplyFn = ctx.updateWorkflowExecutionEventReapply + ctx.conflictResolveEventReapplyFn = ctx.conflictResolveEventReapply ctx.emitLargeWorkflowShardIDStatsFn = ctx.emitLargeWorkflowShardIDStats return ctx } @@ -466,17 +468,13 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( currentMutableState MutableState, currentTransactionPolicy *TransactionPolicy, ) (retError error) { - defer func() { if retError != nil { c.Clear() } }() - resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot( - now, - TransactionPolicyPassive, - ) + resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(now, TransactionPolicyPassive) if err != nil { return err } @@ -484,7 +482,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( var persistedBlobs events.PersistedBlobs resetHistorySize := c.GetHistorySize() for _, workflowEvents := range resetWorkflowEventsSeq { - blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) + blob, err := c.persistNonStartWorkflowBatchEventsFn(ctx, workflowEvents) if err != nil { return err } @@ -499,23 +497,19 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( var newWorkflow *persistence.WorkflowSnapshot var newWorkflowEventsSeq []*persistence.WorkflowEvents if newContext != nil && newMutableState != nil { - defer func() { if retError != nil { newContext.Clear() } }() - newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot( - now, - TransactionPolicyPassive, - ) + newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(now, TransactionPolicyPassive) if err != nil { return err } newWorkflowSizeSize := newContext.GetHistorySize() startEvents := newWorkflowEventsSeq[0] - blob, err := c.PersistStartWorkflowBatchEvents(ctx, startEvents) + blob, err := c.persistStartWorkflowBatchEventsFn(ctx, startEvents) if err != nil { return err } @@ -530,23 +524,19 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( var currentWorkflow *persistence.WorkflowMutation var currentWorkflowEventsSeq []*persistence.WorkflowEvents if currentContext != nil && currentMutableState != nil && currentTransactionPolicy != nil { - defer func() { if retError != nil { currentContext.Clear() } }() - currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation( - now, - *currentTransactionPolicy, - ) + currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(now, *currentTransactionPolicy) if err != nil { return err } currentWorkflowSize := currentContext.GetHistorySize() for _, workflowEvents := range currentWorkflowEventsSeq { - blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents) + blob, err := c.persistNonStartWorkflowBatchEventsFn(ctx, workflowEvents) if err != nil { return err } @@ -559,7 +549,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( } } - if err := c.conflictResolveEventReapply( + if err := c.conflictResolveEventReapplyFn( conflictResolveMode, resetWorkflowEventsSeq, newWorkflowEventsSeq, @@ -582,9 +572,9 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( }) if err != nil { if isOperationPossiblySuccessfulError(err) { - notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, true) - notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true) - notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowSnapshotFn(resetWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, persistedBlobs, true) + c.notifyTasksFromWorkflowMutationFn(currentWorkflow, persistedBlobs, true) } return err } @@ -607,34 +597,21 @@ func (c *contextImpl) ConflictResolveWorkflowExecution( workflowCloseState, )) - notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, false) - notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false) - notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false) + c.notifyTasksFromWorkflowSnapshotFn(resetWorkflow, persistedBlobs, false) + c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, persistedBlobs, false) + c.notifyTasksFromWorkflowMutationFn(currentWorkflow, persistedBlobs, false) // finally emit session stats - domainName := c.GetDomainName() - emitWorkflowHistoryStats( - c.metricsClient, - domainName, - int(c.stats.HistorySize), - int(resetMutableState.GetNextEventID()-1), - ) - emitSessionUpdateStats( - c.metricsClient, - domainName, - resp.MutableStateUpdateSessionStats, - ) + c.emitWorkflowHistoryStatsFn(domain, int(c.stats.HistorySize), int(resetMutableState.GetNextEventID()-1)) + c.emitSessionUpdateStatsFn(domain, resp.MutableStateUpdateSessionStats) // emit workflow completion stats if any if resetWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted { if event, err := resetMutableState.GetCompletionEvent(ctx); err == nil { workflowType := resetWorkflow.ExecutionInfo.WorkflowTypeName taskList := resetWorkflow.ExecutionInfo.TaskList - emitWorkflowCompletionStats(c.metricsClient, c.logger, - domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(), - taskList, event) + c.emitWorkflowCompletionStatsFn(domain, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(), taskList, event) } } - return nil } diff --git a/service/history/execution/context_test.go b/service/history/execution/context_test.go index cc89f05c8c4..16556807e0d 100644 --- a/service/history/execution/context_test.go +++ b/service/history/execution/context_test.go @@ -1900,6 +1900,634 @@ func TestUpdateWorkflowExecutionWithNew(t *testing.T) { } } +func TestConflictResolveWorkflowExecution(t *testing.T) { + testCases := []struct { + name string + conflictResolveMode persistence.ConflictResolveWorkflowMode + newContext Context + currentContext Context + currentWorkflowTransactionPolicy *TransactionPolicy + mockSetup func(*shard.MockContext, *cache.MockDomainCache, *MockMutableState, *MockMutableState, *MockMutableState, *engine.MockEngine) + mockPersistNonStartWorkflowBatchEventsFn func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) + mockPersistStartWorkflowBatchEventsFn func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) + mockUpdateWorkflowExecutionFn func(context.Context, *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) + mockNotifyTasksFromWorkflowMutationFn func(*persistence.WorkflowMutation, events.PersistedBlobs, bool) + mockNotifyTasksFromWorkflowSnapshotFn func(*persistence.WorkflowSnapshot, events.PersistedBlobs, bool) + mockEmitSessionUpdateStatsFn func(string, *persistence.MutableStateUpdateSessionStats) + mockEmitWorkflowHistoryStatsFn func(string, int, int) + mockEmitLargeWorkflowShardIDStatsFn func(int64, int64, int64, int64) + mockEmitWorkflowCompletionStatsFn func(string, string, string, string, string, *types.HistoryEvent) + mockMergeContinueAsNewReplicationTasksFn func(persistence.UpdateWorkflowMode, *persistence.WorkflowMutation, *persistence.WorkflowSnapshot) error + mockConflictResolveWorkflowExecutionEventReapplyFn func(persistence.ConflictResolveWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error + wantErr bool + assertErr func(*testing.T, error) + }{ + { + name: "resetMutableState CloseTransactionAsSnapshot failed", + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("some error")) + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "persistNonStartWorkflowEvents failed", + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, errors.New("some error") + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "newMutableState CloseTransactionAsSnapshot failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("some error")) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "persistStartWorkflowEvents failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, errors.New("some error") + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "currentMutableState CloseTransactionAsMutation failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("some error")) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "currentMutableState persistNonStartWorkflowEvents failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 2, + }, + }, + BranchToken: []byte{5, 6}, + }, + }, nil) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(_ context.Context, history *persistence.WorkflowEvents) (events.PersistedBlob, error) { + if history.BranchToken[0] == 1 { + return events.PersistedBlob{}, nil + } + return events.PersistedBlob{}, errors.New("some error") + }, + mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "conflictResolveEventReapply failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 2, + }, + }, + BranchToken: []byte{5, 6}, + }, + }, nil) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(_ context.Context, history *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockConflictResolveWorkflowExecutionEventReapplyFn: func(persistence.ConflictResolveWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error { + return errors.New("some error") + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "ConflictResolveWorkflowExecution failed", + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{}, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 2, + }, + }, + BranchToken: []byte{5, 6}, + }, + }, nil) + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().ConflictResolveWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some error")) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(_ context.Context, history *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockPersistStartWorkflowBatchEventsFn: func(context.Context, *persistence.WorkflowEvents) (events.PersistedBlob, error) { + return events.PersistedBlob{}, nil + }, + mockConflictResolveWorkflowExecutionEventReapplyFn: func(persistence.ConflictResolveWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error { + return nil + }, + mockNotifyTasksFromWorkflowMutationFn: func(currentWorkflow *persistence.WorkflowMutation, currentEvents events.PersistedBlobs, persistenceError bool) { + assert.Equal(t, true, persistenceError, "case: ConflictResolveWorkflowExecution failed") + }, + mockNotifyTasksFromWorkflowSnapshotFn: func(newWorkflow *persistence.WorkflowSnapshot, newEvents events.PersistedBlobs, persistenceError bool) { + assert.Equal(t, true, persistenceError, "case: ConflictResolveWorkflowExecution failed") + }, + wantErr: true, + assertErr: func(t *testing.T, err error) { + assert.Equal(t, errors.New("some error"), err) + }, + }, + { + name: "ConflictResolveWorkflowExecution success", + conflictResolveMode: persistence.ConflictResolveWorkflowModeUpdateCurrent, + newContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentContext: &contextImpl{ + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + }, + currentWorkflowTransactionPolicy: TransactionPolicyActive.Ptr(), + mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResetMutableState *MockMutableState, mockNewMutableState *MockMutableState, mockMutableState *MockMutableState, mockEngine *engine.MockEngine) { + mockResetMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id", + State: persistence.WorkflowStateCompleted, + }, + }, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, nil) + mockNewMutableState.EXPECT().CloseTransactionAsSnapshot(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowSnapshot{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id2", + }, + }, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, nil) + mockMutableState.EXPECT().CloseTransactionAsMutation(gomock.Any(), gomock.Any()).Return(&persistence.WorkflowMutation{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id0", + }, + }, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 2, + }, + }, + BranchToken: []byte{5, 6}, + }, + }, nil) + mockShard.EXPECT().GetDomainCache().Return(mockDomainCache) + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("test-domain", nil) + mockShard.EXPECT().ConflictResolveWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.ConflictResolveWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{ + MutableStateSize: 123, + }, + }, nil) + mockResetMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{1}, nil) + mockResetMutableState.EXPECT().GetWorkflowStateCloseStatus().Return(persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusCompleted) + mockShard.EXPECT().GetEngine().Return(mockEngine) + mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()) + mockResetMutableState.EXPECT().GetLastFirstEventID().Return(int64(123)) + mockResetMutableState.EXPECT().GetNextEventID().Return(int64(456)) + mockResetMutableState.EXPECT().GetPreviousStartedEventID().Return(int64(789)) + mockResetMutableState.EXPECT().GetNextEventID().Return(int64(1111)) + mockResetMutableState.EXPECT().GetCompletionEvent(gomock.Any()).Return(&types.HistoryEvent{ + ID: 123, + }, nil) + }, + mockPersistNonStartWorkflowBatchEventsFn: func(_ context.Context, history *persistence.WorkflowEvents) (events.PersistedBlob, error) { + if history.BranchToken[0] == 1 { + assert.Equal(t, &persistence.WorkflowEvents{ + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, history, "case: success") + return events.PersistedBlob{ + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2, 3, 4, 5}, + }, + }, nil + } + + assert.Equal(t, &persistence.WorkflowEvents{ + Events: []*types.HistoryEvent{ + { + ID: 2, + }, + }, + BranchToken: []byte{5, 6}, + }, history, "case: success") + return events.PersistedBlob{ + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2}, + }, + }, nil + }, + mockPersistStartWorkflowBatchEventsFn: func(_ context.Context, history *persistence.WorkflowEvents) (events.PersistedBlob, error) { + assert.Equal(t, &persistence.WorkflowEvents{ + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, history, "case: success") + return events.PersistedBlob{ + DataBlob: persistence.DataBlob{ + Data: []byte{3, 2}, + }, + }, nil + }, + mockConflictResolveWorkflowExecutionEventReapplyFn: func(mode persistence.ConflictResolveWorkflowMode, resetEvents []*persistence.WorkflowEvents, newEvents []*persistence.WorkflowEvents) error { + assert.Equal(t, persistence.ConflictResolveWorkflowModeUpdateCurrent, mode, "case: success") + assert.Equal(t, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: 1, + }, + }, + BranchToken: []byte{1, 2, 3}, + }, + }, resetEvents, "case: success") + assert.Equal(t, []*persistence.WorkflowEvents{ + { + Events: []*types.HistoryEvent{ + { + ID: common.FirstEventID, + }, + }, + BranchToken: []byte{4}, + }, + }, newEvents, "case: success") + return nil + }, + mockNotifyTasksFromWorkflowMutationFn: func(currentWorkflow *persistence.WorkflowMutation, currentEvents events.PersistedBlobs, persistenceError bool) { + assert.Equal(t, &persistence.WorkflowMutation{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id0", + }, + ExecutionStats: &persistence.ExecutionStats{ + HistorySize: 2, + }, + }, currentWorkflow, "case: success") + assert.Equal(t, events.PersistedBlobs{ + { + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2, 3, 4, 5}, + }, + }, + { + DataBlob: persistence.DataBlob{ + Data: []byte{3, 2}, + }, + }, + { + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2}, + }, + }, + }, currentEvents, "case: success") + assert.Equal(t, false, persistenceError, "case: success") + }, + mockNotifyTasksFromWorkflowSnapshotFn: func(newWorkflow *persistence.WorkflowSnapshot, newEvents events.PersistedBlobs, persistenceError bool) { + if newWorkflow.ExecutionInfo.RunID == "test-run-id" { + assert.Equal(t, &persistence.WorkflowSnapshot{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id", + State: persistence.WorkflowStateCompleted, + }, + ExecutionStats: &persistence.ExecutionStats{ + HistorySize: 5, + }, + }, newWorkflow, "case: success") + } else { + assert.Equal(t, &persistence.WorkflowSnapshot{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + RunID: "test-run-id2", + }, + ExecutionStats: &persistence.ExecutionStats{ + HistorySize: 2, + }, + }, newWorkflow, "case: success") + } + assert.Equal(t, events.PersistedBlobs{ + { + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2, 3, 4, 5}, + }, + }, + { + DataBlob: persistence.DataBlob{ + Data: []byte{3, 2}, + }, + }, + { + DataBlob: persistence.DataBlob{ + Data: []byte{1, 2}, + }, + }, + }, newEvents, "case: success") + assert.Equal(t, false, persistenceError, "case: success") + }, + mockEmitWorkflowHistoryStatsFn: func(domainName string, size int, count int) { + assert.Equal(t, 5, size, "case: success") + assert.Equal(t, 1110, count, "case: success") + }, + mockEmitSessionUpdateStatsFn: func(domainName string, stats *persistence.MutableStateUpdateSessionStats) { + assert.Equal(t, &persistence.MutableStateUpdateSessionStats{ + MutableStateSize: 123, + }, stats, "case: success") + }, + mockEmitWorkflowCompletionStatsFn: func(domainName string, workflowType string, workflowID string, runID string, taskList string, lastEvent *types.HistoryEvent) { + assert.Equal(t, &types.HistoryEvent{ + ID: 123, + }, lastEvent, "case: success") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockShard := shard.NewMockContext(mockCtrl) + mockDomainCache := cache.NewMockDomainCache(mockCtrl) + mockResetMutableState := NewMockMutableState(mockCtrl) + mockMutableState := NewMockMutableState(mockCtrl) + mockNewMutableState := NewMockMutableState(mockCtrl) + mockEngine := engine.NewMockEngine(mockCtrl) + if tc.mockSetup != nil { + tc.mockSetup(mockShard, mockDomainCache, mockResetMutableState, mockNewMutableState, mockMutableState, mockEngine) + } + ctx := &contextImpl{ + shard: mockShard, + stats: &persistence.ExecutionStats{}, + metricsClient: metrics.NewNoopMetricsClient(), + persistNonStartWorkflowBatchEventsFn: tc.mockPersistNonStartWorkflowBatchEventsFn, + persistStartWorkflowBatchEventsFn: tc.mockPersistStartWorkflowBatchEventsFn, + updateWorkflowExecutionFn: tc.mockUpdateWorkflowExecutionFn, + notifyTasksFromWorkflowMutationFn: tc.mockNotifyTasksFromWorkflowMutationFn, + notifyTasksFromWorkflowSnapshotFn: tc.mockNotifyTasksFromWorkflowSnapshotFn, + emitSessionUpdateStatsFn: tc.mockEmitSessionUpdateStatsFn, + emitWorkflowHistoryStatsFn: tc.mockEmitWorkflowHistoryStatsFn, + mergeContinueAsNewReplicationTasksFn: tc.mockMergeContinueAsNewReplicationTasksFn, + conflictResolveEventReapplyFn: tc.mockConflictResolveWorkflowExecutionEventReapplyFn, + emitLargeWorkflowShardIDStatsFn: tc.mockEmitLargeWorkflowShardIDStatsFn, + emitWorkflowCompletionStatsFn: tc.mockEmitWorkflowCompletionStatsFn, + } + err := ctx.ConflictResolveWorkflowExecution(context.Background(), time.Unix(0, 0), tc.conflictResolveMode, mockResetMutableState, tc.newContext, mockNewMutableState, tc.currentContext, mockMutableState, tc.currentWorkflowTransactionPolicy) + if tc.wantErr { + assert.Error(t, err) + if tc.assertErr != nil { + tc.assertErr(t, err) + } + } else { + assert.NoError(t, err) + } + }) + } +} + func TestReapplyEvents(t *testing.T) { testCases := []struct { name string