diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 5f87cfd5534..8c990666a72 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -810,6 +810,13 @@ const ( // Default value: 3*time.Minute // Allowed filters: DomainID StandbyTaskReReplicationContextTimeout + // ResurrectionCheckMinDelay is the minimal timer processing delay before scanning history to see + // if there's a resurrected timer/activity + // KeyName: history.resurrectionCheckMinDelay + // Value type: Duration + // Default value: 24*time.Hour + // Allowed filters: N/A + ResurrectionCheckMinDelay // QueueProcessorEnableSplit is indicates whether processing queue split policy should be enabled // KeyName: history.queueProcessorEnableSplit // Value type: Bool @@ -2091,6 +2098,7 @@ var Keys = map[Key]string{ StandbyTaskRedispatchInterval: "history.standbyTaskRedispatchInterval", TaskRedispatchIntervalJitterCoefficient: "history.taskRedispatchIntervalJitterCoefficient", StandbyTaskReReplicationContextTimeout: "history.standbyTaskReReplicationContextTimeout", + ResurrectionCheckMinDelay: "history.resurrectionCheckMinDelay", QueueProcessorEnableSplit: "history.queueProcessorEnableSplit", QueueProcessorSplitMaxLevel: "history.queueProcessorSplitMaxLevel", QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomainID", diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 6cced2b9b44..7e47bd4cd9a 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1901,6 +1901,8 @@ const ( DecisionAttemptTimer StaleMutableStateCounter DataInconsistentCounter + TimerResurrectionCounter + ActivityResurrectionCounter AutoResetPointsLimitExceededCounter AutoResetPointCorruptionCounter ConcurrencyUpdateFailureCounter @@ -2405,6 +2407,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ DecisionAttemptTimer: {metricName: "decision_attempt", metricType: Timer}, StaleMutableStateCounter: {metricName: "stale_mutable_state", metricType: Counter}, DataInconsistentCounter: {metricName: "data_inconsistent", metricType: Counter}, + TimerResurrectionCounter: {metricName: "timer_resurrection", metricType: Counter}, + ActivityResurrectionCounter: {metricName: "activity_resurrection", metricType: Counter}, AutoResetPointsLimitExceededCounter: {metricName: "auto_reset_points_exceed_limit", metricType: Counter}, AutoResetPointCorruptionCounter: {metricName: "auto_reset_point_corruption", metricType: Counter}, ConcurrencyUpdateFailureCounter: {metricName: "concurrency_update_failure", metricType: Counter}, diff --git a/service/history/config/config.go b/service/history/config/config.go index 544249342cb..aae96df05fa 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -99,6 +99,7 @@ type Config struct { TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter + ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFn // QueueProcessor settings QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn @@ -378,6 +379,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 0.15), StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout, 3*time.Minute), EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false), + ResurrectionCheckMinDelay: dc.GetDurationProperty(dynamicconfig.ResurrectionCheckMinDelay, 24*time.Hour), QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false), QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0 diff --git a/service/history/execution/mutable_state.go b/service/history/execution/mutable_state.go index 301fd92d4bc..ad001d1d45f 100644 --- a/service/history/execution/mutable_state.go +++ b/service/history/execution/mutable_state.go @@ -112,6 +112,8 @@ type ( CreateNewHistoryEventWithTimestamp(eventType types.EventType, timestamp int64) *types.HistoryEvent CreateTransientDecisionEvents(di *DecisionInfo, identity string) (*types.HistoryEvent, *types.HistoryEvent) DeleteDecision() + DeleteUserTimer(timerID string) error + DeleteActivity(scheduleEventID int64) error DeleteSignalRequested(requestID string) FailDecision(bool) FlushBufferedEvents() error diff --git a/service/history/execution/mutable_state_mock.go b/service/history/execution/mutable_state_mock.go index 62ee8d182be..461f3c52184 100644 --- a/service/history/execution/mutable_state_mock.go +++ b/service/history/execution/mutable_state_mock.go @@ -883,6 +883,34 @@ func (mr *MockMutableStateMockRecorder) DeleteDecision() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDecision", reflect.TypeOf((*MockMutableState)(nil).DeleteDecision)) } +// DeleteUserTimer mocks base method +func (m *MockMutableState) DeleteUserTimer(timerID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteUserTimer", timerID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteUserTimer indicates an expected call of DeleteUserTimer +func (mr *MockMutableStateMockRecorder) DeleteUserTimer(timerID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUserTimer", reflect.TypeOf((*MockMutableState)(nil).DeleteUserTimer), timerID) +} + +// DeleteActivity mocks base method +func (m *MockMutableState) DeleteActivity(scheduleEventID int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteActivity", scheduleEventID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteActivity indicates an expected call of DeleteActivity +func (mr *MockMutableStateMockRecorder) DeleteActivity(scheduleEventID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteActivity", reflect.TypeOf((*MockMutableState)(nil).DeleteActivity), scheduleEventID) +} + // DeleteSignalRequested mocks base method func (m *MockMutableState) DeleteSignalRequested(requestID string) { m.ctrl.T.Helper() diff --git a/service/history/execution/timer_sequence.go b/service/history/execution/timer_sequence.go index 761be5f1af7..d58ba8b11b7 100644 --- a/service/history/execution/timer_sequence.go +++ b/service/history/execution/timer_sequence.go @@ -82,7 +82,7 @@ type ( // TimerSequence manages user / activity timer TimerSequence interface { - IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) bool + IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) (time.Duration, bool) CreateNextUserTimer() (bool, error) CreateNextActivityTimer() (bool, error) @@ -113,11 +113,19 @@ func NewTimerSequence( func (t *timerSequenceImpl) IsExpired( referenceTime time.Time, TimerSequenceID TimerSequenceID, -) bool { +) (time.Duration, bool) { // Cassandra timestamp resolution is in millisecond // here we do the check in terms of second resolution. - return TimerSequenceID.Timestamp.Unix() <= referenceTime.Unix() + + timerFireTimeInSecond := TimerSequenceID.Timestamp.Unix() + referenceTimeInSecond := referenceTime.Unix() + + if timerFireTimeInSecond <= referenceTimeInSecond { + return time.Duration(referenceTimeInSecond-timerFireTimeInSecond) * time.Second, true + } + + return 0, false } func (t *timerSequenceImpl) CreateNextUserTimer() (bool, error) { diff --git a/service/history/execution/timer_sequence_mock.go b/service/history/execution/timer_sequence_mock.go index 35d5445899f..c35d599478d 100644 --- a/service/history/execution/timer_sequence_mock.go +++ b/service/history/execution/timer_sequence_mock.go @@ -57,11 +57,12 @@ func (m *MockTimerSequence) EXPECT() *MockTimerSequenceMockRecorder { } // IsExpired mocks base method -func (m *MockTimerSequence) IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) bool { +func (m *MockTimerSequence) IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) (time.Duration, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsExpired", referenceTime, TimerSequenceID) - ret0, _ := ret[0].(bool) - return ret0 + ret0, _ := ret[0].(time.Duration) + ret1, _ := ret[1].(bool) + return ret0, ret1 } // IsExpired indicates an expected call of IsExpired diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 827ee43ed0a..607362317e5 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -23,13 +23,16 @@ package task import ( "context" "fmt" + "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" @@ -37,6 +40,10 @@ import ( "github.com/uber/cadence/service/worker/archiver" ) +const ( + scanWorkflowTimeout = 30 * time.Second +) + type ( timerActiveTaskExecutor struct { *timerTaskExecutorBase @@ -128,7 +135,15 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask( timerSequence := t.getTimerSequence(mutableState) referenceTime := t.shard.GetTimeSource().Now() - timerFired := false + resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay() + updateMutableState := false + + // initialized when a timer with delay >= resurrectionCheckMinDelay + // is encountered, so that we don't need to scan history multiple times + // where there're multiple timers with high delay + var resurrectedTimer map[string]struct{} + scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + defer cancel() Loop: for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() { @@ -139,23 +154,59 @@ Loop: return &types.InternalServiceError{Message: errString} } - if expired := timerSequence.IsExpired(referenceTime, timerSequenceID); !expired { + delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID) + if !expired { // timer sequence IDs are sorted, once there is one timer // sequence ID not expired, all after that wil not expired break Loop } + if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil { + if resurrectedTimer == nil { + // overwrite the context here as scan history may take a long time to complete + // ctx will also be used by other operations like updateWorkflow + ctx = scanWorkflowCtx + resurrectedTimer, err = t.getResurrectedTimer(ctx, mutableState) + if err != nil { + t.logger.Error("Timer resurrection check failed", tag.Error(err)) + return err + } + } + + if _, ok := resurrectedTimer[timerInfo.TimerID]; ok { + // found timer resurrection + domainName := mutableState.GetDomainEntry().GetInfo().Name + t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter) + t.logger.Warn("Encounter resurrected timer, skip", + tag.WorkflowDomainID(task.DomainID), + tag.WorkflowID(task.WorkflowID), + tag.WorkflowRunID(task.RunID), + tag.TaskType(task.TaskType), + tag.TaskID(task.TaskID), + tag.WorkflowTimerID(timerInfo.TimerID), + tag.WorkflowScheduleID(timerInfo.StartedID), // timerStartedEvent is basically scheduled event + ) + + // remove resurrected timer from mutable state + if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil { + return err + } + updateMutableState = true + continue Loop + } + } + if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil { return err } - timerFired = true + updateMutableState = true } - if !timerFired { + if !updateMutableState { return nil } - return t.updateWorkflowExecution(ctx, wfContext, mutableState, timerFired) + return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState) } func (t *timerActiveTaskExecutor) executeActivityTimeoutTask( @@ -186,9 +237,17 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask( timerSequence := t.getTimerSequence(mutableState) referenceTime := t.shard.GetTimeSource().Now() + resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay() updateMutableState := false scheduleDecision := false + // initialized when an activity timer with delay >= resurrectionCheckMinDelay + // is encountered, so that we don't need to scan history multiple times + // where there're multiple timers with high delay + var resurrectedActivity map[int64]struct{} + scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout) + defer cancel() + // need to clear activity heartbeat timer task mask for new activity timer task creation // NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible // one heartbeat task was persisted multiple times with different taskIDs due to the retry logic @@ -214,15 +273,52 @@ Loop: // and one of those 4 timers may have fired in this loop // 2. timerSequenceID.attempt < activityInfo.Attempt // retry could update activity attempt, should not timeouts new attempt + // 3. it's a resurrected activity and has already been deleted in this loop continue Loop } - if expired := timerSequence.IsExpired(referenceTime, timerSequenceID); !expired { + delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID) + if !expired { // timer sequence IDs are sorted, once there is one timer // sequence ID not expired, all after that wil not expired break Loop } + if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil { + if resurrectedActivity == nil { + // overwrite the context here as scan history may take a long time to complete + // ctx will also be used by other operations like updateWorkflow + ctx = scanWorkflowCtx + resurrectedActivity, err = t.getResurrectedActivity(ctx, mutableState) + if err != nil { + t.logger.Error("Activity resurrection check failed", tag.Error(err)) + return err + } + } + + if _, ok := resurrectedActivity[activityInfo.ScheduleID]; ok { + // found activity resurrection + domainName := mutableState.GetDomainEntry().GetInfo().Name + t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityResurrectionCounter) + t.logger.Warn("Encounter resurrected activity, skip", + tag.WorkflowDomainID(task.DomainID), + tag.WorkflowID(task.WorkflowID), + tag.WorkflowRunID(task.RunID), + tag.TaskType(task.TaskType), + tag.TaskID(task.TaskID), + tag.WorkflowActivityID(activityInfo.ActivityID), + tag.WorkflowScheduleID(activityInfo.ScheduleID), + ) + + // remove resurrected activity from mutable state + if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil { + return err + } + updateMutableState = true + continue Loop + } + } + // check if it's possible that the timeout is due to activity task lost if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart { domainName, err := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID) @@ -601,6 +697,130 @@ func (t *timerActiveTaskExecutor) getTimerSequence( return execution.NewTimerSequence(timeSource, mutableState) } +func (t *timerActiveTaskExecutor) getResurrectedTimer( + ctx context.Context, + mutableState execution.MutableState, +) (map[string]struct{}, error) { + // 1. find min timer startedID for all pending timers + pendingTimerInfos := mutableState.GetPendingTimerInfos() + minTimerStartedID := common.EndEventID + for _, timerInfo := range pendingTimerInfos { + minTimerStartedID = common.MinInt64(minTimerStartedID, timerInfo.StartedID) + } + + // 2. scan history from minTimerStartedID and see if any + // TimerFiredEvent or TimerCancelledEvent matches pending timer + resurrectedTimer := make(map[string]struct{}) + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + iter := collection.NewPagingIterator(t.getHistoryPaginationFn( + ctx, + minTimerStartedID, + mutableState.GetNextEventID(), + branchToken, + )) + for iter.HasNext() { + item, err := iter.Next() + if err != nil { + return nil, err + } + event := item.(*types.HistoryEvent) + var timerID string + switch event.GetEventType() { + case types.EventTypeTimerFired: + timerID = event.TimerFiredEventAttributes.TimerID + case types.EventTypeTimerCanceled: + timerID = event.TimerCanceledEventAttributes.TimerID + } + if _, ok := pendingTimerInfos[timerID]; ok && timerID != "" { + resurrectedTimer[timerID] = struct{}{} + } + } + return resurrectedTimer, nil +} + +func (t *timerActiveTaskExecutor) getResurrectedActivity( + ctx context.Context, + mutableState execution.MutableState, +) (map[int64]struct{}, error) { + // 1. find min activity scheduledID for all pending activities + pendingActivityInfos := mutableState.GetPendingActivityInfos() + minActivityScheduledID := common.EndEventID + for _, activityInfo := range pendingActivityInfos { + minActivityScheduledID = common.MinInt64(minActivityScheduledID, activityInfo.ScheduleID) + } + + // 2. scan history from minActivityScheduledID and see if any + // activity termination events matches pending activity + resurrectedActivity := make(map[int64]struct{}) + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + iter := collection.NewPagingIterator(t.getHistoryPaginationFn( + ctx, + minActivityScheduledID, + mutableState.GetNextEventID(), + branchToken, + )) + for iter.HasNext() { + item, err := iter.Next() + if err != nil { + return nil, err + } + event := item.(*types.HistoryEvent) + var scheduledID int64 + switch event.GetEventType() { + case types.EventTypeActivityTaskCompleted: + scheduledID = event.ActivityTaskCompletedEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskFailed: + scheduledID = event.ActivityTaskFailedEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskTimedOut: + scheduledID = event.ActivityTaskTimedOutEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskCanceled: + scheduledID = event.ActivityTaskCanceledEventAttributes.ScheduledEventID + } + if _, ok := pendingActivityInfos[scheduledID]; ok && scheduledID != 0 { + resurrectedActivity[scheduledID] = struct{}{} + } + } + return resurrectedActivity, nil +} + +func (t *timerActiveTaskExecutor) getHistoryPaginationFn( + ctx context.Context, + firstEventID int64, + nextEventID int64, + branchToken []byte, +) collection.PaginationFn { + return func(token []byte) ([]interface{}, []byte, error) { + historyEvents, _, token, _, err := persistenceutils.PaginateHistory( + ctx, + t.shard.GetHistoryManager(), + false, + branchToken, + firstEventID, + nextEventID, + nil, + execution.NDCDefaultPageSize, + common.IntPtr(t.shard.GetShardID()), + ) + if err != nil { + return nil, nil, err + } + + var items []interface{} + for _, event := range historyEvents { + items = append(items, event) + } + return items, token, nil + } +} + func (t *timerActiveTaskExecutor) updateWorkflowExecution( ctx context.Context, wfContext execution.Context, diff --git a/service/history/task/timer_active_task_executor_test.go b/service/history/task/timer_active_task_executor_test.go index 3b17298f557..ee5f726d4dc 100644 --- a/service/history/task/timer_active_task_executor_test.go +++ b/service/history/task/timer_active_task_executor_test.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" @@ -288,6 +289,110 @@ func (s *timerActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() { s.NoError(err) } +func (s *timerActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Resurrected() { + workflowExecution := types.WorkflowExecution{ + WorkflowID: "some random workflow ID", + RunID: uuid.New(), + } + workflowType := "some random workflow type" + taskListName := "some random task list" + + mutableState := execution.NewMutableStateBuilderWithVersionHistoriesWithEventV2( + s.mockShard, + s.logger, + s.version, + workflowExecution.GetRunID(), + constants.TestGlobalDomainEntry, + ) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + workflowExecution, + &types.HistoryStartWorkflowExecutionRequest{ + DomainUUID: s.domainID, + StartRequest: &types.StartWorkflowExecutionRequest{ + WorkflowType: &types.WorkflowType{Name: workflowType}, + TaskList: &types.TaskList{Name: taskListName}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + }, + }, + ) + s.Nil(err) + + di := test.AddDecisionTaskScheduledEvent(mutableState) + event := test.AddDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New()) + di.StartedID = event.GetEventID() + event = test.AddDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity") + + // schedule two timers + timerID1 := "timer1" + timerTimeout1 := 2 * time.Second + startEvent1, _ := test.AddTimerStartedEvent(mutableState, event.GetEventID(), timerID1, int64(timerTimeout1.Seconds())) + timerID2 := "timer2" + timerTimeout2 := 5 * time.Second + startEvent2, _ := test.AddTimerStartedEvent(mutableState, event.GetEventID(), timerID2, int64(timerTimeout2.Seconds())) + + // fire timer 1 + firedEvent1 := test.AddTimerFiredEvent(mutableState, timerID1) + mutableState.FlushBufferedEvents() + // there should be a decision scheduled event after timer1 is fired + // omitted here to make the test easier to read + + // create timer task for timer2 + timerSequence := execution.NewTimerSequence(s.timeSource, mutableState) + mutableState.DeleteTimerTasks() // remove existing timer task for timerID1 + modified, err := timerSequence.CreateNextUserTimer() + s.NoError(err) + s.True(modified) + task := mutableState.GetTimerTasks()[0] + timerTask := s.newTimerTaskFromInfo(&persistence.TimerTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + TaskID: int64(100), + TaskType: persistence.TaskTypeUserTimer, + TimeoutType: int(types.TimeoutTypeStartToClose), + VisibilityTimestamp: task.(*persistence.UserTimerTask).GetVisibilityTimestamp(), + EventID: startEvent2.GetEventID(), + }) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, firedEvent1.GetEventID(), event.GetVersion()) + // add resurrected timer info for timer1 + persistenceMutableState.TimerInfos[timerID1] = &persistence.TimerInfo{ + Version: startEvent1.GetVersion(), + TimerID: timerID1, + ExpiryTime: time.Unix(0, startEvent1.GetTimestamp()).Add(timerTimeout1), + StartedID: startEvent1.GetEventID(), + TaskStatus: execution.TimerTaskStatusNone, + } + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool { + return req.MinEventID == startEvent1.GetEventID() && req.NextPageToken == nil + })).Return(&persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{startEvent1, startEvent2, firedEvent1}, + NextPageToken: nil, + }, nil).Once() + // only timer2 should be fired + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.MatchedBy(func(req *persistence.AppendHistoryNodesRequest) bool { + numTimerFiredEvents := 0 + for _, event := range req.Events { + if event.GetEventType() == types.EventTypeTimerFired { + numTimerFiredEvents++ + } + } + return numTimerFiredEvents == 1 + })).Return(&persistence.AppendHistoryNodesResponse{Size: 0}, nil).Once() + // both timerInfo should be deleted + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.MatchedBy(func(req *persistence.UpdateWorkflowExecutionRequest) bool { + return len(req.UpdateWorkflowMutation.DeleteTimerInfos) == 2 + })).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() + + s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFn(timerTimeout2 - timerTimeout1) + s.timeSource.Update(s.now.Add(timerTimeout2)) + err = s.timerActiveTaskExecutor.Execute(timerTask, true) + s.NoError(err) +} + func (s *timerActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPolicy_Fire() { workflowExecution := types.WorkflowExecution{ @@ -825,6 +930,146 @@ func (s *timerActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat_Noop s.NoError(err) } +func (s *timerActiveTaskExecutorSuite) TestProcessActivityTimeout_Resurrected() { + workflowExecution := types.WorkflowExecution{ + WorkflowID: "some random workflow ID", + RunID: uuid.New(), + } + workflowType := "some random workflow type" + taskListName := "some random task list" + + mutableState := execution.NewMutableStateBuilderWithVersionHistoriesWithEventV2( + s.mockShard, + s.logger, + s.version, + workflowExecution.GetRunID(), + constants.TestGlobalDomainEntry, + ) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + workflowExecution, + &types.HistoryStartWorkflowExecutionRequest{ + DomainUUID: s.domainID, + StartRequest: &types.StartWorkflowExecutionRequest{ + WorkflowType: &types.WorkflowType{Name: workflowType}, + TaskList: &types.TaskList{Name: taskListName}, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + }, + }, + ) + s.Nil(err) + + di := test.AddDecisionTaskScheduledEvent(mutableState) + event := test.AddDecisionTaskStartedEvent(mutableState, di.ScheduleID, taskListName, uuid.New()) + di.StartedID = event.GetEventID() + event = test.AddDecisionTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, nil, "some random identity") + + identity := "identity" + tasklist := "tasklist" + activityID1 := "activity1" + activityID2 := "activity2" + activityType := "activity type" + timerTimeout1 := 2 * time.Second + timerTimeout2 := 5 * time.Second + // schedule 2 activities + scheduledEvent1, _ := test.AddActivityTaskScheduledEvent( + mutableState, + event.GetEventID(), + activityID1, + activityType, + tasklist, + []byte(nil), + int32(timerTimeout1.Seconds()), + int32(timerTimeout1.Seconds()), + int32(timerTimeout1.Seconds()), + int32(timerTimeout1.Seconds()), + ) + scheduledEvent2, _ := test.AddActivityTaskScheduledEvent( + mutableState, + event.GetEventID(), + activityID2, + activityType, + tasklist, + []byte(nil), + int32(timerTimeout2.Seconds()), + int32(timerTimeout2.Seconds()), + int32(timerTimeout2.Seconds()), + int32(timerTimeout2.Seconds()), + ) + + startedEvent1 := test.AddActivityTaskStartedEvent(mutableState, scheduledEvent1.GetEventID(), identity) + completeEvent1 := test.AddActivityTaskCompletedEvent(mutableState, scheduledEvent1.GetEventID(), startedEvent1.GetEventID(), []byte(nil), identity) + mutableState.FlushBufferedEvents() + // there should be a decision scheduled event after activity1 is completed + // omitted here to make the test easier to read + + timerSequence := execution.NewTimerSequence(s.timeSource, mutableState) + mutableState.DeleteTimerTasks() + modified, err := timerSequence.CreateNextActivityTimer() + s.NoError(err) + s.True(modified) + task := mutableState.GetTimerTasks()[0] + timerTask := s.newTimerTaskFromInfo(&persistence.TimerTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + TaskID: int64(100), + TaskType: persistence.TaskTypeActivityTimeout, + TimeoutType: int(types.TimeoutTypeScheduleToClose), + VisibilityTimestamp: task.(*persistence.ActivityTimeoutTask).GetVisibilityTimestamp(), + EventID: scheduledEvent2.GetEventID(), + }) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, completeEvent1.GetEventID(), event.GetVersion()) + // add resurrected activity info for activity1 + persistenceMutableState.ActivityInfos[scheduledEvent1.GetEventID()] = &persistence.ActivityInfo{ + Version: scheduledEvent1.GetVersion(), + ScheduleID: scheduledEvent1.GetEventID(), + ScheduledEventBatchID: event.GetEventID(), + ScheduledTime: time.Unix(0, event.GetTimestamp()), + StartedID: common.EmptyEventID, + StartedTime: time.Time{}, + ActivityID: activityID1, + DomainID: s.domainID, + ScheduleToStartTimeout: int32(timerTimeout1.Seconds()), + ScheduleToCloseTimeout: int32(timerTimeout1.Seconds()), + StartToCloseTimeout: int32(timerTimeout1.Seconds()), + HeartbeatTimeout: int32(timerTimeout1.Seconds()), + CancelRequested: false, + CancelRequestID: common.EmptyEventID, + LastHeartBeatUpdatedTime: time.Time{}, + TimerTaskStatus: execution.TimerTaskStatusNone, + TaskList: tasklist, + } + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool { + return req.MinEventID == scheduledEvent1.GetEventID() && req.NextPageToken == nil + })).Return(&persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{scheduledEvent1, scheduledEvent2, startedEvent1, completeEvent1}, + NextPageToken: nil, + }, nil).Once() + // only activity timer for activity2 should be fired + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.MatchedBy(func(req *persistence.AppendHistoryNodesRequest) bool { + numActivityTimeoutEvents := 0 + for _, event := range req.Events { + if event.GetEventType() == types.EventTypeActivityTaskTimedOut { + numActivityTimeoutEvents++ + } + } + return numActivityTimeoutEvents == 1 + })).Return(&persistence.AppendHistoryNodesResponse{Size: 0}, nil).Once() + // both activityInfo should be deleted + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.MatchedBy(func(req *persistence.UpdateWorkflowExecutionRequest) bool { + return len(req.UpdateWorkflowMutation.DeleteActivityInfos) == 2 + })).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() + + s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFn(timerTimeout2 - timerTimeout1) + s.timeSource.Update(s.now.Add(timerTimeout2)) + err = s.timerActiveTaskExecutor.Execute(timerTask, true) + s.NoError(err) +} + func (s *timerActiveTaskExecutorSuite) TestDecisionTimeout_Fire() { workflowExecution := types.WorkflowExecution{ diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index b810d4a4ab8..af9226423b8 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -138,7 +138,7 @@ func (t *timerStandbyTaskExecutor) executeUserTimerTimeoutTask( return nil, &types.InternalServiceError{Message: errString} } - if isExpired := timerSequence.IsExpired( + if _, isExpired := timerSequence.IsExpired( timerTask.VisibilityTimestamp, timerSequenceID, ); isExpired { @@ -198,7 +198,7 @@ func (t *timerStandbyTaskExecutor) executeActivityTimeoutTask( return nil, &types.InternalServiceError{Message: errString} } - if isExpired := timerSequence.IsExpired( + if _, isExpired := timerSequence.IsExpired( timerTask.VisibilityTimestamp, timerSequenceID, ); isExpired {