Skip to content

Commit

Permalink
Improve 2DC NDC compatibility (cadence-workflow#2747)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and wxing1292 committed Nov 7, 2019
1 parent 60e09ca commit e5b6bdb
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 98 deletions.
6 changes: 3 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
taskAllocator taskAllocator
replicator *historyReplicator
nDCReplicator nDCHistoryReplicator
nDCactivityReplicator nDCActivityReplicator
nDCActivityReplicator nDCActivityReplicator
replicatorProcessor ReplicatorQueueProcessor
historyEventNotifier historyEventNotifier
tokenSerializer common.TaskTokenSerializer
Expand Down Expand Up @@ -261,7 +261,7 @@ func NewEngineWithShardContext(
historyEngImpl.eventsReapplier,
logger,
)
historyEngImpl.nDCactivityReplicator = newNDCActivityReplicator(
historyEngImpl.nDCActivityReplicator = newNDCActivityReplicator(
shard,
historyCache,
logger,
Expand Down Expand Up @@ -2113,7 +2113,7 @@ func (e *historyEngineImpl) SyncActivity(
request *h.SyncActivityRequest,
) (retError error) {

return e.nDCactivityReplicator.SyncActivity(ctx, request)
return e.nDCActivityReplicator.SyncActivity(ctx, request)
}

func (e *historyEngineImpl) ResetWorkflowExecution(
Expand Down
19 changes: 16 additions & 3 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,10 @@ func (r *historyReplicator) terminateWorkflow(
}

// setting the current version to be the last write version
msBuilder.UpdateReplicationStateVersion(currentLastWriteVersion, true)
if err := msBuilder.UpdateCurrentVersion(currentLastWriteVersion, true); err != nil {
return err
}

eventBatchFirstEventID := msBuilder.GetNextEventID()
if _, err := msBuilder.AddWorkflowExecutionTerminatedEvent(
eventBatchFirstEventID,
Expand Down Expand Up @@ -1260,7 +1263,12 @@ func (r *historyReplicator) prepareWorkflowMutation(
}
lastWriteVersionActive := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion) == r.clusterMetadata.GetCurrentClusterName()
if lastWriteVersionActive {
msBuilder.UpdateReplicationStateVersion(lastWriteVersion, true)
if err := msBuilder.UpdateCurrentVersion(
lastWriteVersion,
true,
); err != nil {
return false, err
}
return true, nil
}

Expand All @@ -1274,7 +1282,12 @@ func (r *historyReplicator) prepareWorkflowMutation(
domainFailoverVersion >= lastWriteVersion

if domainActive {
msBuilder.UpdateReplicationStateVersion(domainFailoverVersion, true)
if err := msBuilder.UpdateCurrentVersion(
lastWriteVersion,
true,
); err != nil {
return false, err
}
return true, nil
}
return false, nil
Expand Down
24 changes: 12 additions & 12 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsMissingMutableState_Incomin
msBuilderCurrent.EXPECT().GetNextEventID().Return(currentNextEventID).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -426,7 +426,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsMissingMutableState_Incomin
msBuilderCurrent.EXPECT().GetNextEventID().Return(currentNextEventID).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsMissingMutableState_Incomin
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().GetNextEventID().Return(currentNextEventID).AnyTimes()
msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() // this is used to update the version on mutable state
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)

s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
Expand Down Expand Up @@ -1045,7 +1045,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingLes

msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentLastWriteVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentLastWriteVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentLastWriteVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -1131,7 +1131,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingLes

msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentLastWriteVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentLastWriteVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentLastWriteVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -1217,7 +1217,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingLes
msBuilderIn.EXPECT().GetReplicationState().Return(&persistence.ReplicationState{LastWriteVersion: currentLastWriteVersion}).AnyTimes()
msBuilderIn.EXPECT().GetLastWriteVersion().Return(currentLastWriteVersion, nil).AnyTimes()
msBuilderIn.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderIn.EXPECT().UpdateReplicationStateVersion(currentLastWriteVersion, true).Times(1)
msBuilderIn.EXPECT().UpdateCurrentVersion(currentLastWriteVersion, true).Return(nil).Times(1)
msBuilderIn.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingLes
msBuilderIn.EXPECT().GetReplicationState().Return(&persistence.ReplicationState{LastWriteVersion: currentLastWriteVersion}).AnyTimes()
msBuilderIn.EXPECT().GetLastWriteVersion().Return(currentLastWriteVersion, nil).AnyTimes()
msBuilderIn.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
msBuilderIn.EXPECT().UpdateReplicationStateVersion(currentLastWriteVersion, true).Times(1)
msBuilderIn.EXPECT().UpdateCurrentVersion(currentLastWriteVersion, true).Return(nil).Times(1)
msBuilderIn.EXPECT().AddWorkflowExecutionSignaled(signalName, signalInput, signalIdentity).Return(&shared.HistoryEvent{
EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Expand Down Expand Up @@ -1724,7 +1724,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsVersionChecking_IncomingGre
}).Times(1)
msBuilderIn.EXPECT().HasBufferedEvents().Return(true).Times(1)
msBuilderIn.EXPECT().GetInFlightDecision().Return(pendingDecisionInfo, true).Times(1)
msBuilderIn.EXPECT().UpdateReplicationStateVersion(currentLastWriteVersion, true).Times(1)
msBuilderIn.EXPECT().UpdateCurrentVersion(currentLastWriteVersion, true).Return(nil).Times(1)
msBuilderIn.EXPECT().AddDecisionTaskFailedEvent(pendingDecisionInfo.ScheduleID, pendingDecisionInfo.StartedID,
workflow.DecisionTaskFailedCauseFailoverCloseDecision, ([]byte)(nil), identityHistoryService, "", "", "", int64(0),
).Return(&shared.HistoryEvent{}, nil).Times(1)
Expand Down Expand Up @@ -2959,7 +2959,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
Identity: common.StringPtr(signalIdentity),
},
}, nil).Times(1)
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().HasPendingDecision().Return(true).Times(1)
contextCurrent.EXPECT().updateWorkflowExecutionAsActive(gomock.Any()).Return(nil).Times(1)

Expand Down Expand Up @@ -3137,7 +3137,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
Identity: common.StringPtr(signalIdentity),
},
}, nil).Times(1)
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)
msBuilderCurrent.EXPECT().HasPendingDecision().Return(false).Times(1)

newDecision := &decisionInfo{
Expand Down Expand Up @@ -3639,7 +3639,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
msBuilderCurrent.EXPECT().GetNextEventID().Return(currentNextEventID).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() // this is used to update the version on mutable state
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)

currentClusterName := cluster.TestCurrentClusterName
s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(currentClusterName)
Expand Down Expand Up @@ -3770,7 +3770,7 @@ func (s *historyReplicatorSuite) TestConflictResolutionTerminateCurrentRunningIf
msBuilderCurrent.EXPECT().GetNextEventID().Return(currentNextEventID).AnyTimes()
msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(currentVersion, nil).AnyTimes()
msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() // this is used to update the version on mutable state
msBuilderCurrent.EXPECT().UpdateReplicationStateVersion(currentVersion, true).Times(1)
msBuilderCurrent.EXPECT().UpdateCurrentVersion(currentVersion, true).Return(nil).Times(1)

s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
Expand Down
1 change: 1 addition & 0 deletions service/history/nDCBranchMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *nDCBranchMgrSuite) TestFlushBufferedEvents() {
)
s.NoError(err)

s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(true).AnyTimes()
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
Expand Down
10 changes: 2 additions & 8 deletions service/history/nDCWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,13 @@ func (r *nDCWorkflowImpl) getReleaseFn() releaseWorkflowExecutionFunc {

func (r *nDCWorkflowImpl) getVectorClock() (int64, int64, error) {

currentVersionHistory, err := r.mutableState.GetVersionHistories().GetCurrentVersionHistory()
if err != nil {
return 0, 0, err
}

lastItem, err := currentVersionHistory.GetLastItem()
lastWriteVersion, err := r.mutableState.GetLastWriteVersion()
if err != nil {
return 0, 0, err
}

lastEventTaskID := r.mutableState.GetExecutionInfo().LastEventTaskID

return lastItem.GetVersion(), lastEventTaskID, nil
return lastWriteVersion, lastEventTaskID, nil
}

func (r *nDCWorkflowImpl) happensAfter(
Expand Down
69 changes: 7 additions & 62 deletions service/history/nDCWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,9 @@ func (s *nDCWorkflowSuite) TearDownTest() {
}

func (s *nDCWorkflowSuite) TestGetMethods() {
branchToken := []byte("some random branch token")
lastEventID := int64(2)
lastEventTaskID := int64(144)
lastEventVersion := int64(12)
versionHistoryItem := persistence.NewVersionHistoryItem(lastEventID, lastEventVersion)
versionHistory := persistence.NewVersionHistory(
branchToken,
[]*persistence.VersionHistoryItem{versionHistoryItem},
)
versionHistories := persistence.NewVersionHistories(versionHistory)
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand Down Expand Up @@ -226,17 +218,9 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Error() {
)

// cannot suppress by older workflow
branchToken := []byte("some random branch token")
lastEventID := int64(2)
lastEventTaskID := int64(144)
lastEventVersion := int64(12)
versionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
branchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(lastEventID, lastEventVersion),
},
))
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand All @@ -245,17 +229,9 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Error() {
}).AnyTimes()

incomingRunID := uuid.New()
incomingBranchToken := []byte("other random branch token")
incomingLastEventID := int64(2)
incomingLastEventTaskID := int64(144)
incomingLastEventVersion := lastEventVersion - 1
incomingVersionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
incomingBranchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(incomingLastEventID, incomingLastEventVersion),
},
))
incomingMockMutableState.EXPECT().GetVersionHistories().Return(incomingVersionHistories).AnyTimes()
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand All @@ -268,18 +244,11 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Error() {
}

func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Terminate() {
branchToken := []byte("some random branch token")
lastEventID := int64(2)
lastEventTaskID := int64(144)
lastEventVersion := int64(12)
versionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
branchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(lastEventID, lastEventVersion),
},
))
s.mockMutableState.EXPECT().GetNextEventID().Return(lastEventID + 1).AnyTimes()
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand All @@ -296,16 +265,8 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Terminate() {
)

incomingRunID := uuid.New()
incomingBranchToken := []byte("other random branch token")
incomingLastEventID := int64(2)
incomingLastEventTaskID := int64(144)
incomingLastEventVersion := lastEventVersion + 1
incomingVersionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
incomingBranchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(incomingLastEventID, incomingLastEventVersion),
},
))
incomingMockContext := NewMockworkflowExecutionContext(s.controller)
incomingMockMutableState := NewMockmutableState(s.controller)
incomingNDCWorkflow := newNDCWorkflow(
Expand All @@ -316,7 +277,7 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Terminate() {
incomingMockMutableState,
noopReleaseFn,
)
incomingMockMutableState.EXPECT().GetVersionHistories().Return(incomingVersionHistories).AnyTimes()
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand Down Expand Up @@ -364,17 +325,9 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Terminate() {
}

func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Zombiefy() {
branchToken := []byte("some random branch token")
lastEventID := int64(2)
lastEventTaskID := int64(144)
lastEventVersion := int64(12)
versionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
branchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(lastEventID, lastEventVersion),
},
))
s.mockMutableState.EXPECT().GetVersionHistories().Return(versionHistories).AnyTimes()
s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastEventVersion, nil).AnyTimes()
executionInfo := &persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand All @@ -394,16 +347,8 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Zombiefy() {
)

incomingRunID := uuid.New()
incomingBranchToken := []byte("other random branch token")
incomingLastEventID := int64(2)
incomingLastEventTaskID := int64(144)
incomingLastEventVersion := lastEventVersion + 1
incomingVersionHistories := persistence.NewVersionHistories(persistence.NewVersionHistory(
incomingBranchToken,
[]*persistence.VersionHistoryItem{
persistence.NewVersionHistoryItem(incomingLastEventID, incomingLastEventVersion),
},
))
incomingMockContext := NewMockworkflowExecutionContext(s.controller)
incomingMockMutableState := NewMockmutableState(s.controller)
incomingNDCWorkflow := newNDCWorkflow(
Expand All @@ -414,7 +359,7 @@ func (s *nDCWorkflowSuite) TestSuppressWorkflowBy_Zombiefy() {
incomingMockMutableState,
noopReleaseFn,
)
incomingMockMutableState.EXPECT().GetVersionHistories().Return(incomingVersionHistories).AnyTimes()
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
DomainID: s.domainID,
WorkflowID: s.workflowID,
Expand Down
22 changes: 12 additions & 10 deletions service/history/workflowResetor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,11 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(
}

// set the new mutable state with the version in domain
if newMutableState.GetReplicationState() != nil {
newMutableState.UpdateReplicationStateVersion(domainEntry.GetFailoverVersion(), false)
} else if newMutableState.GetVersionHistories() != nil {
if retError = newMutableState.UpdateCurrentVersion(
domainEntry.GetFailoverVersion(),
false,
); retError != nil {
return
}
if retError = newMutableState.UpdateCurrentVersion(
domainEntry.GetFailoverVersion(),
false,
); retError != nil {
return
}

// failed the in-flight decision(started).
Expand Down Expand Up @@ -940,7 +936,13 @@ func (w *workflowResetorImpl) replicateResetEvent(
w.eng.logger,
domainEntry,
)
newMsBuilder.UpdateReplicationStateVersion(firstEvent.GetVersion(), true)
if err := newMsBuilder.UpdateCurrentVersion(
firstEvent.GetVersion(),
true,
); err != nil {
return nil, 0, nil, nil, err
}

sBuilder = newStateBuilder(
w.eng.shard,
w.eng.logger,
Expand Down

0 comments on commit e5b6bdb

Please sign in to comment.