Skip to content

Commit

Permalink
Check for resurrected activities during RecordActivityTaskStarted (ca…
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored May 17, 2022
1 parent 4194b29 commit ee5461b
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 140 deletions.
177 changes: 177 additions & 0 deletions service/history/execution/integrity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) 2022 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package execution

import (
"context"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/shard"
)

// GetResurrectedTimers returns a set of timers (timer IDs) that were resurrected.
// Meaning timers that are still pending in mutable state, but were already completed based on event history.
func GetResurrectedTimers(
ctx context.Context,
shard shard.Context,
mutableState MutableState,
) (map[string]struct{}, error) {
// 1. find min timer startedID for all pending timers
pendingTimerInfos := mutableState.GetPendingTimerInfos()
minTimerStartedID := common.EndEventID
for _, timerInfo := range pendingTimerInfos {
minTimerStartedID = common.MinInt64(minTimerStartedID, timerInfo.StartedID)
}

// 2. scan history from minTimerStartedID and see if any
// TimerFiredEvent or TimerCancelledEvent matches pending timer
// NOTE: since we can't read from middle of an events batch,
// history returned by persistence layer won't actually start
// from minTimerStartedID, but start from the batch whose nodeID is
// larger than minTimerStartedID.
// This is ok since the event types we are interested in must in batches
// later than the timer started events.
resurrectedTimer := make(map[string]struct{})
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return nil, err
}

iter := collection.NewPagingIterator(getHistoryPaginationFn(
ctx,
shard,
minTimerStartedID,
mutableState.GetNextEventID(),
branchToken,
))
for iter.HasNext() {
item, err := iter.Next()
if err != nil {
return nil, err
}
event := item.(*types.HistoryEvent)
var timerID string
switch event.GetEventType() {
case types.EventTypeTimerFired:
timerID = event.TimerFiredEventAttributes.TimerID
case types.EventTypeTimerCanceled:
timerID = event.TimerCanceledEventAttributes.TimerID
}
if _, ok := pendingTimerInfos[timerID]; ok && timerID != "" {
resurrectedTimer[timerID] = struct{}{}
}
}
return resurrectedTimer, nil
}

// GetResurrectedActivities returns a set of activities (schedule IDs) that were resurrected.
// Meaning activities that are still pending in mutable state, but were already completed based on event history.
func GetResurrectedActivities(
ctx context.Context,
shard shard.Context,
mutableState MutableState,
) (map[int64]struct{}, error) {
// 1. find min activity scheduledID for all pending activities
pendingActivityInfos := mutableState.GetPendingActivityInfos()
minActivityScheduledID := common.EndEventID
for _, activityInfo := range pendingActivityInfos {
minActivityScheduledID = common.MinInt64(minActivityScheduledID, activityInfo.ScheduleID)
}

// 2. scan history from minActivityScheduledID and see if any
// activity termination events matches pending activity
// NOTE: since we can't read from middle of an events batch,
// history returned by persistence layer won't actually start
// from minActivityScheduledID, but start from the batch whose nodeID is
// larger than minActivityScheduledID.
// This is ok since the event types we are interested in must in batches
// later than the activity scheduled events.
resurrectedActivity := make(map[int64]struct{})
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return nil, err
}

iter := collection.NewPagingIterator(getHistoryPaginationFn(
ctx,
shard,
minActivityScheduledID,
mutableState.GetNextEventID(),
branchToken,
))
for iter.HasNext() {
item, err := iter.Next()
if err != nil {
return nil, err
}
event := item.(*types.HistoryEvent)
var scheduledID int64
switch event.GetEventType() {
case types.EventTypeActivityTaskCompleted:
scheduledID = event.ActivityTaskCompletedEventAttributes.ScheduledEventID
case types.EventTypeActivityTaskFailed:
scheduledID = event.ActivityTaskFailedEventAttributes.ScheduledEventID
case types.EventTypeActivityTaskTimedOut:
scheduledID = event.ActivityTaskTimedOutEventAttributes.ScheduledEventID
case types.EventTypeActivityTaskCanceled:
scheduledID = event.ActivityTaskCanceledEventAttributes.ScheduledEventID
}
if _, ok := pendingActivityInfos[scheduledID]; ok && scheduledID != 0 {
resurrectedActivity[scheduledID] = struct{}{}
}
}
return resurrectedActivity, nil
}

func getHistoryPaginationFn(
ctx context.Context,
shard shard.Context,
firstEventID int64,
nextEventID int64,
branchToken []byte,
) collection.PaginationFn {
return func(token []byte) ([]interface{}, []byte, error) {
historyEvents, _, token, _, err := persistenceutils.PaginateHistory(
ctx,
shard.GetHistoryManager(),
false,
branchToken,
firstEventID,
nextEventID,
token,
NDCDefaultPageSize,
common.IntPtr(shard.GetShardID()),
)
if err != nil {
return nil, nil, err
}

var items []interface{}
for _, event := range historyEvents {
items = append(items, event)
}
return items, token, nil
}
}
36 changes: 36 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
RunID: request.WorkflowExecution.RunID,
}

var resurrectError error
response := &types.RecordActivityTaskStartedResponse{}
err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, false, e.timeSource.Now(),
func(wfContext execution.Context, mutableState execution.MutableState) error {
Expand All @@ -1701,6 +1702,38 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
requestID := request.GetRequestID()
ai, isRunning := mutableState.GetActivityInfo(scheduleID)

// RecordActivityTaskStarted is already past scheduleToClose timeout.
// If at this point pending activity is still in mutable state it may be resurrected.
// Otherwise it would be completed or timed out already.
if isRunning && e.timeSource.Now().After(ai.ScheduledTime.Add(time.Duration(ai.ScheduleToCloseTimeout)*time.Second)) {
resurrectedActivities, err := execution.GetResurrectedActivities(ctx, e.shard, mutableState)
if err != nil {
e.logger.Error("Activity resurrection check failed", tag.Error(err))
return err
}

if _, ok := resurrectedActivities[scheduleID]; ok {
// found activity resurrection
domainName := mutableState.GetDomainEntry().GetInfo().Name
e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.ActivityResurrectionCounter)
e.logger.Error("Encounter resurrected activity, skip",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(workflowExecution.GetWorkflowID()),
tag.WorkflowRunID(workflowExecution.GetRunID()),
tag.WorkflowScheduleID(scheduleID),
)

// remove resurrected activity from mutable state
if err := mutableState.DeleteActivity(scheduleID); err != nil {
return err
}

// save resurrection error but return nil here, so that mutable state would get updated in DB
resurrectError = workflow.ErrActivityTaskNotFound
return nil
}
}

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !isRunning && scheduleID >= mutableState.GetNextEventID() {
Expand Down Expand Up @@ -1764,6 +1797,9 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
if err != nil {
return nil, err
}
if resurrectError != nil {
return nil, resurrectError
}

return response, err
}
Expand Down
46 changes: 46 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,52 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {
s.Equal(scheduledEvent, response.ScheduledEvent)
}

func (s *engine2Suite) TestRecordActivityTaskStartedResurrected() {
domainID := constants.TestDomainID
workflowExecution := types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID}
identity := "testIdentity"
tl := "testTaskList"

timeSource := clock.NewEventTimeSource()
s.historyEngine.timeSource = timeSource
timeSource.Update(time.Now())

msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true)
decisionCompletedEvent := test.AddDecisionTaskCompletedEvent(msBuilder, int64(2), int64(3), nil, identity)
scheduledEvent, _ := test.AddActivityTaskScheduledEvent(msBuilder, decisionCompletedEvent.ID, "activity1_id", "activity_type1", tl, []byte("input1"), 100, 10, 1, 5)

// Use mutable state snapshot before start/completion of the activity (to indicate resurrected state)
msSnapshot := execution.CreatePersistenceMutableState(msBuilder)

startedEvent := test.AddActivityTaskStartedEvent(msBuilder, scheduledEvent.ID, identity)
test.AddActivityTaskCompletedEvent(msBuilder, scheduledEvent.ID, startedEvent.ID, nil, identity)

// Use full history after the activity start/completion
historySnapshot := msBuilder.GetHistoryBuilder().GetHistory()

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&p.GetWorkflowExecutionResponse{State: msSnapshot}, nil).Once()
s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&p.ReadHistoryBranchResponse{HistoryEvents: historySnapshot.Events}, nil).Once()

// Expect that mutable state will be updated to delete resurrected activity
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.MatchedBy(func(request *p.UpdateWorkflowExecutionRequest) bool {
return len(request.UpdateWorkflowMutation.DeleteActivityInfos) == 1
})).Return(&p.UpdateWorkflowExecutionResponse{}, nil).Once()

// Ensure enough time passed
timeSource.Update(timeSource.Now().Add(time.Hour))

_, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{
DomainUUID: domainID,
WorkflowExecution: &workflowExecution,
ScheduleID: scheduledEvent.ID,
TaskID: 100,
RequestID: "reqId",
PollRequest: &types.PollForActivityTaskRequest{TaskList: &types.TaskList{Name: tl}, Identity: identity},
})

s.Equal(err, workflow.ErrActivityTaskNotFound)
}

func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() {
domainID := constants.TestDomainID
workflowExecution := types.WorkflowExecution{
Expand Down
Loading

0 comments on commit ee5461b

Please sign in to comment.