From ee5461b7cc532f031be523fa320e555deb372efc Mon Sep 17 00:00:00 2001 From: Vytautas Date: Tue, 17 May 2022 16:25:12 +0300 Subject: [PATCH] Check for resurrected activities during RecordActivityTaskStarted (#4806) --- service/history/execution/integrity.go | 177 ++++++++++++++++++ service/history/historyEngine.go | 36 ++++ service/history/historyEngine2_test.go | 46 +++++ .../task/timer_active_task_executor.go | 142 +------------- 4 files changed, 261 insertions(+), 140 deletions(-) create mode 100644 service/history/execution/integrity.go diff --git a/service/history/execution/integrity.go b/service/history/execution/integrity.go new file mode 100644 index 00000000000..605fcc50ac9 --- /dev/null +++ b/service/history/execution/integrity.go @@ -0,0 +1,177 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package execution + +import ( + "context" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/collection" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/shard" +) + +// GetResurrectedTimers returns a set of timers (timer IDs) that were resurrected. +// Meaning timers that are still pending in mutable state, but were already completed based on event history. +func GetResurrectedTimers( + ctx context.Context, + shard shard.Context, + mutableState MutableState, +) (map[string]struct{}, error) { + // 1. find min timer startedID for all pending timers + pendingTimerInfos := mutableState.GetPendingTimerInfos() + minTimerStartedID := common.EndEventID + for _, timerInfo := range pendingTimerInfos { + minTimerStartedID = common.MinInt64(minTimerStartedID, timerInfo.StartedID) + } + + // 2. scan history from minTimerStartedID and see if any + // TimerFiredEvent or TimerCancelledEvent matches pending timer + // NOTE: since we can't read from middle of an events batch, + // history returned by persistence layer won't actually start + // from minTimerStartedID, but start from the batch whose nodeID is + // larger than minTimerStartedID. + // This is ok since the event types we are interested in must in batches + // later than the timer started events. + resurrectedTimer := make(map[string]struct{}) + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + iter := collection.NewPagingIterator(getHistoryPaginationFn( + ctx, + shard, + minTimerStartedID, + mutableState.GetNextEventID(), + branchToken, + )) + for iter.HasNext() { + item, err := iter.Next() + if err != nil { + return nil, err + } + event := item.(*types.HistoryEvent) + var timerID string + switch event.GetEventType() { + case types.EventTypeTimerFired: + timerID = event.TimerFiredEventAttributes.TimerID + case types.EventTypeTimerCanceled: + timerID = event.TimerCanceledEventAttributes.TimerID + } + if _, ok := pendingTimerInfos[timerID]; ok && timerID != "" { + resurrectedTimer[timerID] = struct{}{} + } + } + return resurrectedTimer, nil +} + +// GetResurrectedActivities returns a set of activities (schedule IDs) that were resurrected. +// Meaning activities that are still pending in mutable state, but were already completed based on event history. +func GetResurrectedActivities( + ctx context.Context, + shard shard.Context, + mutableState MutableState, +) (map[int64]struct{}, error) { + // 1. find min activity scheduledID for all pending activities + pendingActivityInfos := mutableState.GetPendingActivityInfos() + minActivityScheduledID := common.EndEventID + for _, activityInfo := range pendingActivityInfos { + minActivityScheduledID = common.MinInt64(minActivityScheduledID, activityInfo.ScheduleID) + } + + // 2. scan history from minActivityScheduledID and see if any + // activity termination events matches pending activity + // NOTE: since we can't read from middle of an events batch, + // history returned by persistence layer won't actually start + // from minActivityScheduledID, but start from the batch whose nodeID is + // larger than minActivityScheduledID. + // This is ok since the event types we are interested in must in batches + // later than the activity scheduled events. + resurrectedActivity := make(map[int64]struct{}) + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + iter := collection.NewPagingIterator(getHistoryPaginationFn( + ctx, + shard, + minActivityScheduledID, + mutableState.GetNextEventID(), + branchToken, + )) + for iter.HasNext() { + item, err := iter.Next() + if err != nil { + return nil, err + } + event := item.(*types.HistoryEvent) + var scheduledID int64 + switch event.GetEventType() { + case types.EventTypeActivityTaskCompleted: + scheduledID = event.ActivityTaskCompletedEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskFailed: + scheduledID = event.ActivityTaskFailedEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskTimedOut: + scheduledID = event.ActivityTaskTimedOutEventAttributes.ScheduledEventID + case types.EventTypeActivityTaskCanceled: + scheduledID = event.ActivityTaskCanceledEventAttributes.ScheduledEventID + } + if _, ok := pendingActivityInfos[scheduledID]; ok && scheduledID != 0 { + resurrectedActivity[scheduledID] = struct{}{} + } + } + return resurrectedActivity, nil +} + +func getHistoryPaginationFn( + ctx context.Context, + shard shard.Context, + firstEventID int64, + nextEventID int64, + branchToken []byte, +) collection.PaginationFn { + return func(token []byte) ([]interface{}, []byte, error) { + historyEvents, _, token, _, err := persistenceutils.PaginateHistory( + ctx, + shard.GetHistoryManager(), + false, + branchToken, + firstEventID, + nextEventID, + token, + NDCDefaultPageSize, + common.IntPtr(shard.GetShardID()), + ) + if err != nil { + return nil, nil, err + } + + var items []interface{} + for _, event := range historyEvents { + items = append(items, event) + } + return items, token, nil + } +} diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 0e34f19de60..f733aea5ddc 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1690,6 +1690,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( RunID: request.WorkflowExecution.RunID, } + var resurrectError error response := &types.RecordActivityTaskStartedResponse{} err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, false, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) error { @@ -1701,6 +1702,38 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( requestID := request.GetRequestID() ai, isRunning := mutableState.GetActivityInfo(scheduleID) + // RecordActivityTaskStarted is already past scheduleToClose timeout. + // If at this point pending activity is still in mutable state it may be resurrected. + // Otherwise it would be completed or timed out already. + if isRunning && e.timeSource.Now().After(ai.ScheduledTime.Add(time.Duration(ai.ScheduleToCloseTimeout)*time.Second)) { + resurrectedActivities, err := execution.GetResurrectedActivities(ctx, e.shard, mutableState) + if err != nil { + e.logger.Error("Activity resurrection check failed", tag.Error(err)) + return err + } + + if _, ok := resurrectedActivities[scheduleID]; ok { + // found activity resurrection + domainName := mutableState.GetDomainEntry().GetInfo().Name + e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.ActivityResurrectionCounter) + e.logger.Error("Encounter resurrected activity, skip", + tag.WorkflowDomainName(domainName), + tag.WorkflowID(workflowExecution.GetWorkflowID()), + tag.WorkflowRunID(workflowExecution.GetRunID()), + tag.WorkflowScheduleID(scheduleID), + ) + + // remove resurrected activity from mutable state + if err := mutableState.DeleteActivity(scheduleID); err != nil { + return err + } + + // save resurrection error but return nil here, so that mutable state would get updated in DB + resurrectError = workflow.ErrActivityTaskNotFound + return nil + } + } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in // some extreme cassandra failure cases. if !isRunning && scheduleID >= mutableState.GetNextEventID() { @@ -1764,6 +1797,9 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( if err != nil { return nil, err } + if resurrectError != nil { + return nil, resurrectError + } return response, err } diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 583fb6be35d..889cdf08ed0 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -764,6 +764,52 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() { s.Equal(scheduledEvent, response.ScheduledEvent) } +func (s *engine2Suite) TestRecordActivityTaskStartedResurrected() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID} + identity := "testIdentity" + tl := "testTaskList" + + timeSource := clock.NewEventTimeSource() + s.historyEngine.timeSource = timeSource + timeSource.Update(time.Now()) + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + decisionCompletedEvent := test.AddDecisionTaskCompletedEvent(msBuilder, int64(2), int64(3), nil, identity) + scheduledEvent, _ := test.AddActivityTaskScheduledEvent(msBuilder, decisionCompletedEvent.ID, "activity1_id", "activity_type1", tl, []byte("input1"), 100, 10, 1, 5) + + // Use mutable state snapshot before start/completion of the activity (to indicate resurrected state) + msSnapshot := execution.CreatePersistenceMutableState(msBuilder) + + startedEvent := test.AddActivityTaskStartedEvent(msBuilder, scheduledEvent.ID, identity) + test.AddActivityTaskCompletedEvent(msBuilder, scheduledEvent.ID, startedEvent.ID, nil, identity) + + // Use full history after the activity start/completion + historySnapshot := msBuilder.GetHistoryBuilder().GetHistory() + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&p.GetWorkflowExecutionResponse{State: msSnapshot}, nil).Once() + s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&p.ReadHistoryBranchResponse{HistoryEvents: historySnapshot.Events}, nil).Once() + + // Expect that mutable state will be updated to delete resurrected activity + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.UpdateWorkflowExecutionRequest) bool { + return len(request.UpdateWorkflowMutation.DeleteActivityInfos) == 1 + })).Return(&p.UpdateWorkflowExecutionResponse{}, nil).Once() + + // Ensure enough time passed + timeSource.Update(timeSource.Now().Add(time.Hour)) + + _, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: scheduledEvent.ID, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{TaskList: &types.TaskList{Name: tl}, Identity: identity}, + }) + + s.Equal(err, workflow.ErrActivityTaskNotFound) +} + func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() { domainID := constants.TestDomainID workflowExecution := types.WorkflowExecution{ diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 2587473748c..19bc4281c3c 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -27,12 +27,10 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" - "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" - persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" @@ -171,7 +169,7 @@ Loop: // overwrite the context here as scan history may take a long time to complete // ctx will also be used by other operations like updateWorkflow ctx = scanWorkflowCtx - resurrectedTimer, err = t.getResurrectedTimer(ctx, mutableState) + resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState) if err != nil { t.logger.Error("Timer resurrection check failed", tag.Error(err)) return err @@ -294,7 +292,7 @@ Loop: // overwrite the context here as scan history may take a long time to complete // ctx will also be used by other operations like updateWorkflow ctx = scanWorkflowCtx - resurrectedActivity, err = t.getResurrectedActivity(ctx, mutableState) + resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState) if err != nil { t.logger.Error("Activity resurrection check failed", tag.Error(err)) return err @@ -712,142 +710,6 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask( ) } -func (t *timerActiveTaskExecutor) getResurrectedTimer( - ctx context.Context, - mutableState execution.MutableState, -) (map[string]struct{}, error) { - // 1. find min timer startedID for all pending timers - pendingTimerInfos := mutableState.GetPendingTimerInfos() - minTimerStartedID := common.EndEventID - for _, timerInfo := range pendingTimerInfos { - minTimerStartedID = common.MinInt64(minTimerStartedID, timerInfo.StartedID) - } - - // 2. scan history from minTimerStartedID and see if any - // TimerFiredEvent or TimerCancelledEvent matches pending timer - // NOTE: since we can't read from middle of an events batch, - // history returned by persistence layer won't actually start - // from minTimerStartedID, but start from the batch whose nodeID is - // larger than minTimerStartedID. - // This is ok since the event types we are interested in must in batches - // later than the timer started events. - resurrectedTimer := make(map[string]struct{}) - branchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return nil, err - } - - iter := collection.NewPagingIterator(t.getHistoryPaginationFn( - ctx, - minTimerStartedID, - mutableState.GetNextEventID(), - branchToken, - )) - for iter.HasNext() { - item, err := iter.Next() - if err != nil { - return nil, err - } - event := item.(*types.HistoryEvent) - var timerID string - switch event.GetEventType() { - case types.EventTypeTimerFired: - timerID = event.TimerFiredEventAttributes.TimerID - case types.EventTypeTimerCanceled: - timerID = event.TimerCanceledEventAttributes.TimerID - } - if _, ok := pendingTimerInfos[timerID]; ok && timerID != "" { - resurrectedTimer[timerID] = struct{}{} - } - } - return resurrectedTimer, nil -} - -func (t *timerActiveTaskExecutor) getResurrectedActivity( - ctx context.Context, - mutableState execution.MutableState, -) (map[int64]struct{}, error) { - // 1. find min activity scheduledID for all pending activities - pendingActivityInfos := mutableState.GetPendingActivityInfos() - minActivityScheduledID := common.EndEventID - for _, activityInfo := range pendingActivityInfos { - minActivityScheduledID = common.MinInt64(minActivityScheduledID, activityInfo.ScheduleID) - } - - // 2. scan history from minActivityScheduledID and see if any - // activity termination events matches pending activity - // NOTE: since we can't read from middle of an events batch, - // history returned by persistence layer won't actually start - // from minActivityScheduledID, but start from the batch whose nodeID is - // larger than minActivityScheduledID. - // This is ok since the event types we are interested in must in batches - // later than the activity scheduled events. - resurrectedActivity := make(map[int64]struct{}) - branchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return nil, err - } - - iter := collection.NewPagingIterator(t.getHistoryPaginationFn( - ctx, - minActivityScheduledID, - mutableState.GetNextEventID(), - branchToken, - )) - for iter.HasNext() { - item, err := iter.Next() - if err != nil { - return nil, err - } - event := item.(*types.HistoryEvent) - var scheduledID int64 - switch event.GetEventType() { - case types.EventTypeActivityTaskCompleted: - scheduledID = event.ActivityTaskCompletedEventAttributes.ScheduledEventID - case types.EventTypeActivityTaskFailed: - scheduledID = event.ActivityTaskFailedEventAttributes.ScheduledEventID - case types.EventTypeActivityTaskTimedOut: - scheduledID = event.ActivityTaskTimedOutEventAttributes.ScheduledEventID - case types.EventTypeActivityTaskCanceled: - scheduledID = event.ActivityTaskCanceledEventAttributes.ScheduledEventID - } - if _, ok := pendingActivityInfos[scheduledID]; ok && scheduledID != 0 { - resurrectedActivity[scheduledID] = struct{}{} - } - } - return resurrectedActivity, nil -} - -func (t *timerActiveTaskExecutor) getHistoryPaginationFn( - ctx context.Context, - firstEventID int64, - nextEventID int64, - branchToken []byte, -) collection.PaginationFn { - return func(token []byte) ([]interface{}, []byte, error) { - historyEvents, _, token, _, err := persistenceutils.PaginateHistory( - ctx, - t.shard.GetHistoryManager(), - false, - branchToken, - firstEventID, - nextEventID, - token, - execution.NDCDefaultPageSize, - common.IntPtr(t.shard.GetShardID()), - ) - if err != nil { - return nil, nil, err - } - - var items []interface{} - for _, event := range historyEvents { - items = append(items, event) - } - return items, token, nil - } -} - func (t *timerActiveTaskExecutor) updateWorkflowExecution( ctx context.Context, wfContext execution.Context,