Skip to content

Commit

Permalink
Fix sync activity update zombie (cadence-workflow#2805)
Browse files Browse the repository at this point in the history
* Fix sync activity update zombie
  • Loading branch information
yux0 authored Nov 9, 2019
1 parent d4308b9 commit 1fac8f6
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 5 deletions.
25 changes: 21 additions & 4 deletions service/history/nDCActivityReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
// 2. activity heart beat
// no sync activity task will be sent when the active side fails / times out an activity,
// since the standby side does not have an activity retry timer

domainID := request.GetDomainId()
execution := workflow.WorkflowExecution{
WorkflowId: request.WorkflowId,
Expand Down Expand Up @@ -184,7 +183,20 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
).createNextActivityTimer(); err != nil {
return err
}
return context.updateWorkflowExecutionAsPassive(now)

updateMode := persistence.UpdateWorkflowModeUpdateCurrent
if state, _ := mutableState.GetWorkflowStateCloseStatus(); state == persistence.WorkflowStateZombie {
updateMode = persistence.UpdateWorkflowModeBypassCurrent
}

return context.updateWorkflowExecutionWithNew(
now,
updateMode,
nil, // no new workflow
nil, // no new workflow
transactionPolicyPassive,
nil,
)
}

func (r *nDCActivityReplicatorImpl) shouldApplySyncActivity(
Expand Down Expand Up @@ -224,8 +236,8 @@ func (r *nDCActivityReplicatorImpl) shouldApplySyncActivity(
// resend the missing event if local version history doesn't have the schedule event

// case 2: local version history and incoming version history diverged
// case 2-1: local version history is the dominator and discard the incoming event
// case 2-2: incoming version history is the dominator and resend the missing incoming events
// case 2-1: local version history has the higher version, so discard the incoming event
// case 2-2: incoming version history has the higher version, so resend the missing incoming events
if currentVersionHistory.IsLCAAppendable(lcaItem) || incomingVersionHistory.IsLCAAppendable(lcaItem) {
// case 1
if scheduleID > lcaItem.GetEventID() {
Expand Down Expand Up @@ -257,6 +269,10 @@ func (r *nDCActivityReplicatorImpl) shouldApplySyncActivity(
)
}
}

if !mutableState.IsWorkflowExecutionRunning() {
return false, nil
}
} else if mutableState.GetReplicationState() != nil {
// TODO when 2DC is deprecated, remove this block
if !mutableState.IsWorkflowExecutionRunning() {
Expand Down Expand Up @@ -286,5 +302,6 @@ func (r *nDCActivityReplicatorImpl) shouldApplySyncActivity(
} else {
return false, &shared.InternalServiceError{Message: "The workflow is neither 2DC or 3DC enabled."}
}

return true, nil
}
96 changes: 95 additions & 1 deletion service/history/nDCActivityReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func (s *activityReplicatorSuite) TestSyncActivity_VersionHistories_SameSchedule
}
s.mockMutableState.EXPECT().GetVersionHistories().Return(localVersionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetActivityInfo(scheduleID).Return(nil, false).AnyTimes()
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(
cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: domainName},
Expand Down Expand Up @@ -747,6 +748,7 @@ func (s *activityReplicatorSuite) TestSyncActivity_VersionHistories_LocalVersion
}
s.mockMutableState.EXPECT().GetVersionHistories().Return(localVersionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetActivityInfo(scheduleID).Return(nil, false).AnyTimes()
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(
cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: domainName},
Expand Down Expand Up @@ -1130,6 +1132,91 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityRunning() {
var versionHistories *persistence.VersionHistories
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetReplicationState().Return(&persistence.ReplicationState{}).AnyTimes()
s.mockMutableState.EXPECT().GetWorkflowStateCloseStatus().Return(1, 0).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(
cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: domainName},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
version,
nil,
), nil,
).AnyTimes()
activityInfo := &persistence.ActivityInfo{
Version: version - 1,
ScheduleID: scheduleID,
Attempt: attempt + 1,
}
s.mockMutableState.EXPECT().GetActivityInfo(scheduleID).Return(activityInfo, true).AnyTimes()
activityInfos := map[int64]*persistence.ActivityInfo{activityInfo.ScheduleID: activityInfo}
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(activityInfos).AnyTimes()
s.mockClusterMetadata.On("IsVersionFromSameCluster", version, activityInfo.Version).Return(false)

s.mockMutableState.EXPECT().ReplicateActivityInfo(request, true).Return(nil).Times(1)
s.mockMutableState.EXPECT().UpdateActivity(activityInfo).Return(nil).Times(1)
s.mockMutableState.EXPECT().GetCurrentVersion().Return(int64(1)).Times(1)
s.mockMutableState.EXPECT().AddTimerTasks(gomock.Any()).Times(1)
now := time.Unix(0, request.GetLastHeartbeatTime())
context.EXPECT().updateWorkflowExecutionWithNew(
now,
persistence.UpdateWorkflowModeUpdateCurrent,
nil,
nil,
transactionPolicyPassive,
nil,
).Return(nil).Times(1)
err = s.nDCActivityReplicator.SyncActivity(ctx.Background(), request)
s.NoError(err)
}

func (s *activityReplicatorSuite) TestSyncActivity_ActivityRunning_ZombieWorkflow() {
domainName := "some random domain name"
domainID := testDomainID
workflowID := "some random workflow ID"
runID := uuid.New()
version := int64(100)
scheduleID := int64(144)
scheduledTime := time.Now()
startedID := scheduleID + 1
startedTime := scheduledTime.Add(time.Minute)
heartBeatUpdatedTime := startedTime.Add(time.Minute)
attempt := int32(100)
details := []byte("some random activity heartbeat progress")
nextEventID := scheduleID + 10

key := definition.NewWorkflowIdentifier(domainID, workflowID, runID)
context := NewMockworkflowExecutionContext(s.controller)
context.EXPECT().loadWorkflowExecution().Return(s.mockMutableState, nil).Times(1)
context.EXPECT().lock(gomock.Any()).Return(nil)
context.EXPECT().unlock().Times(1)
_, err := s.historyCache.PutIfNotExist(key, context)
s.NoError(err)

request := &h.SyncActivityRequest{
DomainId: common.StringPtr(domainID),
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
Version: common.Int64Ptr(version),
ScheduledId: common.Int64Ptr(scheduleID),
ScheduledTime: common.Int64Ptr(scheduledTime.UnixNano()),
StartedId: common.Int64Ptr(startedID),
StartedTime: common.Int64Ptr(startedTime.UnixNano()),
Attempt: common.Int32Ptr(attempt),
LastHeartbeatTime: common.Int64Ptr(heartBeatUpdatedTime.UnixNano()),
Details: details,
}
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
s.mockMutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
var versionHistories *persistence.VersionHistories
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetReplicationState().Return(&persistence.ReplicationState{}).AnyTimes()
s.mockMutableState.EXPECT().GetWorkflowStateCloseStatus().Return(3, 0).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(
cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: domainName},
Expand Down Expand Up @@ -1160,7 +1247,14 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityRunning() {
s.mockMutableState.EXPECT().GetCurrentVersion().Return(int64(1)).Times(1)
s.mockMutableState.EXPECT().AddTimerTasks(gomock.Any()).Times(1)
now := time.Unix(0, request.GetLastHeartbeatTime())
context.EXPECT().updateWorkflowExecutionAsPassive(now).Return(nil).Times(1)
context.EXPECT().updateWorkflowExecutionWithNew(
now,
persistence.UpdateWorkflowModeBypassCurrent,
nil,
nil,
transactionPolicyPassive,
nil,
).Return(nil).Times(1)
err = s.nDCActivityReplicator.SyncActivity(ctx.Background(), request)
s.NoError(err)
}

0 comments on commit 1fac8f6

Please sign in to comment.