Skip to content

Commit

Permalink
Update generating close event to use event version (cadence-workflow#…
Browse files Browse the repository at this point in the history
…4261)

Update generating close event to use event version
  • Loading branch information
yux0 authored Jun 8, 2021
1 parent 0b058c0 commit ea912b0
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

// This is to make sure adding new noop method when adding new nosql interfaces
// Remove it when any other tests are implemented.
// Remove it when any other tests are implemented.
func TestNoopStruct(t *testing.T) {
_, _ = dynamodb.NewDynamoDB(config.NoSQL{}, nil)
}
Expand Down
5 changes: 3 additions & 2 deletions service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func (e *cacheImpl) GetEvent(
shardID int,
domainID string,
workflowID string,
runID string, firstEventID int64,
runID string,
firstEventID int64,
eventID int64,
branchToken []byte,
) (*types.HistoryEvent, error) {
Expand Down Expand Up @@ -228,7 +229,7 @@ func (e *cacheImpl) PutEvent(

func (e *cacheImpl) getHistoryEventFromStore(
ctx context.Context,
firstEventID,
firstEventID int64,
eventID int64,
branchToken []byte,
shardID int,
Expand Down
1 change: 1 addition & 0 deletions service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type (
GetDecisionInfo(int64) (*DecisionInfo, bool)
GetDomainEntry() *cache.DomainCacheEntry
GetStartEvent(context.Context) (*types.HistoryEvent, error)
GetCloseEvent(context.Context) (*types.HistoryEvent, error)
GetCurrentBranchToken() ([]byte, error)
GetVersionHistories() *persistence.VersionHistories
GetCurrentVersion() int64
Expand Down
38 changes: 38 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ var (
ErrMissingChildWorkflowInfo = &types.InternalServiceError{Message: "unable to get child workflow info"}
// ErrMissingWorkflowStartEvent indicates missing workflow start event
ErrMissingWorkflowStartEvent = &types.InternalServiceError{Message: "unable to get workflow start event"}
// ErrMissingWorkflowCloseEvent indicates missing workflow close event
ErrMissingWorkflowCloseEvent = &types.InternalServiceError{Message: "unable to get workflow close event"}
// ErrMissingWorkflowCompletionEvent indicates missing workflow completion event
ErrMissingWorkflowCompletionEvent = &types.InternalServiceError{Message: "unable to get workflow completion event"}
// ErrMissingActivityScheduledEvent indicates missing workflow activity scheduled event
Expand Down Expand Up @@ -1152,6 +1154,36 @@ func (e *mutableStateBuilder) GetStartEvent(
return startEvent, nil
}

// GetCloseEvent returns the last event in history
func (e *mutableStateBuilder) GetCloseEvent(
ctx context.Context,
) (*types.HistoryEvent, error) {

if e.GetExecutionInfo().CloseStatus == persistence.WorkflowCloseStatusNone {
return nil, ErrMissingWorkflowCloseEvent
}

currentBranchToken, err := e.GetCurrentBranchToken()
if err != nil {
return nil, err
}

closeEvent, err := e.eventsCache.GetEvent(
ctx,
e.shard.GetShardID(),
e.executionInfo.DomainID,
e.executionInfo.WorkflowID,
e.executionInfo.RunID,
common.FirstEventID,
e.GetNextEventID()-1,
currentBranchToken,
)
if err != nil {
return nil, err
}
return closeEvent, nil
}

// DeletePendingChildExecution deletes details about a ChildExecutionInfo.
func (e *mutableStateBuilder) DeletePendingChildExecution(
initiatedEventID int64,
Expand Down Expand Up @@ -2553,6 +2585,7 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2593,6 +2626,7 @@ func (e *mutableStateBuilder) AddFailWorkflowEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2632,6 +2666,7 @@ func (e *mutableStateBuilder) AddTimeoutWorkflowEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -2711,6 +2746,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -3217,6 +3253,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -3334,6 +3371,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateWorkflowCloseTasks(
e.unixNanoToTime(continueAsNewEvent.GetTimestamp()),
continueAsNewEvent,
); err != nil {
return nil, nil, err
}
Expand Down
54 changes: 54 additions & 0 deletions service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package execution

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -788,6 +790,58 @@ func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicat
return newDecisionScheduleEvent, newDecisionStartedEvent
}

func (s *mutableStateSuite) TestGetCloseEvent_WorkflowIsOpen() {
mutableState := s.buildWorkflowMutableState()

s.msBuilder.Load(mutableState)

closeEvent, err := s.msBuilder.GetCloseEvent(context.Background())
s.Error(err)
s.Nil(closeEvent)
}

func (s *mutableStateSuite) TestGetCloseEvent_WorkflowIsClose() {
mutableState := s.buildWorkflowMutableState()
mutableState.ExecutionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
s.msBuilder.Load(mutableState)

expectedCloseEvent := &types.HistoryEvent{}

s.mockEventsCache.EXPECT().GetEvent(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
common.FirstEventID,
s.msBuilder.GetNextEventID()-1,
gomock.Any(),
).Return(expectedCloseEvent, nil)
closeEvent, err := s.msBuilder.GetCloseEvent(context.Background())
s.NoError(err)
s.Equal(expectedCloseEvent, closeEvent)
}

func (s *mutableStateSuite) TestGetCloseEvent_Error() {
mutableState := s.buildWorkflowMutableState()
mutableState.ExecutionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
s.msBuilder.Load(mutableState)

s.mockEventsCache.EXPECT().GetEvent(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
common.FirstEventID,
s.msBuilder.GetNextEventID()-1,
gomock.Any(),
).Return(nil, fmt.Errorf("test"))
closeEvent, err := s.msBuilder.GetCloseEvent(context.Background())
s.Error(err)
s.Nil(closeEvent)
}

func (s *mutableStateSuite) newDomainCacheEntry() *cache.DomainCacheEntry {
return cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{Name: "mutableStateTest"},
Expand Down
15 changes: 15 additions & 0 deletions service/history/execution/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
) error
GenerateWorkflowCloseTasks(
now time.Time,
closeEvent *types.HistoryEvent,
) error
GenerateRecordWorkflowStartedTasks(
startEvent *types.HistoryEvent,
Expand Down Expand Up @@ -145,14 +146,13 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowStartTasks(

func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
now time.Time,
closeEvent *types.HistoryEvent,
) error {

currentVersion := r.mutableState.GetCurrentVersion()
executionInfo := r.mutableState.GetExecutionInfo()

r.mutableState.AddTransferTasks(&persistence.CloseExecutionTask{
// TaskID and VisibilityTimestamp are set by shard context
Version: currentVersion,
Version: closeEvent.GetVersion(),
})

retentionInDays := defaultWorkflowRetentionInDays
Expand All @@ -170,7 +170,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
r.mutableState.AddTimerTasks(&persistence.DeleteHistoryEventTask{
// TaskID is set by shard
VisibilityTimestamp: now.Add(retentionDuration),
Version: currentVersion,
Version: closeEvent.GetVersion(),
})

return nil
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion service/history/execution/mutable_state_task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,14 @@ func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowClose(
) error {

executionInfo := mutableState.GetExecutionInfo()

if executionInfo.CloseStatus != persistence.WorkflowCloseStatusNone {
closeEvent, err := mutableState.GetCloseEvent(ctx)
if err != nil {
return err
}
return taskGenerator.GenerateWorkflowCloseTasks(
now,
closeEvent,
)
}

Expand Down
6 changes: 6 additions & 0 deletions service/history/execution/state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand All @@ -539,6 +540,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand All @@ -553,6 +555,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand All @@ -567,6 +570,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand All @@ -581,6 +585,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -623,6 +628,7 @@ func (b *stateBuilderImpl) ApplyEvents(

if err := taskGenerator.GenerateWorkflowCloseTasks(
b.unixNanoToTime(event.GetTimestamp()),
event,
); err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions service/history/execution/state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(event.GetTimestamp()),
event,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down Expand Up @@ -298,6 +299,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(event.GetTimestamp()),
event,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)
_, err := s.stateBuilder.ApplyEvents(constants.TestDomainID, requestID, workflowExecution, s.toHistory(event), nil)
Expand Down Expand Up @@ -327,6 +329,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() {
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(event.GetTimestamp()),
event,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down Expand Up @@ -357,6 +360,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted(
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(event.GetTimestamp()),
event,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down Expand Up @@ -387,6 +391,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(event.GetTimestamp()),
event,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down Expand Up @@ -481,6 +486,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(continueAsNewEvent.GetTimestamp()),
continueAsNewEvent,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down Expand Up @@ -540,6 +546,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}).AnyTimes()
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
s.stateBuilder.unixNanoToTime(continueAsNewEvent.GetTimestamp()),
continueAsNewEvent,
).Return(nil).Times(1)
s.mockMutableState.EXPECT().ClearStickyness().Times(1)

Expand Down

0 comments on commit ea912b0

Please sign in to comment.