diff --git a/common/persistence/nosql/nosqlplugin/dynamodb/tests/dynamodb_persistence_test.go b/common/persistence/nosql/nosqlplugin/dynamodb/tests/dynamodb_persistence_test.go index 72bca0f3f52..266436d09b1 100644 --- a/common/persistence/nosql/nosqlplugin/dynamodb/tests/dynamodb_persistence_test.go +++ b/common/persistence/nosql/nosqlplugin/dynamodb/tests/dynamodb_persistence_test.go @@ -28,7 +28,7 @@ import ( ) // This is to make sure adding new noop method when adding new nosql interfaces -// Remove it when any other tests are implemented. +// Remove it when any other tests are implemented. func TestNoopStruct(t *testing.T) { _, _ = dynamodb.NewDynamoDB(config.NoSQL{}, nil) } diff --git a/service/history/events/cache.go b/service/history/events/cache.go index fa80232952f..8dbd50c5625 100644 --- a/service/history/events/cache.go +++ b/service/history/events/cache.go @@ -177,7 +177,8 @@ func (e *cacheImpl) GetEvent( shardID int, domainID string, workflowID string, - runID string, firstEventID int64, + runID string, + firstEventID int64, eventID int64, branchToken []byte, ) (*types.HistoryEvent, error) { @@ -228,7 +229,7 @@ func (e *cacheImpl) PutEvent( func (e *cacheImpl) getHistoryEventFromStore( ctx context.Context, - firstEventID, + firstEventID int64, eventID int64, branchToken []byte, shardID int, diff --git a/service/history/execution/mutable_state.go b/service/history/execution/mutable_state.go index 83cf126a2e6..c4bed6ad6e3 100644 --- a/service/history/execution/mutable_state.go +++ b/service/history/execution/mutable_state.go @@ -124,6 +124,7 @@ type ( GetDecisionInfo(int64) (*DecisionInfo, bool) GetDomainEntry() *cache.DomainCacheEntry GetStartEvent(context.Context) (*types.HistoryEvent, error) + GetCloseEvent(context.Context) (*types.HistoryEvent, error) GetCurrentBranchToken() ([]byte, error) GetVersionHistories() *persistence.VersionHistories GetCurrentVersion() int64 diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index bdb4feaf34d..b0f15afbdd0 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -67,6 +67,8 @@ var ( ErrMissingChildWorkflowInfo = &types.InternalServiceError{Message: "unable to get child workflow info"} // ErrMissingWorkflowStartEvent indicates missing workflow start event ErrMissingWorkflowStartEvent = &types.InternalServiceError{Message: "unable to get workflow start event"} + // ErrMissingWorkflowCloseEvent indicates missing workflow close event + ErrMissingWorkflowCloseEvent = &types.InternalServiceError{Message: "unable to get workflow close event"} // ErrMissingWorkflowCompletionEvent indicates missing workflow completion event ErrMissingWorkflowCompletionEvent = &types.InternalServiceError{Message: "unable to get workflow completion event"} // ErrMissingActivityScheduledEvent indicates missing workflow activity scheduled event @@ -1152,6 +1154,36 @@ func (e *mutableStateBuilder) GetStartEvent( return startEvent, nil } +// GetCloseEvent returns the last event in history +func (e *mutableStateBuilder) GetCloseEvent( + ctx context.Context, +) (*types.HistoryEvent, error) { + + if e.GetExecutionInfo().CloseStatus == persistence.WorkflowCloseStatusNone { + return nil, ErrMissingWorkflowCloseEvent + } + + currentBranchToken, err := e.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + closeEvent, err := e.eventsCache.GetEvent( + ctx, + e.shard.GetShardID(), + e.executionInfo.DomainID, + e.executionInfo.WorkflowID, + e.executionInfo.RunID, + common.FirstEventID, + e.GetNextEventID()-1, + currentBranchToken, + ) + if err != nil { + return nil, err + } + return closeEvent, nil +} + // DeletePendingChildExecution deletes details about a ChildExecutionInfo. func (e *mutableStateBuilder) DeletePendingChildExecution( initiatedEventID int64, @@ -2553,6 +2585,7 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -2593,6 +2626,7 @@ func (e *mutableStateBuilder) AddFailWorkflowEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -2632,6 +2666,7 @@ func (e *mutableStateBuilder) AddTimeoutWorkflowEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -2711,6 +2746,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -3217,6 +3253,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -3334,6 +3371,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( // TODO merge active & passive task generation if err := e.taskGenerator.GenerateWorkflowCloseTasks( e.unixNanoToTime(continueAsNewEvent.GetTimestamp()), + continueAsNewEvent, ); err != nil { return nil, nil, err } diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index cdcef70ba1d..ec48dee66b8 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -21,6 +21,8 @@ package execution import ( + "context" + "fmt" "testing" "time" @@ -788,6 +790,58 @@ func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicat return newDecisionScheduleEvent, newDecisionStartedEvent } +func (s *mutableStateSuite) TestGetCloseEvent_WorkflowIsOpen() { + mutableState := s.buildWorkflowMutableState() + + s.msBuilder.Load(mutableState) + + closeEvent, err := s.msBuilder.GetCloseEvent(context.Background()) + s.Error(err) + s.Nil(closeEvent) +} + +func (s *mutableStateSuite) TestGetCloseEvent_WorkflowIsClose() { + mutableState := s.buildWorkflowMutableState() + mutableState.ExecutionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted + s.msBuilder.Load(mutableState) + + expectedCloseEvent := &types.HistoryEvent{} + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), + gomock.Any(), + gomock.Any(), + gomock.Any(), + gomock.Any(), + common.FirstEventID, + s.msBuilder.GetNextEventID()-1, + gomock.Any(), + ).Return(expectedCloseEvent, nil) + closeEvent, err := s.msBuilder.GetCloseEvent(context.Background()) + s.NoError(err) + s.Equal(expectedCloseEvent, closeEvent) +} + +func (s *mutableStateSuite) TestGetCloseEvent_Error() { + mutableState := s.buildWorkflowMutableState() + mutableState.ExecutionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted + s.msBuilder.Load(mutableState) + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), + gomock.Any(), + gomock.Any(), + gomock.Any(), + gomock.Any(), + common.FirstEventID, + s.msBuilder.GetNextEventID()-1, + gomock.Any(), + ).Return(nil, fmt.Errorf("test")) + closeEvent, err := s.msBuilder.GetCloseEvent(context.Background()) + s.Error(err) + s.Nil(closeEvent) +} + func (s *mutableStateSuite) newDomainCacheEntry() *cache.DomainCacheEntry { return cache.NewDomainCacheEntryForTest( &persistence.DomainInfo{Name: "mutableStateTest"}, diff --git a/service/history/execution/mutable_state_mock.go b/service/history/execution/mutable_state_mock.go index 84fac0fd699..9912ca0e0ab 100644 --- a/service/history/execution/mutable_state_mock.go +++ b/service/history/execution/mutable_state_mock.go @@ -1055,6 +1055,21 @@ func (mr *MockMutableStateMockRecorder) GetStartEvent(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStartEvent", reflect.TypeOf((*MockMutableState)(nil).GetStartEvent), arg0) } +// GetCloseEvent mocks base method +func (m *MockMutableState) GetCloseEvent(arg0 context.Context) (*types.HistoryEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCloseEvent", arg0) + ret0, _ := ret[0].(*types.HistoryEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetCloseEvent indicates an expected call of GetCloseEvent +func (mr *MockMutableStateMockRecorder) GetCloseEvent(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCloseEvent", reflect.TypeOf((*MockMutableState)(nil).GetCloseEvent), arg0) +} + // GetCurrentBranchToken mocks base method func (m *MockMutableState) GetCurrentBranchToken() ([]byte, error) { m.ctrl.T.Helper() diff --git a/service/history/execution/mutable_state_task_generator.go b/service/history/execution/mutable_state_task_generator.go index c501aa365de..44cb31032fc 100644 --- a/service/history/execution/mutable_state_task_generator.go +++ b/service/history/execution/mutable_state_task_generator.go @@ -44,6 +44,7 @@ type ( ) error GenerateWorkflowCloseTasks( now time.Time, + closeEvent *types.HistoryEvent, ) error GenerateRecordWorkflowStartedTasks( startEvent *types.HistoryEvent, @@ -145,14 +146,13 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowStartTasks( func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks( now time.Time, + closeEvent *types.HistoryEvent, ) error { - currentVersion := r.mutableState.GetCurrentVersion() executionInfo := r.mutableState.GetExecutionInfo() - r.mutableState.AddTransferTasks(&persistence.CloseExecutionTask{ // TaskID and VisibilityTimestamp are set by shard context - Version: currentVersion, + Version: closeEvent.GetVersion(), }) retentionInDays := defaultWorkflowRetentionInDays @@ -170,7 +170,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks( r.mutableState.AddTimerTasks(&persistence.DeleteHistoryEventTask{ // TaskID is set by shard VisibilityTimestamp: now.Add(retentionDuration), - Version: currentVersion, + Version: closeEvent.GetVersion(), }) return nil diff --git a/service/history/execution/mutable_state_task_generator_mock.go b/service/history/execution/mutable_state_task_generator_mock.go index 0317dda931b..d3dfa5eb9fc 100644 --- a/service/history/execution/mutable_state_task_generator_mock.go +++ b/service/history/execution/mutable_state_task_generator_mock.go @@ -73,17 +73,17 @@ func (mr *MockMutableStateTaskGeneratorMockRecorder) GenerateWorkflowStartTasks( } // GenerateWorkflowCloseTasks mocks base method -func (m *MockMutableStateTaskGenerator) GenerateWorkflowCloseTasks(now time.Time) error { +func (m *MockMutableStateTaskGenerator) GenerateWorkflowCloseTasks(now time.Time, closeEvent *types.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateWorkflowCloseTasks", now) + ret := m.ctrl.Call(m, "GenerateWorkflowCloseTasks", now, closeEvent) ret0, _ := ret[0].(error) return ret0 } // GenerateWorkflowCloseTasks indicates an expected call of GenerateWorkflowCloseTasks -func (mr *MockMutableStateTaskGeneratorMockRecorder) GenerateWorkflowCloseTasks(now interface{}) *gomock.Call { +func (mr *MockMutableStateTaskGeneratorMockRecorder) GenerateWorkflowCloseTasks(now, closeEvent interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWorkflowCloseTasks", reflect.TypeOf((*MockMutableStateTaskGenerator)(nil).GenerateWorkflowCloseTasks), now) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWorkflowCloseTasks", reflect.TypeOf((*MockMutableStateTaskGenerator)(nil).GenerateWorkflowCloseTasks), now, closeEvent) } // GenerateRecordWorkflowStartedTasks mocks base method diff --git a/service/history/execution/mutable_state_task_refresher.go b/service/history/execution/mutable_state_task_refresher.go index 50fd0984620..209f7bbd9ba 100644 --- a/service/history/execution/mutable_state_task_refresher.go +++ b/service/history/execution/mutable_state_task_refresher.go @@ -218,10 +218,14 @@ func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowClose( ) error { executionInfo := mutableState.GetExecutionInfo() - if executionInfo.CloseStatus != persistence.WorkflowCloseStatusNone { + closeEvent, err := mutableState.GetCloseEvent(ctx) + if err != nil { + return err + } return taskGenerator.GenerateWorkflowCloseTasks( now, + closeEvent, ) } diff --git a/service/history/execution/state_builder.go b/service/history/execution/state_builder.go index a1b902e2f54..856c963fef4 100644 --- a/service/history/execution/state_builder.go +++ b/service/history/execution/state_builder.go @@ -525,6 +525,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -539,6 +540,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -553,6 +555,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -567,6 +570,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -581,6 +585,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } @@ -623,6 +628,7 @@ func (b *stateBuilderImpl) ApplyEvents( if err := taskGenerator.GenerateWorkflowCloseTasks( b.unixNanoToTime(event.GetTimestamp()), + event, ); err != nil { return nil, err } diff --git a/service/history/execution/state_builder_test.go b/service/history/execution/state_builder_test.go index 899c7d77f9b..0f7fcb6c9d1 100644 --- a/service/history/execution/state_builder_test.go +++ b/service/history/execution/state_builder_test.go @@ -268,6 +268,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(event.GetTimestamp()), + event, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) @@ -298,6 +299,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(event.GetTimestamp()), + event, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) _, err := s.stateBuilder.ApplyEvents(constants.TestDomainID, requestID, workflowExecution, s.toHistory(event), nil) @@ -327,6 +329,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() { s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(event.GetTimestamp()), + event, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) @@ -357,6 +360,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted( s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(event.GetTimestamp()), + event, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) @@ -387,6 +391,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(event.GetTimestamp()), + event, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) @@ -481,6 +486,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(continueAsNewEvent.GetTimestamp()), + continueAsNewEvent, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1) @@ -540,6 +546,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes() s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( s.stateBuilder.unixNanoToTime(continueAsNewEvent.GetTimestamp()), + continueAsNewEvent, ).Return(nil).Times(1) s.mockMutableState.EXPECT().ClearStickyness().Times(1)