Skip to content

Commit

Permalink
Fix NDC continue as new to non-current branch deadlock (cadence-workf…
Browse files Browse the repository at this point in the history
…low#2831)

* Fix an issue where continue-as-new to a non-current branch may deadlock if the target workflow is the current workflow
  • Loading branch information
wxing1292 authored Nov 16, 2019
1 parent 0d160a9 commit edf871d
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 22 deletions.
269 changes: 269 additions & 0 deletions host/ndc/nDC_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,275 @@ func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {
)
}

// TestHandcraftedMultipleBranchesWithZombieContinueAsNew applies three
// handcrafted event batches for the same workflow ID / run ID through the
// active cluster's history client:
//
//   - eventsBatch1: the base branch, events 1-14 at version 21, ending with a
//     started (but not completed) decision task (scheduled 13, started 14).
//   - eventsBatch2: a branch diverging after event 14 with a version 32
//     decision task completion; the workflow is intentionally left open.
//   - eventsBatch3: another branch diverging after event 14, at version 21,
//     which completes the decision task and continues as new.
//
// Batches 2 and 3 both fork from the (14, 21) version history item of batch 1.
// NOTE(review): the test name and the accompanying commit suggest this is
// meant to cover continue-as-new being applied to a non-current branch;
// confirm against applyEvents for what is actually asserted.
func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranchesWithZombieContinueAsNew() {

	s.setupRemoteFrontendClients()
	workflowID := "ndc-handcrafted-multiple-branches-with-continue-as-new-test" + uuid.New()
	runID := uuid.New()

	workflowType := "event-generator-workflow-type"
	tasklist := "event-generator-taskList"
	identity := "worker-identity"

	// active has initial version 0
	historyClient := s.active.GetHistoryClient()

	// base branch: events 1-14, all at version 21
	eventsBatch1 := []*shared.History{
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(1),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeWorkflowExecutionStarted.Ptr(),
				WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{
					WorkflowType:                        &shared.WorkflowType{Name: common.StringPtr(workflowType)},
					TaskList:                            &shared.TaskList{Name: common.StringPtr(tasklist)},
					Input:                               nil,
					ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1000),
					TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1000),
					FirstDecisionTaskBackoffSeconds:     common.Int32Ptr(100),
				},
			},
			{
				EventId:   common.Int64Ptr(2),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
					Attempt:                    common.Int64Ptr(0),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(3),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
					ScheduledEventId: common.Int64Ptr(2),
					Identity:         common.StringPtr(identity),
					RequestId:        common.StringPtr(uuid.New()),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(4),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
					ScheduledEventId: common.Int64Ptr(2),
					StartedEventId:   common.Int64Ptr(3),
					Identity:         common.StringPtr(identity),
				},
			},
			{
				EventId:   common.Int64Ptr(5),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeMarkerRecorded.Ptr(),
				MarkerRecordedEventAttributes: &shared.MarkerRecordedEventAttributes{
					MarkerName:                   common.StringPtr("some marker name"),
					Details:                      []byte("some marker details"),
					DecisionTaskCompletedEventId: common.Int64Ptr(4),
				},
			},
			{
				EventId:   common.Int64Ptr(6),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeActivityTaskScheduled.Ptr(),
				ActivityTaskScheduledEventAttributes: &shared.ActivityTaskScheduledEventAttributes{
					DecisionTaskCompletedEventId:  common.Int64Ptr(4),
					ActivityId:                    common.StringPtr("0"),
					ActivityType:                  &shared.ActivityType{Name: common.StringPtr("activity-type")},
					TaskList:                      &shared.TaskList{Name: common.StringPtr(tasklist)},
					Input:                         nil,
					ScheduleToCloseTimeoutSeconds: common.Int32Ptr(20),
					ScheduleToStartTimeoutSeconds: common.Int32Ptr(20),
					StartToCloseTimeoutSeconds:    common.Int32Ptr(20),
					HeartbeatTimeoutSeconds:       common.Int32Ptr(20),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(7),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeActivityTaskStarted.Ptr(),
				ActivityTaskStartedEventAttributes: &shared.ActivityTaskStartedEventAttributes{
					ScheduledEventId: common.Int64Ptr(6),
					Identity:         common.StringPtr(identity),
					RequestId:        common.StringPtr(uuid.New()),
					Attempt:          common.Int32Ptr(0),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(8),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
				WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
					SignalName: common.StringPtr("some signal name 1"),
					Input:      []byte("some signal details 1"),
					Identity:   common.StringPtr(identity),
				},
			},
			{
				EventId:   common.Int64Ptr(9),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
					Attempt:                    common.Int64Ptr(0),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(10),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
					ScheduledEventId: common.Int64Ptr(9),
					Identity:         common.StringPtr(identity),
					RequestId:        common.StringPtr(uuid.New()),
				},
			},
		}},
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(11),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
					ScheduledEventId: common.Int64Ptr(9),
					StartedEventId:   common.Int64Ptr(10),
					Identity:         common.StringPtr(identity),
				},
			},
			{
				EventId:   common.Int64Ptr(12),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
				WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
					SignalName: common.StringPtr("some signal name 2"),
					Input:      []byte("some signal details 2"),
					Identity:   common.StringPtr(identity),
				},
			},
			{
				EventId:   common.Int64Ptr(13),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
					Attempt:                    common.Int64Ptr(0),
				},
			},
			{
				EventId:   common.Int64Ptr(14),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
					ScheduledEventId: common.Int64Ptr(13),
					Identity:         common.StringPtr(identity),
					RequestId:        common.StringPtr(uuid.New()),
				},
			},
		}},
	}

	// branch diverging after event 14, at version 32
	eventsBatch2 := []*shared.History{
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(15),
				Version:   common.Int64Ptr(32),
				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
					// completes the decision left outstanding by eventsBatch1
					// (scheduled at event 13, started at event 14)
					ScheduledEventId: common.Int64Ptr(13),
					StartedEventId:   common.Int64Ptr(14),
					Identity:         common.StringPtr(identity),
				},
			},
		}},
		// need to keep the workflow open for testing
	}

	// branch diverging after event 14, at version 21, ending in continue as new
	eventsBatch3 := []*shared.History{
		{Events: []*shared.HistoryEvent{
			{
				EventId:   common.Int64Ptr(15),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
					// completes the decision left outstanding by eventsBatch1
					// (scheduled at event 13, started at event 14)
					ScheduledEventId: common.Int64Ptr(13),
					StartedEventId:   common.Int64Ptr(14),
					Identity:         common.StringPtr(identity),
				},
			},
			{
				EventId:   common.Int64Ptr(16),
				Version:   common.Int64Ptr(21),
				EventType: shared.EventTypeWorkflowExecutionContinuedAsNew.Ptr(),
				WorkflowExecutionContinuedAsNewEventAttributes: &shared.WorkflowExecutionContinuedAsNewEventAttributes{
					NewExecutionRunId:                   common.StringPtr(uuid.New()),
					WorkflowType:                        &shared.WorkflowType{Name: common.StringPtr(workflowType)},
					TaskList:                            &shared.TaskList{Name: common.StringPtr(tasklist)},
					Input:                               nil,
					ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1000),
					TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1000),
					// the decision completion on this branch is event 15
					DecisionTaskCompletedEventId: common.Int64Ptr(15),
					Initiator:                    shared.ContinueAsNewInitiatorDecider.Ptr(),
				},
			},
		}},
	}

	versionHistory1 := s.eventBatchesToVersionHistory(nil, eventsBatch1)

	// both diverging branches fork from the last item of the base branch
	versionHistory2, err := versionHistory1.DuplicateUntilLCAItem(
		persistence.NewVersionHistoryItem(14, 21),
	)
	s.NoError(err)
	versionHistory2 = s.eventBatchesToVersionHistory(versionHistory2, eventsBatch2)

	versionHistory3, err := versionHistory1.DuplicateUntilLCAItem(
		persistence.NewVersionHistoryItem(14, 21),
	)
	s.NoError(err)
	versionHistory3 = s.eventBatchesToVersionHistory(versionHistory3, eventsBatch3)

	s.applyEvents(
		workflowID,
		runID,
		workflowType,
		tasklist,
		versionHistory1,
		eventsBatch1,
		historyClient,
	)
	s.applyEvents(
		workflowID,
		runID,
		workflowType,
		tasklist,
		versionHistory2,
		eventsBatch2,
		historyClient,
	)
	s.applyEvents(
		workflowID,
		runID,
		workflowType,
		tasklist,
		versionHistory3,
		eventsBatch3,
		historyClient,
	)
}

func (s *nDCIntegrationTestSuite) TestEventsReapply_ZombieWorkflow() {

workflowID := "ndc-single-branch-test" + uuid.New()
Expand Down
92 changes: 77 additions & 15 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,24 +463,36 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranch(
task nDCReplicationTask,
) error {

// workflow backfill to non current branch
// if encounter backfill with continue as new
// first, create the new workflow as zombie
if len(task.getNewEvents()) != 0 {
startTime := time.Now()
newTask, err := task.generateNewRunTask(startTime)
if err != nil {
return err
}
if err := r.applyEvents(ctx, newTask); err != nil {
newTask.getLogger().Error(
"nDCHistoryReplicator unable to create new workflow when applyNonStartEventsToNoneCurrentBranch",
tag.Error(err),
)
return err
}
return r.applyNonStartEventsToNoneCurrentBranchWithContinueAsNew(
ctx,
context,
mutableState,
branchIndex,
releaseFn,
task,
)
}

return r.applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew(
ctx,
context,
mutableState,
branchIndex,
releaseFn,
task,
)
}

func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew(
ctx ctx.Context,
context workflowExecutionContext,
mutableState mutableState,
branchIndex int,
releaseFn releaseWorkflowExecutionFunc,
task nDCReplicationTask,
) error {

versionHistoryItem := persistence.NewVersionHistoryItem(
task.getLastEvent().GetEventId(),
task.getLastEvent().GetVersion(),
Expand Down Expand Up @@ -522,6 +534,56 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranch(
return nil
}

// applyNonStartEventsToNoneCurrentBranchWithContinueAsNew handles a workflow
// backfill to a non-current branch when the replication task also carries
// events for a new (continued-as-new) run.
//
// The target workflow lock is released before anything is applied, because
// the target workflow can potentially be the current workflow, and holding
// its lock while the new workflow is created (as zombie) can deadlock.
// The sequence is:
//  1. clear all in-memory changes and release the target workflow lock
//  2. apply the new workflow's task first
//  3. apply the target workflow's task
//
// context, mutableState and releaseFn must not be used by this function
// after step 1; mutableState and branchIndex are not read here at all.
// Returns the first error from splitting the task or from either applyEvents
// call, nil otherwise.
func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithContinueAsNew(
	ctx ctx.Context,
	context workflowExecutionContext,
	mutableState mutableState,
	branchIndex int,
	releaseFn releaseWorkflowExecutionFunc,
	task nDCReplicationTask,
) error {

	// step 1: drop in-memory state and release the lock on the target workflow
	context.clear()
	releaseFn(nil)
	// nil out the handles so they cannot be accidentally used after release
	context = nil
	mutableState = nil
	releaseFn = nil

	// step 2: split the incoming task and apply the new workflow's part first
	startTime := time.Now()
	task, newTask, err := task.splitTask(startTime)
	if err != nil {
		return err
	}
	if err := r.applyEvents(ctx, newTask); err != nil {
		newTask.getLogger().Error(
			"nDCHistoryReplicator unable to create new workflow when applyNonStartEventsToNoneCurrentBranchWithContinueAsNew",
			tag.Error(err),
		)
		return err
	}

	// step 3: apply the remaining task for the target workflow
	if err := r.applyEvents(ctx, task); err != nil {
		// log through the target task's own logger, not the new run's
		task.getLogger().Error(
			"nDCHistoryReplicator unable to create target workflow when applyNonStartEventsToNoneCurrentBranchWithContinueAsNew",
			tag.Error(err),
		)
		return err
	}
	return nil
}

func (r *nDCHistoryReplicatorImpl) applyNonStartEventsMissingMutableState(
ctx ctx.Context,
newContext workflowExecutionContext,
Expand Down
Loading

0 comments on commit edf871d

Please sign in to comment.