From adf16bec9ff3667bde257663a78679718ece184e Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 23 Aug 2019 19:06:04 -0700 Subject: [PATCH] Add nDC support for workflow execution reset (#2428) * New nDCWorkflowResetter handling workflow execution reset * Add handling of workflow reset in nDCHistoryReplicator * Add ContainsItem API to version history * Rename nDCStateRebuilder to nDCConflictResolver * Move common mutable state rebuild logic into new nDCStateRebuilder * Add '+build test' for all NDC related test files * Add //go:generate mockgen for automated mock file generation --- common/persistence/versionHistory.go | 47 ++- common/persistence/versionHistory_test.go | 72 +++- service/history/MockMutableState.go | 4 +- service/history/mutableState.go | 2 +- service/history/mutableStateBuilder.go | 4 +- service/history/nDCBranchMgr.go | 5 +- service/history/nDCBranchMgr_mock.go | 13 +- service/history/nDCBranchMgr_test.go | 2 + service/history/nDCConflictResolver.go | 186 +++++++++++ service/history/nDCConflictResolver_mock.go | 74 +++++ service/history/nDCConflictResolver_test.go | 309 ++++++++++++++++++ service/history/nDCHistoryReplicator.go | 182 ++++++++--- service/history/nDCStateRebuilder.go | 181 +++------- service/history/nDCStateRebuilder_mock.go | 28 +- service/history/nDCStateRebuilder_test.go | 208 +++--------- service/history/nDCTransactionMgr.go | 2 + .../nDCTransactionMgrForExistingWorkflow.go | 2 + ...CTransactionMgrForExistingWorkflow_mock.go | 13 +- ...CTransactionMgrForExistingWorkflow_test.go | 2 + .../nDCTransactionMgrForNewWorkflow.go | 3 +- .../nDCTransactionMgrForNewWorkflow_mock.go | 13 +- .../nDCTransactionMgrForNewWorkflow_test.go | 2 + service/history/nDCTransactionMgr_mock.go | 13 +- service/history/nDCTransactionMgr_test.go | 2 + service/history/nDCWorkflow.go | 3 +- service/history/nDCWorkflowResetter.go | 186 +++++++++++ service/history/nDCWorkflowResetter_mock.go | 74 +++++ service/history/nDCWorkflow_mock.go | 13 +- service/history/nDCWorkflow_test.go | 2 + service/history/workflowResetor.go | 4 +- 30 files changed, 1250 insertions(+), 401 deletions(-) create mode 100644 service/history/nDCConflictResolver.go create mode 100644 service/history/nDCConflictResolver_mock.go create mode 100644 service/history/nDCConflictResolver_test.go create mode 100644 service/history/nDCWorkflowResetter.go create mode 100644 service/history/nDCWorkflowResetter_mock.go diff --git a/common/persistence/versionHistory.go b/common/persistence/versionHistory.go index 383d8fa2be1..e63e87ddd73 100644 --- a/common/persistence/versionHistory.go +++ b/common/persistence/versionHistory.go @@ -235,17 +235,36 @@ func (v *VersionHistory) AddOrUpdateItem( return nil } +// ContainsItem check whether given version history item is included +func (v *VersionHistory) ContainsItem( + item *VersionHistoryItem, +) bool { + + prevEventID := common.FirstEventID - 1 + for _, currentItem := range v.items { + if item.GetVersion() == currentItem.GetVersion() { + if prevEventID < item.GetEventID() && item.GetEventID() <= currentItem.GetEventID() { + return true + } + } else if item.GetVersion() < currentItem.GetVersion() { + return false + } + prevEventID = currentItem.GetEventID() + } + return false +} + // FindLCAItem returns the lowest common ancestor version history item func (v *VersionHistory) FindLCAItem( remote *VersionHistory, ) (*VersionHistoryItem, error) { - localIdx := len(v.items) - 1 - remoteIdx := len(remote.items) - 1 + localIndex := len(v.items) - 1 + remoteIndex := len(remote.items) - 1 - for localIdx >= 0 && remoteIdx >= 0 { - localVersionItem := v.items[localIdx] - remoteVersionItem := remote.items[remoteIdx] + for localIndex >= 0 && remoteIndex >= 0 { + localVersionItem := v.items[localIndex] + remoteVersionItem := remote.items[remoteIndex] if localVersionItem.version == remoteVersionItem.version { if localVersionItem.eventID > remoteVersionItem.eventID { @@ -253,10 +272,10 @@ func (v *VersionHistory) FindLCAItem( } return localVersionItem.Duplicate(), nil } else if localVersionItem.version > remoteVersionItem.version { - localIdx-- + localIndex-- } else { // localVersionItem.version < remoteVersionItem.version - remoteIdx-- + remoteIndex-- } } @@ -489,6 +508,20 @@ func (h *VersionHistories) FindLCAVersionHistoryIndexAndItem( return versionHistoryIndex, versionHistoryItem, nil } +// FindFirstVersionHistoryIndexByItem find the first version history index which +// contains the given version history item +func (h *VersionHistories) FindFirstVersionHistoryIndexByItem( + item *VersionHistoryItem, +) (int, error) { + + for index, localHistory := range h.histories { + if localHistory.ContainsItem(item) { + return index, nil + } + } + return 0, &shared.BadRequestError{Message: "version histories does not contains given item."} +} + // IsRebuilt returns true if the current branch index's last write version is not the largest // among all branches' last write version func (h *VersionHistories) IsRebuilt() (bool, error) { diff --git a/common/persistence/versionHistory_test.go b/common/persistence/versionHistory_test.go index fc9fda9c9cd..59fd1c68fab 100644 --- a/common/persistence/versionHistory_test.go +++ b/common/persistence/versionHistory_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" "github.com/stretchr/testify/suite" ) @@ -152,7 +153,7 @@ func (s *versionHistorySuite) TestSetBranchToken() { s.NoError(err) } -func (s *versionHistorySuite) TestUpdateItem_VersionIncrease() { +func (s *versionHistorySuite) TestAddOrUpdateItem_VersionIncrease() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ {eventID: 3, version: 0}, @@ -178,7 +179,7 @@ func (s *versionHistorySuite) TestUpdateItem_VersionIncrease() { } -func (s *versionHistorySuite) TestUpdateItem_EventIDIncrease() { +func (s *versionHistorySuite) TestAddOrUpdateItem_EventIDIncrease() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ {eventID: 3, version: 0}, @@ -202,7 +203,7 @@ func (s *versionHistorySuite) TestUpdateItem_EventIDIncrease() { ), history) } -func (s *versionHistorySuite) TestUpdateItem_Failed_LowerVersion() { +func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_LowerVersion() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ {eventID: 3, version: 0}, @@ -214,7 +215,7 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_LowerVersion() { s.Error(err) } -func (s *versionHistorySuite) TestUpdateItem_Failed_SameVersion_EventIDNotIncreasing() { +func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_SameVersion_EventIDNotIncreasing() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ {eventID: 3, version: 0}, @@ -229,7 +230,7 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_SameVersion_EventIDNotIncrea s.Error(err) } -func (s *versionHistorySuite) TestUpdateItem_Failed_VersionNoIncreasing() { +func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_VersionNoIncreasing() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ {eventID: 3, version: 0}, @@ -247,6 +248,38 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_VersionNoIncreasing() { s.Error(err) } +func (s *versionHistoriesSuite) TestContainsItem_True() { + branchToken := []byte("some random branch token") + items := []*VersionHistoryItem{ + {eventID: 3, version: 0}, + {eventID: 6, version: 4}, + } + history := NewVersionHistory(branchToken, items) + + prevEventID := common.FirstEventID - 1 + for _, item := range items { + for eventID := prevEventID + 1; eventID <= item.GetEventID(); eventID++ { + s.True(history.ContainsItem(NewVersionHistoryItem(eventID, item.GetVersion()))) + } + prevEventID = item.GetEventID() + } +} + +func (s *versionHistoriesSuite) TestContainsItem_False() { + branchToken := []byte("some random branch token") + items := []*VersionHistoryItem{ + {eventID: 3, version: 0}, + {eventID: 6, version: 4}, + } + history := NewVersionHistory(branchToken, items) + + s.False(history.ContainsItem(NewVersionHistoryItem(4, 0))) + s.False(history.ContainsItem(NewVersionHistoryItem(3, 1))) + + s.False(history.ContainsItem(NewVersionHistoryItem(7, 4))) + s.False(history.ContainsItem(NewVersionHistoryItem(6, 5))) +} + func (s *versionHistorySuite) TestIsLCAAppendable_True() { branchToken := []byte("some random branch token") items := []*VersionHistoryItem{ @@ -551,6 +584,35 @@ func (s *versionHistoriesSuite) TestFindLCAVersionHistoryIndexAndItem_SameEventI s.Equal(NewVersionHistoryItem(7, 6), item) } +func (s *versionHistoriesSuite) TestFindFirstVersionHistoryIndexByItem() { + versionHistory1 := NewVersionHistory([]byte("branch token 1"), []*VersionHistoryItem{ + {eventID: 3, version: 0}, + {eventID: 5, version: 4}, + {eventID: 7, version: 6}, + }) + versionHistory2 := NewVersionHistory([]byte("branch token 2"), []*VersionHistoryItem{ + {eventID: 3, version: 0}, + {eventID: 5, version: 4}, + {eventID: 7, version: 6}, + {eventID: 9, version: 10}, + }) + + histories := NewVersionHistories(versionHistory1) + _, _, err := histories.AddVersionHistory(versionHistory2) + s.Nil(err) + + index, err := histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(8, 10)) + s.NoError(err) + s.Equal(1, index) + + index, err = histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(4, 4)) + s.NoError(err) + s.Equal(0, index) + + index, err = histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(41, 4)) + s.Error(err) +} + func (s *versionHistoriesSuite) TestCurrentVersionHistoryIndexIsInReplay() { versionHistory1 := NewVersionHistory([]byte("branch token 1"), []*VersionHistoryItem{ {eventID: 3, version: 0}, diff --git a/service/history/MockMutableState.go b/service/history/MockMutableState.go index 09d1923c028..1362584307e 100644 --- a/service/history/MockMutableState.go +++ b/service/history/MockMutableState.go @@ -2731,8 +2731,8 @@ func (_m *mockMutableState) ReplicateWorkflowExecutionTimedoutEvent(_a0 int64, _ return r0 } -// SetHistoryBuilder provides a mock function with given fields: _a0 -func (_m *mockMutableState) SetBranchToken(_a0 []byte) error { +// SetCurrentBranchToken provides a mock function with given fields: _a0 +func (_m *mockMutableState) SetCurrentBranchToken(_a0 []byte) error { ret := _m.Called(_a0) var r0 error diff --git a/service/history/mutableState.go b/service/history/mutableState.go index 741028fa1f6..e002d310c20 100644 --- a/service/history/mutableState.go +++ b/service/history/mutableState.go @@ -199,7 +199,7 @@ type ( ReplicateWorkflowExecutionStartedEvent(*cache.DomainCacheEntry, *string, workflow.WorkflowExecution, string, *workflow.HistoryEvent) error ReplicateWorkflowExecutionTerminatedEvent(int64, *workflow.HistoryEvent) error ReplicateWorkflowExecutionTimedoutEvent(int64, *workflow.HistoryEvent) error - SetBranchToken(branchToken []byte) error + SetCurrentBranchToken(branchToken []byte) error SetHistoryBuilder(hBuilder *historyBuilder) SetHistoryTree(treeID string) error SetVersionHistories(*persistence.VersionHistories) error diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index d5b385832b9..e952dfe6f8b 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -301,10 +301,10 @@ func (e *mutableStateBuilder) SetHistoryTree( if err != nil { return err } - return e.SetBranchToken(initialBranchToken) + return e.SetCurrentBranchToken(initialBranchToken) } -func (e *mutableStateBuilder) SetBranchToken( +func (e *mutableStateBuilder) SetCurrentBranchToken( branchToken []byte, ) error { diff --git a/service/history/nDCBranchMgr.go b/service/history/nDCBranchMgr.go index f223687df09..d70ef9c260f 100644 --- a/service/history/nDCBranchMgr.go +++ b/service/history/nDCBranchMgr.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCBranchMgr_mock.go + package history import ( @@ -56,7 +58,6 @@ var _ nDCBranchMgr = (*nDCBranchMgrImpl)(nil) func newNDCBranchMgr( shard ShardContext, - context workflowExecutionContext, mutableState mutableState, logger log.Logger, @@ -139,7 +140,7 @@ func (r *nDCBranchMgrImpl) createNewBranch( defer func() { if errComplete := r.historyV2Mgr.CompleteForkBranch(&persistence.CompleteForkBranchRequest{ BranchToken: newBranchToken, - Success: retError == nil || persistence.IsTimeoutError(retError), + Success: true, // past lessons learnt from Cassandra & gocql tells that we cannot possibly find all timeout errors ShardID: common.IntPtr(shardID), }); errComplete != nil { r.logger.WithTags( diff --git a/service/history/nDCBranchMgr_mock.go b/service/history/nDCBranchMgr_mock.go index 0b29d7b3385..89589904876 100644 --- a/service/history/nDCBranchMgr_mock.go +++ b/service/history/nDCBranchMgr_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCBranchMgr.go +// Source: nDCBranchMgr.go // Package history is a generated GoMock package. package history diff --git a/service/history/nDCBranchMgr_test.go b/service/history/nDCBranchMgr_test.go index 62ae132b498..ddb79b7bf39 100644 --- a/service/history/nDCBranchMgr_test.go +++ b/service/history/nDCBranchMgr_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( diff --git a/service/history/nDCConflictResolver.go b/service/history/nDCConflictResolver.go new file mode 100644 index 00000000000..b841f3a96ab --- /dev/null +++ b/service/history/nDCConflictResolver.go @@ -0,0 +1,186 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCConflictResolver_mock.go + +package history + +import ( + ctx "context" + + "github.com/pborman/uuid" + + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/log" +) + +type ( + nDCConflictResolver interface { + prepareMutableState( + ctx ctx.Context, + branchIndex int, + incomingVersion int64, + ) (mutableState, bool, error) + } + + nDCConflictResolverImpl struct { + shard ShardContext + stateRebuilder nDCStateRebuilder + + context workflowExecutionContext + mutableState mutableState + logger log.Logger + } +) + +var _ nDCConflictResolver = (*nDCConflictResolverImpl)(nil) + +func newNDCConflictResolver( + shard ShardContext, + context workflowExecutionContext, + mutableState mutableState, + logger log.Logger, +) *nDCConflictResolverImpl { + + return &nDCConflictResolverImpl{ + shard: shard, + stateRebuilder: newNDCStateRebuilder(shard, logger), + + context: context, + mutableState: mutableState, + logger: logger, + } +} + +func (r *nDCConflictResolverImpl) prepareMutableState( + ctx ctx.Context, + branchIndex int, + incomingVersion int64, +) (mutableState, bool, error) { + + versionHistories := r.mutableState.GetVersionHistories() + currentVersionHistoryIndex := versionHistories.GetCurrentVersionHistoryIndex() + + // replication task to be applied to current branch + if branchIndex == currentVersionHistoryIndex { + return r.mutableState, false, nil + } + + currentVersionHistory, err := versionHistories.GetVersionHistory(currentVersionHistoryIndex) + if err != nil { + return nil, false, err + } + currentLastItem, err := currentVersionHistory.GetLastItem() + if err != nil { + return nil, false, err + } + + // mutable state does not need rebuild + if incomingVersion < currentLastItem.GetVersion() { + return r.mutableState, false, nil + } + + if incomingVersion == currentLastItem.GetVersion() { + return nil, false, &shared.BadRequestError{ + Message: "nDCConflictResolver encounter replication task version == current branch last write version", + } + } + + // task.getVersion() > currentLastItem + // incoming replication task, after application, will become the current branch + // (because higher version wins), we need to rebuild the mutable state for that + rebuiltMutableState, err := r.rebuild(ctx, branchIndex, uuid.New()) + if err != nil { + return nil, false, err + } + return rebuiltMutableState, true, nil +} + +func (r *nDCConflictResolverImpl) rebuild( + ctx ctx.Context, + branchIndex int, + requestID string, +) (mutableState, error) { + + versionHistories := r.mutableState.GetVersionHistories() + replayVersionHistory, err := versionHistories.GetVersionHistory(branchIndex) + if err != nil { + return nil, err + } + lastItem, err := replayVersionHistory.GetLastItem() + if err != nil { + return nil, err + } + + executionInfo := r.mutableState.GetExecutionInfo() + workflowIdentifier := definition.NewWorkflowIdentifier( + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + ) + + rebuildMutableState, rebuiltHistorySize, err := r.stateRebuilder.rebuild( + ctx, + workflowIdentifier, + replayVersionHistory.GetBranchToken(), + lastItem.GetEventID()+1, + workflowIdentifier, + requestID, + ) + if err != nil { + return nil, err + } + + // after rebuilt verification + rebuildVersionHistories := rebuildMutableState.GetVersionHistories() + rebuildVersionHistory, err := rebuildVersionHistories.GetCurrentVersionHistory() + if err != nil { + return nil, err + } + err = rebuildVersionHistory.SetBranchToken(replayVersionHistory.GetBranchToken()) + if err != nil { + return nil, err + } + + if !rebuildVersionHistory.Equals(replayVersionHistory) { + return nil, &shared.InternalServiceError{ + Message: "nDCConflictResolver encounter mismatch version history after rebuild", + } + } + + // set the current branch index to target branch index + // set the version history back + // + // caller can use the IsRebuilt function in VersionHistories + // telling whether mutable state is rebuilt, before apply new history events + if err := versionHistories.SetCurrentVersionHistoryIndex(branchIndex); err != nil { + return nil, err + } + if err = rebuildMutableState.SetVersionHistories(versionHistories); err != nil { + return nil, err + } + // set the update condition from original mutable state + rebuildMutableState.SetUpdateCondition(r.mutableState.GetUpdateCondition()) + + r.context.clear() + r.context.setHistorySize(rebuiltHistorySize) + return rebuildMutableState, nil +} diff --git a/service/history/nDCConflictResolver_mock.go b/service/history/nDCConflictResolver_mock.go new file mode 100644 index 00000000000..b96a8e3c0a2 --- /dev/null +++ b/service/history/nDCConflictResolver_mock.go @@ -0,0 +1,74 @@ +// The MIT License (MIT) +// +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: nDCConflictResolver.go + +// Package history is a generated GoMock package. +package history + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MocknDCConflictResolver is a mock of nDCConflictResolver interface +type MocknDCConflictResolver struct { + ctrl *gomock.Controller + recorder *MocknDCConflictResolverMockRecorder +} + +// MocknDCConflictResolverMockRecorder is the mock recorder for MocknDCConflictResolver +type MocknDCConflictResolverMockRecorder struct { + mock *MocknDCConflictResolver +} + +// NewMocknDCConflictResolver creates a new mock instance +func NewMocknDCConflictResolver(ctrl *gomock.Controller) *MocknDCConflictResolver { + mock := &MocknDCConflictResolver{ctrl: ctrl} + mock.recorder = &MocknDCConflictResolverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MocknDCConflictResolver) EXPECT() *MocknDCConflictResolverMockRecorder { + return m.recorder +} + +// prepareMutableState mocks base method +func (m *MocknDCConflictResolver) prepareMutableState(ctx context.Context, branchIndex int, incomingVersion int64) (mutableState, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "prepareMutableState", ctx, branchIndex, incomingVersion) + ret0, _ := ret[0].(mutableState) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// prepareMutableState indicates an expected call of prepareMutableState +func (mr *MocknDCConflictResolverMockRecorder) prepareMutableState(ctx, branchIndex, incomingVersion interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "prepareMutableState", reflect.TypeOf((*MocknDCConflictResolver)(nil).prepareMutableState), ctx, branchIndex, incomingVersion) +} diff --git a/service/history/nDCConflictResolver_test.go b/service/history/nDCConflictResolver_test.go new file mode 100644 index 00000000000..4e87e8bc5cf --- /dev/null +++ b/service/history/nDCConflictResolver_test.go @@ -0,0 +1,309 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// +build test + +package history + +import ( + ctx "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/pborman/uuid" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +type ( + nDCConflictResolverSuite struct { + suite.Suite + + logger log.Logger + mockExecutionMgr *mocks.ExecutionManager + mockHistoryV2Mgr *mocks.HistoryV2Manager + mockShardManager *mocks.ShardManager + mockClusterMetadata *mocks.ClusterMetadata + mockProducer *mocks.KafkaProducer + mockMetadataMgr *mocks.MetadataManager + mockMessagingClient messaging.Client + mockService service.Service + mockShard *shardContextImpl + mockDomainCache *cache.DomainCacheMock + mockClientBean *client.MockClientBean + mockEventsCache *MockEventsCache + + mockContext *mockWorkflowExecutionContext + mockMutableState *mockMutableState + domainID string + domainName string + workflowID string + runID string + + controller *gomock.Controller + mockStateBuilder *MocknDCStateRebuilder + nDCConflictResolver *nDCConflictResolverImpl + } +) + +func TestNDCConflictResolverSuite(t *testing.T) { + s := new(nDCConflictResolverSuite) + suite.Run(t, s) +} + +func (s *nDCConflictResolverSuite) SetupTest() { + s.logger = loggerimpl.NewDevelopmentForTest(s.Suite) + s.mockHistoryV2Mgr = &mocks.HistoryV2Manager{} + s.mockExecutionMgr = &mocks.ExecutionManager{} + s.mockClusterMetadata = &mocks.ClusterMetadata{} + s.mockShardManager = &mocks.ShardManager{} + s.mockProducer = &mocks.KafkaProducer{} + s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) + s.mockMetadataMgr = &mocks.MetadataManager{} + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + s.mockClientBean = &client.MockClientBean{} + s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean, nil, nil) + s.mockDomainCache = &cache.DomainCacheMock{} + s.mockEventsCache = &MockEventsCache{} + + s.mockShard = &shardContextImpl{ + service: s.mockService, + shardInfo: &persistence.ShardInfo{ShardID: 10, RangeID: 1, TransferAckLevel: 0}, + transferSequenceNumber: 1, + executionManager: s.mockExecutionMgr, + historyV2Mgr: s.mockHistoryV2Mgr, + shardManager: s.mockShardManager, + maxTransferSequenceNumber: 100000, + closeCh: make(chan int, 100), + config: NewDynamicConfigForTest(), + logger: s.logger, + domainCache: s.mockDomainCache, + metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), + eventsCache: s.mockEventsCache, + timeSource: clock.NewRealTimeSource(), + } + s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) + s.mockShard.config.EnableVisibilityToKafka = dynamicconfig.GetBoolPropertyFn(true) + + s.domainID = uuid.New() + s.domainName = "some random domain name" + s.workflowID = "some random workflow ID" + s.runID = uuid.New() + s.mockContext = &mockWorkflowExecutionContext{} + s.mockMutableState = &mockMutableState{} + + s.controller = gomock.NewController(s.T()) + s.mockStateBuilder = NewMocknDCStateRebuilder(s.controller) + s.nDCConflictResolver = newNDCConflictResolver( + s.mockShard, s.mockContext, s.mockMutableState, s.logger, + ) + s.nDCConflictResolver.stateRebuilder = s.mockStateBuilder +} + +func (s *nDCConflictResolverSuite) TearDownTest() { + s.mockHistoryV2Mgr.AssertExpectations(s.T()) + s.mockExecutionMgr.AssertExpectations(s.T()) + s.mockShardManager.AssertExpectations(s.T()) + s.mockProducer.AssertExpectations(s.T()) + s.mockMetadataMgr.AssertExpectations(s.T()) + s.mockClientBean.AssertExpectations(s.T()) + s.mockDomainCache.AssertExpectations(s.T()) + s.mockEventsCache.AssertExpectations(s.T()) + + s.mockMutableState.AssertExpectations(s.T()) + + s.controller.Finish() +} + +func (s *nDCConflictResolverSuite) TestRebuild() { + ctx := ctx.Background() + updateCondition := int64(59) + requestID := uuid.New() + version := int64(12) + historySize := int64(12345) + + branchToken0 := []byte("some random branch token") + lastEventID0 := int64(5) + versionHistory0 := persistence.NewVersionHistory( + branchToken0, + []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID0, version)}, + ) + branchToken1 := []byte("other random branch token") + lastEventID1 := int64(2) + versionHistory1 := persistence.NewVersionHistory( + branchToken1, + []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID1, version)}, + ) + versionHistories := persistence.NewVersionHistories(versionHistory0) + _, _, err := versionHistories.AddVersionHistory(versionHistory1) + s.NoError(err) + + s.mockMutableState.On("GetUpdateCondition").Return(updateCondition) + s.mockMutableState.On("GetVersionHistories").Return(versionHistories) + s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + DomainID: s.domainID, + WorkflowID: s.workflowID, + RunID: s.runID, + }) + + workflowIdentifier := definition.NewWorkflowIdentifier( + s.domainID, + s.workflowID, + s.runID, + ) + mockRebuildMutableState := &mockMutableState{} + defer mockRebuildMutableState.AssertExpectations(s.T()) + mockRebuildMutableState.On("GetVersionHistories").Return( + persistence.NewVersionHistories( + persistence.NewVersionHistory( + nil, + []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID1, version)}, + ), + ), + ).Once() + mockRebuildMutableState.On("SetVersionHistories", versionHistories).Return(nil).Once() + mockRebuildMutableState.On("SetUpdateCondition", updateCondition).Once() + + s.mockStateBuilder.EXPECT().rebuild( + ctx, + workflowIdentifier, + branchToken1, + lastEventID1+1, + workflowIdentifier, + requestID, + ).Return(mockRebuildMutableState, historySize, nil).Times(1) + + s.mockContext.On("clear").Once() + s.mockContext.On("setHistorySize", historySize).Once() + rebuiltMutableState, err := s.nDCConflictResolver.rebuild(ctx, 1, requestID) + s.NoError(err) + s.NotNil(rebuiltMutableState) + s.Equal(1, versionHistories.GetCurrentVersionHistoryIndex()) +} + +func (s *nDCConflictResolverSuite) TestPrepareMutableState_NoRebuild() { + branchToken := []byte("some random branch token") + lastEventID := int64(2) + version := int64(12) + versionHistoryItem := persistence.NewVersionHistoryItem(lastEventID, version) + versionHistory := persistence.NewVersionHistory( + branchToken, + []*persistence.VersionHistoryItem{versionHistoryItem}, + ) + versionHistories := persistence.NewVersionHistories(versionHistory) + s.mockMutableState.On("GetVersionHistories").Return(versionHistories) + + rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.prepareMutableState(ctx.Background(), 0, version) + s.NoError(err) + s.False(isRebuilt) + s.Equal(s.mockMutableState, rebuiltMutableState) +} + +func (s *nDCConflictResolverSuite) TestPrepareMutableState_Rebuild() { + ctx := ctx.Background() + updateCondition := int64(59) + version := int64(12) + incomingVersion := version + 1 + historySize := int64(12345) + + // current branch + branchToken0 := []byte("some random branch token") + lastEventID0 := int64(2) + + versionHistoryItem0 := persistence.NewVersionHistoryItem(lastEventID0, version) + versionHistory0 := persistence.NewVersionHistory( + branchToken0, + []*persistence.VersionHistoryItem{versionHistoryItem0}, + ) + + // stale branch, used for rebuild + branchToken1 := []byte("other random branch token") + lastEventID1 := lastEventID0 - 1 + versionHistoryItem1 := persistence.NewVersionHistoryItem(lastEventID1, version) + versionHistory1 := persistence.NewVersionHistory( + branchToken1, + []*persistence.VersionHistoryItem{versionHistoryItem1}, + ) + + versionHistories := persistence.NewVersionHistories(versionHistory0) + _, _, err := versionHistories.AddVersionHistory(versionHistory1) + s.Nil(err) + + s.mockMutableState.On("GetUpdateCondition").Return(updateCondition) + s.mockMutableState.On("GetVersionHistories").Return(versionHistories) + s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + DomainID: s.domainID, + WorkflowID: s.workflowID, + RunID: s.runID, + }) + + workflowIdentifier := definition.NewWorkflowIdentifier( + s.domainID, + s.workflowID, + s.runID, + ) + mockRebuildMutableState := &mockMutableState{} + defer mockRebuildMutableState.AssertExpectations(s.T()) + mockRebuildMutableState.On("GetVersionHistories").Return( + persistence.NewVersionHistories( + persistence.NewVersionHistory( + nil, + []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID1, version)}, + ), + ), + ).Once() + mockRebuildMutableState.On("SetVersionHistories", versionHistories).Return(nil).Once() + mockRebuildMutableState.On("SetUpdateCondition", updateCondition).Once() + if s.mockShard.config.EnableVisibilityToKafka() { + mockRebuildMutableState.On("AddTransferTasks", []persistence.Task{ + &persistence.UpsertWorkflowSearchAttributesTask{}, + }).Once() + } + + s.mockStateBuilder.EXPECT().rebuild( + ctx, + workflowIdentifier, + branchToken1, + lastEventID1+1, + workflowIdentifier, + gomock.Any(), + ).Return(mockRebuildMutableState, historySize, nil).Times(1) + + s.mockContext.On("clear").Once() + s.mockContext.On("setHistorySize", int64(historySize)).Once() + rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.prepareMutableState(ctx, 1, incomingVersion) + s.NoError(err) + s.NotNil(rebuiltMutableState) + s.True(isRebuilt) +} diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go index d5b6a93b700..554ddd0d80a 100644 --- a/service/history/nDCHistoryReplicator.go +++ b/service/history/nDCHistoryReplicator.go @@ -44,11 +44,20 @@ type ( logger log.Logger, ) nDCBranchMgr - nDCStateRebuilderProvider func( + nDCConflictResolverProvider func( context workflowExecutionContext, mutableState mutableState, logger log.Logger, - ) nDCStateRebuilder + ) nDCConflictResolver + + nDCWorkflowResetterProvider func( + domainID string, + workflowID string, + baseRunID string, + newContext workflowExecutionContext, + newRunID string, + logger log.Logger, + ) nDCWorkflowResetter nDCHistoryReplicator struct { shard ShardContext @@ -59,12 +68,12 @@ type ( historyCache *historyCache transactionMgr nDCTransactionMgr logger log.Logger - resetor workflowResetor - getNewBranchMgr nDCBranchMgrProvider - getNewStateRebuilder nDCStateRebuilderProvider - getNewStateBuilder stateBuilderProvider - getNewMutableState mutableStateProvider + newBranchMgr nDCBranchMgrProvider + newConflictResolver nDCConflictResolverProvider + newWorkflowResetter nDCWorkflowResetterProvider + newStateBuilder stateBuilderProvider + newMutableState mutableStateProvider } ) @@ -74,6 +83,7 @@ func newNDCHistoryReplicator( logger log.Logger, ) *nDCHistoryReplicator { + transactionMgr := newNDCTransactionMgr(shard, historyCache, logger) replicator := &nDCHistoryReplicator{ shard: shard, clusterMetadata: shard.GetService().GetClusterMetadata(), @@ -81,19 +91,44 @@ func newNDCHistoryReplicator( metricsClient: shard.GetMetricsClient(), domainCache: shard.GetDomainCache(), historyCache: historyCache, - transactionMgr: newNDCTransactionMgr(shard, historyCache, logger), + transactionMgr: transactionMgr, logger: logger.WithTags(tag.ComponentHistoryReplicator), - getNewBranchMgr: func(context workflowExecutionContext, mutableState mutableState, logger log.Logger) nDCBranchMgr { + newBranchMgr: func( + context workflowExecutionContext, + mutableState mutableState, + logger log.Logger, + ) nDCBranchMgr { return newNDCBranchMgr(shard, context, mutableState, logger) }, - getNewStateRebuilder: func(context workflowExecutionContext, mutableState mutableState, logger log.Logger) nDCStateRebuilder { - return newNDCStateRebuilder(shard, context, mutableState, logger) + newConflictResolver: func( + context workflowExecutionContext, + mutableState mutableState, + logger log.Logger, + ) nDCConflictResolver { + return newNDCConflictResolver(shard, context, mutableState, logger) + }, + newWorkflowResetter: func( + domainID string, + workflowID string, + baseRunID string, + newContext workflowExecutionContext, + newRunID string, + logger log.Logger, + ) nDCWorkflowResetter { + return newNDCWorkflowResetter(shard, transactionMgr, domainID, workflowID, baseRunID, newContext, newRunID, logger) }, - getNewStateBuilder: func(msBuilder mutableState, logger log.Logger) stateBuilder { + newStateBuilder: func( + msBuilder mutableState, + logger log.Logger, + ) stateBuilder { return newStateBuilder(shard, msBuilder, logger) }, - getNewMutableState: func(version int64, domainName string, logger log.Logger) mutableState { + newMutableState: func( + version int64, + domainName string, + logger log.Logger, + ) mutableState { return newMutableStateBuilderWithVersionHistories( shard, shard.GetEventsCache(), @@ -106,7 +141,6 @@ func newNDCHistoryReplicator( ) }, } - replicator.resetor = nil // TODO wire v2 history replicator with workflow reseter return replicator } @@ -171,9 +205,16 @@ func (r *nDCHistoryReplicator) applyEvents( return r.applyNonStartEventsToCurrentBranch(ctx, context, mutableState, isRebuilt, releaseFn, task) } return r.applyNonStartEventsToNoneCurrentBranch(ctx, context, mutableState, branchIndex, releaseFn, task) + case *shared.EntityNotExistsError: - // mutable state not created, proceed - return r.applyNonStartEventsMissingMutableState(ctx, context, task) + // mutable state not created, check if is workflow reset + mutableState, completeFn, err := r.applyNonStartEventsMissingMutableState(ctx, context, task) + if err != nil { + return err + } + + return r.applyNonStartEventsResetWorkflow(ctx, context, mutableState, completeFn, task) + default: // unable to get mutable state, return err so we can retry the task later return err @@ -189,8 +230,8 @@ func (r *nDCHistoryReplicator) applyStartEvents( ) (retError error) { requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. - mutableState := r.getNewMutableState(task.getVersion(), context.getDomainName(), task.getLogger()) - stateBuilder := r.getNewStateBuilder(mutableState, task.getLogger()) + mutableState := r.newMutableState(task.getVersion(), context.getDomainName(), task.getLogger()) + stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) // use state builder for workflow mutable state mutation _, _, _, err := stateBuilder.applyEvents( @@ -206,10 +247,9 @@ func (r *nDCHistoryReplicator) applyStartEvents( return err } - now := time.Unix(0, task.getLastEvent().GetTimestamp()) err = r.transactionMgr.createWorkflow( ctx, - now, + task.getEventTime(), newNDCWorkflow( ctx, r.domainCache, @@ -233,7 +273,7 @@ func (r *nDCHistoryReplicator) applyNonStartEventsPrepareBranch( ) (int, error) { incomingVersionHistory := task.getVersionHistory() - branchMgr := r.getNewBranchMgr(context, mutableState, task.getLogger()) + branchMgr := r.newBranchMgr(context, mutableState, task.getLogger()) versionHistoryIndex, err := branchMgr.prepareVersionHistory( ctx, incomingVersionHistory, @@ -270,6 +310,7 @@ func (r *nDCHistoryReplicator) applyNonStartEventsPrepareReorder( return false, nil } if task.getFirstEvent().GetEventId() > nextEventID { + // TODO we should use a new retry error for 3+DC return false, newRetryTaskErrorWithHint( ErrRetryBufferEventsMsg, task.getDomainID(), @@ -291,8 +332,8 @@ func (r *nDCHistoryReplicator) applyNonStartEventsPrepareMutableState( ) (mutableState, bool, error) { incomingVersion := task.getVersion() - stateRebuilder := r.getNewStateRebuilder(context, mutableState, task.getLogger()) - return stateRebuilder.prepareMutableState( + conflictResolver := r.newConflictResolver(context, mutableState, task.getLogger()) + return conflictResolver.prepareMutableState( ctx, branchIndex, incomingVersion, @@ -309,7 +350,7 @@ func (r *nDCHistoryReplicator) applyNonStartEventsToCurrentBranch( ) error { requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. - stateBuilder := r.getNewStateBuilder(mutableState, task.getLogger()) + stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) _, _, newMutableState, err := stateBuilder.applyEvents( task.getDomainID(), requestID, @@ -323,7 +364,6 @@ func (r *nDCHistoryReplicator) applyNonStartEventsToCurrentBranch( return err } - now := time.Unix(0, task.getLastEvent().GetTimestamp()) targetWorkflow := newNDCWorkflow( ctx, r.domainCache, @@ -359,7 +399,7 @@ func (r *nDCHistoryReplicator) applyNonStartEventsToCurrentBranch( err = r.transactionMgr.updateWorkflow( ctx, - now, + task.getEventTime(), isRebuilt, targetWorkflow, newWorkflow, @@ -405,10 +445,9 @@ func (r *nDCHistoryReplicator) applyNonStartEventsToNoneCurrentBranch( return err } - now := time.Unix(0, task.getLastEvent().GetTimestamp()) return r.transactionMgr.backfillWorkflow( ctx, - now, + task.getEventTime(), newNDCWorkflow( ctx, r.domainCache, @@ -429,33 +468,84 @@ func (r *nDCHistoryReplicator) applyNonStartEventsToNoneCurrentBranch( func (r *nDCHistoryReplicator) applyNonStartEventsMissingMutableState( ctx ctx.Context, - context workflowExecutionContext, + newContext workflowExecutionContext, task nDCReplicationTask, -) error { - - resendTaskErr := newRetryTaskErrorWithHint( - ErrWorkflowNotFoundMsg, - task.getDomainID(), - task.getWorkflowID(), - task.getRunID(), - common.FirstEventID, - ) +) (mutableState, nDCWorkflowResetterCompleteFn, error) { // for non reset workflow execution replication task, just do re-application if !task.getRequest().GetResetWorkflow() { - return resendTaskErr + // TODO we should use a new retry error for 3+DC + return nil, nil, newRetryTaskErrorWithHint( + ErrWorkflowNotFoundMsg, + task.getDomainID(), + task.getWorkflowID(), + task.getRunID(), + common.FirstEventID, + ) } - // TODO nDC: to successfully & correctly do workflow reset - // the apply reset event functionality needs to be refactored - // currently, just use re-send, although it is inefficient - return newRetryTaskErrorWithHint( - ErrWorkflowNotFoundMsg, + decisionTaskFailedEvent := task.getFirstEvent() + attr := decisionTaskFailedEvent.DecisionTaskFailedEventAttributes + baseRunID := attr.GetBaseRunId() + baseEventID := decisionTaskFailedEvent.GetEventId() - 1 + baseEventVersion := attr.GetForkEventVersion() + newRunID := attr.GetNewRunId() + + workflowResetter := r.newWorkflowResetter( task.getDomainID(), task.getWorkflowID(), - task.getRunID(), - common.FirstEventID, + baseRunID, + newContext, + newRunID, + task.getLogger(), ) + + return workflowResetter.resetWorkflow(ctx, baseEventID, baseEventVersion) +} + +func (r *nDCHistoryReplicator) applyNonStartEventsResetWorkflow( + ctx ctx.Context, + context workflowExecutionContext, + mutableState mutableState, + completeFn nDCWorkflowResetterCompleteFn, + task nDCReplicationTask, +) error { + + defer completeFn() + + requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. + stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) + _, _, _, err := stateBuilder.applyEvents( + task.getDomainID(), + requestID, + *task.getExecution(), + task.getEvents(), + task.getNewEvents(), + nDCMutableStateEventStoreVersion, + nDCMutableStateEventStoreVersion, + ) + if err != nil { + return err + } + + targetWorkflow := newNDCWorkflow( + ctx, + r.domainCache, + r.clusterMetadata, + context, + mutableState, + noopReleaseFn, + ) + + err = r.transactionMgr.createWorkflow( + ctx, + task.getEventTime(), + targetWorkflow, + ) + if err == nil { + r.notify(task.getSourceCluster(), task.getEventTime()) + } + return err } func (r *nDCHistoryReplicator) notify( diff --git a/service/history/nDCStateRebuilder.go b/service/history/nDCStateRebuilder.go index a956b4cc5a2..f62bd58bc5f 100644 --- a/service/history/nDCStateRebuilder.go +++ b/service/history/nDCStateRebuilder.go @@ -18,18 +18,20 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCStateRebuilder_mock.go + package history import ( ctx "context" - - "github.com/pborman/uuid" + "fmt" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/collection" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -37,11 +39,14 @@ import ( type ( nDCStateRebuilder interface { - prepareMutableState( + rebuild( ctx ctx.Context, - branchIndex int, - incomingVersion int64, - ) (mutableState, bool, error) + baseWorkflowIdentifier definition.WorkflowIdentifier, + baseBranchToken []byte, + baseNextEventID int64, + targetWorkflowIdentifier definition.WorkflowIdentifier, + requestID string, + ) (mutableState, int64, error) } nDCStateRebuilderImpl struct { @@ -50,10 +55,8 @@ type ( clusterMetadata cluster.Metadata historyV2Mgr persistence.HistoryV2Manager - context workflowExecutionContext - mutableState mutableState - historySize int64 - logger log.Logger + rebuiltHistorySize int64 + logger log.Logger } ) @@ -61,9 +64,6 @@ var _ nDCStateRebuilder = (*nDCStateRebuilderImpl)(nil) func newNDCStateRebuilder( shard ShardContext, - - context workflowExecutionContext, - mutableState mutableState, logger log.Logger, ) *nDCStateRebuilderImpl { @@ -73,143 +73,70 @@ func newNDCStateRebuilder( clusterMetadata: shard.GetService().GetClusterMetadata(), historyV2Mgr: shard.GetHistoryV2Manager(), - context: context, - mutableState: mutableState, - logger: logger, + rebuiltHistorySize: 0, + logger: logger, } } -func (r *nDCStateRebuilderImpl) prepareMutableState( - ctx ctx.Context, - branchIndex int, - incomingVersion int64, -) (mutableState, bool, error) { - - // NOTE: this function also need to preserve whether a workflow is a zombie or not - // this is done by the rebuild function below - versionHistories := r.mutableState.GetVersionHistories() - currentVersionHistoryIndex := versionHistories.GetCurrentVersionHistoryIndex() - - // replication task to be applied to current branch - if branchIndex == currentVersionHistoryIndex { - return r.mutableState, false, nil - } - - currentVersionHistory, err := versionHistories.GetVersionHistory(currentVersionHistoryIndex) - if err != nil { - return nil, false, err - } - currentLastItem, err := currentVersionHistory.GetLastItem() - if err != nil { - return nil, false, err - } - - // mutable state does not need rebuild - if incomingVersion < currentLastItem.GetVersion() { - return r.mutableState, false, nil - } - - if incomingVersion == currentLastItem.GetVersion() { - return nil, false, &shared.BadRequestError{ - Message: "nDCStateRebuilder encounter replication task version == current branch last write version", - } - } - - // task.getVersion() > currentLastItem - // incoming replication task, after application, will become the current branch - // (because higher version wins), we need to rebuild the mutable state for that - rebuildMutableState, err := r.rebuild(ctx, branchIndex, uuid.New()) - if err != nil { - return nil, false, err - } - return rebuildMutableState, true, nil -} - func (r *nDCStateRebuilderImpl) rebuild( ctx ctx.Context, - branchIndex int, + baseWorkflowIdentifier definition.WorkflowIdentifier, + baseBranchToken []byte, + baseNextEventID int64, + targetWorkflowIdentifier definition.WorkflowIdentifier, requestID string, -) (mutableState, error) { - - versionHistories := r.mutableState.GetVersionHistories() - replayVersionHistory, err := versionHistories.GetVersionHistory(branchIndex) - if err != nil { - return nil, err - } - lastItem, err := replayVersionHistory.GetLastItem() - if err != nil { - return nil, err - } +) (mutableState, int64, error) { iter := collection.NewPagingIterator(r.getPaginationFn( + baseWorkflowIdentifier, common.FirstEventID, - lastItem.GetEventID()+1, - replayVersionHistory.GetBranchToken(), + baseNextEventID, + baseBranchToken, )) + domainEntry, err := r.domainCache.GetDomainByID(targetWorkflowIdentifier.DomainID) + if err != nil { + return nil, 0, err + } + // need to specially handling the first batch, to initialize mutable state & state builder batch, err := iter.Next() if err != nil { - return nil, err + return nil, 0, err } firstEventBatch := batch.(*shared.History).Events - rebuildMutableState, stateBuilder := r.initializeBuilders( + rebuiltMutableState, stateBuilder := r.initializeBuilders( firstEventBatch[0].GetVersion(), - r.context.getDomainName(), + domainEntry.GetInfo().Name, ) - if err := r.applyEvents(stateBuilder, firstEventBatch, requestID); err != nil { - return nil, err + if err := r.applyEvents(targetWorkflowIdentifier, stateBuilder, firstEventBatch, requestID); err != nil { + return nil, 0, err } for iter.HasNext() { batch, err := iter.Next() if err != nil { - return nil, err + return nil, 0, err } events := batch.(*shared.History).Events - if err := r.applyEvents(stateBuilder, events, requestID); err != nil { - return nil, err + if err := r.applyEvents(targetWorkflowIdentifier, stateBuilder, events, requestID); err != nil { + return nil, 0, err } } - // after rebuilt verification - rebuildVersionHistories := rebuildMutableState.GetVersionHistories() - rebuildVersionHistory, err := rebuildVersionHistories.GetCurrentVersionHistory() - if err != nil { - return nil, err - } - err = rebuildVersionHistory.SetBranchToken(replayVersionHistory.GetBranchToken()) - if err != nil { - return nil, err - } - - if !rebuildVersionHistory.Equals(replayVersionHistory) { - return nil, &shared.InternalServiceError{ - Message: "nDCStateRebuilder encounter mismatch version history after rebuild", + if rebuiltMutableState.GetNextEventID() != baseNextEventID { + return nil, 0, &shared.InternalServiceError{ + Message: fmt.Sprintf("nDCStateRebuilder unable to rebuild mutable state to event ID: %v", baseNextEventID), } } - - // set the current branch index to target branch index - // set the version history back - // - // caller can use the IsRebuilt function in VersionHistories - // telling whether mutable state is rebuilt, before apply new history events - if err := versionHistories.SetCurrentVersionHistoryIndex(branchIndex); err != nil { - return nil, err - } - if err = rebuildMutableState.SetVersionHistories(versionHistories); err != nil { - return nil, err - } - // set the update condition from original mutable state - rebuildMutableState.SetUpdateCondition(r.mutableState.GetUpdateCondition()) - if r.shard.GetConfig().EnableVisibilityToKafka() { - // whenever a reset of mutable state is done, we need to sync the workflow search attribute - rebuildMutableState.AddTransferTasks(&persistence.UpsertWorkflowSearchAttributesTask{}) + if err := rebuiltMutableState.SetCurrentBranchToken(nil); err != nil { + return nil, 0, err } - r.context.clear() - r.context.setHistorySize(r.historySize) - return rebuildMutableState, nil + // TODO refresh tasks before return + // this includes UpsertWorkflowSearchAttributesTask + + return rebuiltMutableState, r.rebuiltHistorySize, nil } func (r *nDCStateRebuilderImpl) initializeBuilders( @@ -232,18 +159,18 @@ func (r *nDCStateRebuilderImpl) initializeBuilders( } func (r *nDCStateRebuilderImpl) applyEvents( + workflowIdentifier definition.WorkflowIdentifier, stateBuilder stateBuilder, events []*shared.HistoryEvent, requestID string, ) error { - executionInfo := r.mutableState.GetExecutionInfo() _, _, _, err := stateBuilder.applyEvents( - executionInfo.DomainID, + workflowIdentifier.DomainID, requestID, shared.WorkflowExecution{ - WorkflowId: common.StringPtr(executionInfo.WorkflowID), - RunId: common.StringPtr(executionInfo.RunID), + WorkflowId: common.StringPtr(workflowIdentifier.WorkflowID), + RunId: common.StringPtr(workflowIdentifier.RunID), }, events, nil, // no new run history when rebuilding mutable state @@ -258,12 +185,12 @@ func (r *nDCStateRebuilderImpl) applyEvents( } func (r *nDCStateRebuilderImpl) getPaginationFn( + workflowIdentifier definition.WorkflowIdentifier, firstEventID int64, nextEventID int64, branchToken []byte, ) collection.PaginationFn { - executionInfo := r.mutableState.GetExecutionInfo() return func(paginationToken []byte) ([]interface{}, []byte, error) { _, historyBatches, token, size, err := PaginateHistory( @@ -272,9 +199,9 @@ func (r *nDCStateRebuilderImpl) getPaginationFn( nil, r.logger, true, - executionInfo.DomainID, - executionInfo.WorkflowID, - executionInfo.RunID, + workflowIdentifier.DomainID, + workflowIdentifier.WorkflowID, + workflowIdentifier.RunID, firstEventID, nextEventID, paginationToken, @@ -286,7 +213,7 @@ func (r *nDCStateRebuilderImpl) getPaginationFn( if err != nil { return nil, nil, err } - r.historySize += int64(size) + r.rebuiltHistorySize += int64(size) var paginateItems []interface{} for _, history := range historyBatches { diff --git a/service/history/nDCStateRebuilder_mock.go b/service/history/nDCStateRebuilder_mock.go index 73ff7ea8bcc..f9c9166ee60 100644 --- a/service/history/nDCStateRebuilder_mock.go +++ b/service/history/nDCStateRebuilder_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCStateRebuilder.go +// Source: nDCStateRebuilder.go // Package history is a generated GoMock package. package history @@ -29,6 +32,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + definition "github.com/uber/cadence/common/definition" ) // MocknDCStateRebuilder is a mock of nDCStateRebuilder interface @@ -54,18 +58,18 @@ func (m *MocknDCStateRebuilder) EXPECT() *MocknDCStateRebuilderMockRecorder { return m.recorder } -// prepareMutableState mocks base method -func (m *MocknDCStateRebuilder) prepareMutableState(ctx context.Context, branchIndex int, incomingVersion int64) (mutableState, bool, error) { +// rebuild mocks base method +func (m *MocknDCStateRebuilder) rebuild(ctx context.Context, baseWorkflowIdentifier definition.WorkflowIdentifier, baseBranchToken []byte, baseNextEventID int64, targetWorkflowIdentifier definition.WorkflowIdentifier, requestID string) (mutableState, int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "prepareMutableState", ctx, branchIndex, incomingVersion) + ret := m.ctrl.Call(m, "rebuild", ctx, baseWorkflowIdentifier, baseBranchToken, baseNextEventID, targetWorkflowIdentifier, requestID) ret0, _ := ret[0].(mutableState) - ret1, _ := ret[1].(bool) + ret1, _ := ret[1].(int64) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } -// prepareMutableState indicates an expected call of prepareMutableState -func (mr *MocknDCStateRebuilderMockRecorder) prepareMutableState(ctx, branchIndex, incomingVersion interface{}) *gomock.Call { +// rebuild indicates an expected call of rebuild +func (mr *MocknDCStateRebuilderMockRecorder) rebuild(ctx, baseWorkflowIdentifier, baseBranchToken, baseNextEventID, targetWorkflowIdentifier, requestID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "prepareMutableState", reflect.TypeOf((*MocknDCStateRebuilder)(nil).prepareMutableState), ctx, branchIndex, incomingVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "rebuild", reflect.TypeOf((*MocknDCStateRebuilder)(nil).rebuild), ctx, baseWorkflowIdentifier, baseBranchToken, baseNextEventID, targetWorkflowIdentifier, requestID) } diff --git a/service/history/nDCStateRebuilder_test.go b/service/history/nDCStateRebuilder_test.go index ac2f14f5753..f2bc100d0f9 100644 --- a/service/history/nDCStateRebuilder_test.go +++ b/service/history/nDCStateRebuilder_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( @@ -35,6 +37,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/collection" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/messaging" @@ -63,12 +66,10 @@ type ( mockClientBean *client.MockClientBean mockEventsCache *MockEventsCache - mockContext *mockWorkflowExecutionContext - mockMutableState *mockMutableState - domainID string - domainName string - workflowID string - runID string + domainID string + domainName string + workflowID string + runID string nDCStateRebuilder *nDCStateRebuilderImpl } @@ -117,10 +118,8 @@ func (s *nDCStateRebuilderSuite) SetupTest() { s.domainName = "some random domain name" s.workflowID = "some random workflow ID" s.runID = uuid.New() - s.mockContext = &mockWorkflowExecutionContext{} - s.mockMutableState = &mockMutableState{} s.nDCStateRebuilder = newNDCStateRebuilder( - s.mockShard, s.mockContext, s.mockMutableState, s.logger, + s.mockShard, s.logger, ) } @@ -133,8 +132,6 @@ func (s *nDCStateRebuilderSuite) TearDownTest() { s.mockClientBean.AssertExpectations(s.T()) s.mockDomainCache.AssertExpectations(s.T()) s.mockEventsCache.AssertExpectations(s.T()) - - s.mockMutableState.AssertExpectations(s.T()) } func (s *nDCStateRebuilderSuite) TestInitializeBuilders() { @@ -161,11 +158,7 @@ func (s *nDCStateRebuilderSuite) TestApplyEvents() { }, } - s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ - DomainID: s.domainID, - WorkflowID: s.workflowID, - RunID: s.runID, - }) + workflowIdentifier := definition.NewWorkflowIdentifier(s.domainID, s.workflowID, s.runID) mockStateBuilder := &mockStateBuilder{} defer mockStateBuilder.AssertExpectations(s.T()) @@ -182,7 +175,7 @@ func (s *nDCStateRebuilderSuite) TestApplyEvents() { int32(nDCMutableStateEventStoreVersion), ).Return(nil, nil, nil, nil).Once() - err := s.nDCStateRebuilder.applyEvents(mockStateBuilder, events, requestID) + err := s.nDCStateRebuilder.applyEvents(workflowIdentifier, mockStateBuilder, events, requestID) s.NoError(err) } @@ -190,11 +183,7 @@ func (s *nDCStateRebuilderSuite) TestPagination() { firstEventID := common.FirstEventID nextEventID := int64(101) branchToken := []byte("some random branch token") - s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ - DomainID: s.domainID, - WorkflowID: s.workflowID, - RunID: s.runID, - }) + workflowIdentifier := definition.NewWorkflowIdentifier(s.domainID, s.workflowID, s.runID) event1 := &shared.HistoryEvent{ EventId: common.Int64Ptr(1), @@ -246,7 +235,7 @@ func (s *nDCStateRebuilderSuite) TestPagination() { Size: 67890, }, nil).Once() - paginationFn := s.nDCStateRebuilder.getPaginationFn(common.FirstEventID, nextEventID, branchToken) + paginationFn := s.nDCStateRebuilder.getPaginationFn(workflowIdentifier, common.FirstEventID, nextEventID, branchToken) iter := collection.NewPagingIterator(paginationFn) result := []*shared.History{} @@ -262,35 +251,16 @@ func (s *nDCStateRebuilderSuite) TestPagination() { func (s *nDCStateRebuilderSuite) TestRebuild() { requestID := uuid.New() version := int64(12) + lastEventID := int64(2) + branchToken := []byte("other random branch token") - branchToken0 := []byte("some random branch token") - lastEventID0 := int64(5) - versionHistory0 := persistence.NewVersionHistory( - branchToken0, - []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID0, version)}, - ) - branchToken1 := []byte("other random branch token") - lastEventID1 := int64(2) - versionHistory1 := persistence.NewVersionHistory( - branchToken1, - []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID1, version)}, - ) - versionHistories := persistence.NewVersionHistories(versionHistory0) - _, _, err := versionHistories.AddVersionHistory(versionHistory1) - s.NoError(err) - - updateCondition := int64(59) - s.mockContext.On("getDomainName").Return(s.domainName) - s.mockMutableState.On("GetUpdateCondition").Return(updateCondition) - s.mockMutableState.On("GetVersionHistories").Return(versionHistories) - s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ - DomainID: s.domainID, - WorkflowID: s.workflowID, - RunID: s.runID, - }) + targetDomainID := uuid.New() + targetDomainName := "other random domain name" + targetWorkflowID := "other random workflow ID" + targetRunID := uuid.New() firstEventID := common.FirstEventID - nextEventID := lastEventID1 + 1 + nextEventID := lastEventID + 1 events1 := []*shared.HistoryEvent{{ EventId: common.Int64Ptr(1), Version: common.Int64Ptr(version), @@ -321,7 +291,7 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { historySize1 := 12345 historySize2 := 67890 s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", &persistence.ReadHistoryBranchRequest{ - BranchToken: branchToken1, + BranchToken: branchToken, MinEventID: firstEventID, MaxEventID: nextEventID, PageSize: nDCDefaultPageSize, @@ -333,7 +303,7 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { Size: historySize1, }, nil).Once() s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", &persistence.ReadHistoryBranchRequest{ - BranchToken: branchToken1, + BranchToken: branchToken, MinEventID: firstEventID, MaxEventID: nextEventID, PageSize: nDCDefaultPageSize, @@ -345,8 +315,8 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { Size: historySize2, }, nil).Once() - s.mockDomainCache.On("GetDomainByID", s.domainID).Return(cache.NewGlobalDomainCacheEntryForTest( - &persistence.DomainInfo{ID: s.domainID}, + s.mockDomainCache.On("GetDomainByID", targetDomainID).Return(cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: targetDomainID, Name: targetDomainName}, &persistence.DomainConfig{}, &persistence.DomainReplicationConfig{ ActiveClusterName: cluster.TestCurrentClusterName, @@ -359,124 +329,26 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { s.mockClusterMetadata, ), nil) - s.mockContext.On("clear").Once() - s.mockContext.On("setHistorySize", int64(historySize1+historySize2)).Once() s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() - rebuildMutableState, err := s.nDCStateRebuilder.rebuild(ctx.Background(), 1, requestID) - s.NoError(err) - s.NotNil(rebuildMutableState) - s.Equal(versionHistories, rebuildMutableState.GetVersionHistories()) - s.Equal(1, versionHistories.GetCurrentVersionHistoryIndex()) - s.Equal(updateCondition, rebuildMutableState.GetUpdateCondition()) -} - -func (s *nDCStateRebuilderSuite) TestPrepareMutableState_NoRebuild() { - branchToken := []byte("some random branch token") - lastEventID := int64(2) - version := int64(12) - versionHistoryItem := persistence.NewVersionHistoryItem(lastEventID, version) - versionHistory := persistence.NewVersionHistory( + rebuildMutableState, rebuiltHistorySize, err := s.nDCStateRebuilder.rebuild( + ctx.Background(), + definition.NewWorkflowIdentifier(s.domainID, s.workflowID, s.runID), branchToken, - []*persistence.VersionHistoryItem{versionHistoryItem}, - ) - versionHistories := persistence.NewVersionHistories(versionHistory) - s.mockMutableState.On("GetVersionHistories").Return(versionHistories) - - rebuildMutableState, isRebuilt, err := s.nDCStateRebuilder.prepareMutableState(ctx.Background(), 0, version) - s.NoError(err) - s.False(isRebuilt) - s.Equal(s.mockMutableState, rebuildMutableState) -} - -func (s *nDCStateRebuilderSuite) TestPrepareMutableState_Rebuild() { - version := int64(12) - incomingVersion := version + 1 - - // current branch - branchToken0 := []byte("some random branch token") - lastEventID0 := int64(2) - - versionHistoryItem0 := persistence.NewVersionHistoryItem(lastEventID0, version) - versionHistory0 := persistence.NewVersionHistory( - branchToken0, - []*persistence.VersionHistoryItem{versionHistoryItem0}, - ) - - // stale branch, used for rebuild - branchToken1 := []byte("other random branch token") - lastEventID1 := lastEventID0 - 1 - versionHistoryItem1 := persistence.NewVersionHistoryItem(lastEventID1, version) - versionHistory1 := persistence.NewVersionHistory( - branchToken1, - []*persistence.VersionHistoryItem{versionHistoryItem1}, + nextEventID, + definition.NewWorkflowIdentifier(targetDomainID, targetWorkflowID, targetRunID), + requestID, ) - - versionHistories := persistence.NewVersionHistories(versionHistory0) - _, _, err := versionHistories.AddVersionHistory(versionHistory1) - s.Nil(err) - - updateCondition := int64(59) - s.mockContext.On("getDomainName").Return(s.domainName) - s.mockMutableState.On("GetUpdateCondition").Return(updateCondition) - s.mockMutableState.On("GetVersionHistories").Return(versionHistories) - s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ - DomainID: s.domainID, - WorkflowID: s.workflowID, - RunID: s.runID, - }) - - firstEventID := common.FirstEventID - nextEventID := lastEventID1 + 1 - events := []*shared.HistoryEvent{{ - EventId: common.Int64Ptr(1), - Version: common.Int64Ptr(version), - EventType: shared.EventTypeWorkflowExecutionStarted.Ptr(), - WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{ - WorkflowType: &shared.WorkflowType{Name: common.StringPtr("some random workflow type")}, - TaskList: &shared.TaskList{Name: common.StringPtr("some random workflow type")}, - Input: []byte("some random input"), - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(123), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(233), - Identity: common.StringPtr("some random identity"), - }, - }} - history := []*shared.History{{events}} - - historySize := 12345 - s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", &persistence.ReadHistoryBranchRequest{ - BranchToken: branchToken1, - MinEventID: firstEventID, - MaxEventID: nextEventID, - PageSize: nDCDefaultPageSize, - NextPageToken: nil, - ShardID: common.IntPtr(s.mockShard.GetShardID()), - }).Return(&persistence.ReadHistoryBranchByBatchResponse{ - History: history, - NextPageToken: nil, - Size: historySize, - }, nil).Once() - - s.mockDomainCache.On("GetDomainByID", s.domainID).Return(cache.NewGlobalDomainCacheEntryForTest( - &persistence.DomainInfo{ID: s.domainID}, - &persistence.DomainConfig{}, - &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestCurrentClusterName}, - {ClusterName: cluster.TestAlternativeClusterName}, - }, - }, - 1234, - s.mockClusterMetadata, - ), nil) - - s.mockContext.On("clear").Once() - s.mockContext.On("setHistorySize", int64(historySize)).Once() - s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() - rebuildMutableState, isRebuilt, err := s.nDCStateRebuilder.prepareMutableState(ctx.Background(), 1, incomingVersion) s.NoError(err) s.NotNil(rebuildMutableState) - s.True(isRebuilt) - s.Equal(versionHistories, rebuildMutableState.GetVersionHistories()) - s.Equal(updateCondition, rebuildMutableState.GetUpdateCondition()) + rebuildExecutionInfo := rebuildMutableState.GetExecutionInfo() + s.Equal(targetDomainID, rebuildExecutionInfo.DomainID) + s.Equal(targetWorkflowID, rebuildExecutionInfo.WorkflowID) + s.Equal(targetRunID, rebuildExecutionInfo.RunID) + s.Equal(int64(historySize1+historySize2), rebuiltHistorySize) + s.Equal(persistence.NewVersionHistories( + persistence.NewVersionHistory( + nil, + []*persistence.VersionHistoryItem{persistence.NewVersionHistoryItem(lastEventID, version)}, + ), + ), rebuildMutableState.GetVersionHistories()) } diff --git a/service/history/nDCTransactionMgr.go b/service/history/nDCTransactionMgr.go index f35f70e7574..b4a51b3a4e5 100644 --- a/service/history/nDCTransactionMgr.go +++ b/service/history/nDCTransactionMgr.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCTransactionMgr_mock.go + package history import ( diff --git a/service/history/nDCTransactionMgrForExistingWorkflow.go b/service/history/nDCTransactionMgrForExistingWorkflow.go index 7eea1d0e4cc..8b8e3daa9b3 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCTransactionMgrForExistingWorkflow_mock.go + package history import ( diff --git a/service/history/nDCTransactionMgrForExistingWorkflow_mock.go b/service/history/nDCTransactionMgrForExistingWorkflow_mock.go index efd9146ba56..c182ef7d942 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow_mock.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCTransactionMgrForExistingWorkflow.go +// Source: nDCTransactionMgrForExistingWorkflow.go // Package history is a generated GoMock package. package history diff --git a/service/history/nDCTransactionMgrForExistingWorkflow_test.go b/service/history/nDCTransactionMgrForExistingWorkflow_test.go index e4090e3f959..f4a31b906b4 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow_test.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( diff --git a/service/history/nDCTransactionMgrForNewWorkflow.go b/service/history/nDCTransactionMgrForNewWorkflow.go index 1bf7712671d..7afb8186596 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow.go +++ b/service/history/nDCTransactionMgrForNewWorkflow.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCTransactionMgrForNewWorkflow_mock.go + package history import ( @@ -59,7 +61,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( now time.Time, targetWorkflow nDCWorkflow, ) error { - // NOTE: this function does NOT mutate current workflow or target workflow, // workflow mutation is done in methods within executeTransaction function diff --git a/service/history/nDCTransactionMgrForNewWorkflow_mock.go b/service/history/nDCTransactionMgrForNewWorkflow_mock.go index 2a7cc8a1fbe..b9b8ff0d15e 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow_mock.go +++ b/service/history/nDCTransactionMgrForNewWorkflow_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCTransactionMgrForNewWorkflow.go +// Source: nDCTransactionMgrForNewWorkflow.go // Package history is a generated GoMock package. package history diff --git a/service/history/nDCTransactionMgrForNewWorkflow_test.go b/service/history/nDCTransactionMgrForNewWorkflow_test.go index d939e1bfa6e..95b5c9a39f2 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow_test.go +++ b/service/history/nDCTransactionMgrForNewWorkflow_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( diff --git a/service/history/nDCTransactionMgr_mock.go b/service/history/nDCTransactionMgr_mock.go index a235c4ec2a1..499385f4e77 100644 --- a/service/history/nDCTransactionMgr_mock.go +++ b/service/history/nDCTransactionMgr_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCTransactionMgr.go +// Source: nDCTransactionMgr.go // Package history is a generated GoMock package. package history diff --git a/service/history/nDCTransactionMgr_test.go b/service/history/nDCTransactionMgr_test.go index ed01cafe667..7116bd5986b 100644 --- a/service/history/nDCTransactionMgr_test.go +++ b/service/history/nDCTransactionMgr_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( diff --git a/service/history/nDCWorkflow.go b/service/history/nDCWorkflow.go index 81293ca19b7..b068aad7337 100644 --- a/service/history/nDCWorkflow.go +++ b/service/history/nDCWorkflow.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCWorkflow_mock.go + package history import ( @@ -57,7 +59,6 @@ func newNDCWorkflow( ctx ctx.Context, domainCache cache.DomainCache, clusterMetadata cluster.Metadata, - context workflowExecutionContext, mutableState mutableState, releaseFn releaseWorkflowExecutionFunc, diff --git a/service/history/nDCWorkflowResetter.go b/service/history/nDCWorkflowResetter.go new file mode 100644 index 00000000000..0d6d30af403 --- /dev/null +++ b/service/history/nDCWorkflowResetter.go @@ -0,0 +1,186 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCWorkflowResetter_mock.go + +package history + +import ( + ctx "context" + + "github.com/pborman/uuid" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/persistence" +) + +type ( + nDCWorkflowResetterCompleteFn func() + + nDCWorkflowResetter interface { + resetWorkflow( + ctx ctx.Context, + baseEventID int64, + baseVersion int64, + ) (mutableState, nDCWorkflowResetterCompleteFn, error) + } + + nDCWorkflowResetterImpl struct { + shard ShardContext + transactionMgr nDCTransactionMgr + historyV2Mgr persistence.HistoryV2Manager + stateRebuilder nDCStateRebuilder + + domainID string + workflowID string + baseRunID string + newContext workflowExecutionContext + newRunID string + + logger log.Logger + } +) + +var _ nDCWorkflowResetter = (*nDCWorkflowResetterImpl)(nil) + +func newNDCWorkflowResetter( + shard ShardContext, + transactionMgr nDCTransactionMgr, + domainID string, + workflowID string, + baseRunID string, + newContext workflowExecutionContext, + newRunID string, + logger log.Logger, +) *nDCWorkflowResetterImpl { + + return &nDCWorkflowResetterImpl{ + shard: shard, + transactionMgr: transactionMgr, + historyV2Mgr: shard.GetHistoryV2Manager(), + stateRebuilder: newNDCStateRebuilder(shard, logger), + + domainID: domainID, + workflowID: workflowID, + baseRunID: baseRunID, + newContext: newContext, + newRunID: newRunID, + logger: logger, + } +} + +func (r *nDCWorkflowResetterImpl) resetWorkflow( + ctx ctx.Context, + baseEventID int64, + baseVersion int64, +) (mutableState, nDCWorkflowResetterCompleteFn, error) { + + baseWorkflow, err := r.transactionMgr.loadNDCWorkflow( + ctx, + r.domainID, + r.workflowID, + r.baseRunID, + ) + if err != nil { + return nil, nil, err + } + + baseVersionHistories := baseWorkflow.getMutableState().GetVersionHistories() + index, err := baseVersionHistories.FindFirstVersionHistoryIndexByItem( + persistence.NewVersionHistoryItem(baseEventID, baseVersion), + ) + if err != nil { + // TODO we should use a new retry error for 3+DC + return nil, nil, newRetryTaskErrorWithHint( + ErrRetryBufferEventsMsg, + r.domainID, + r.workflowID, + r.baseRunID, + baseWorkflow.getMutableState().GetNextEventID(), // especially here + ) + } + + baseVersionHistory, err := baseVersionHistories.GetVersionHistory(index) + if err != nil { + return nil, nil, err + } + baseBranchToken := baseVersionHistory.GetBranchToken() + + baseWorkflowIdentifier := definition.NewWorkflowIdentifier( + r.domainID, + r.workflowID, + r.baseRunID, + ) + resetWorkflowIdentifier := definition.NewWorkflowIdentifier( + r.domainID, + r.workflowID, + r.newRunID, + ) + + requestID := uuid.New() + rebuildMutableState, rebuiltHistorySize, err := r.stateRebuilder.rebuild( + ctx, + baseWorkflowIdentifier, + baseBranchToken, + baseEventID+1, + resetWorkflowIdentifier, + requestID, + ) + if err != nil { + return nil, nil, err + } + + // TODO after the rebuild, create branch and return a defer branch creation finish fn + + // fork a new history branch + shardID := r.shard.GetShardID() + resp, err := r.historyV2Mgr.ForkHistoryBranch(&persistence.ForkHistoryBranchRequest{ + ForkBranchToken: baseBranchToken, + ForkNodeID: baseEventID + 1, + Info: historyGarbageCleanupInfo(r.domainID, r.workflowID, r.newRunID), + ShardID: common.IntPtr(shardID), + }) + if err != nil { + return nil, nil, err + } + newBranchToken := resp.NewBranchToken + completeFn := func() { + if errComplete := r.historyV2Mgr.CompleteForkBranch(&persistence.CompleteForkBranchRequest{ + BranchToken: newBranchToken, + Success: true, // past lessons learnt from Cassandra & gocql tells that we cannot possibly find all timeout errors + ShardID: common.IntPtr(shardID), + }); errComplete != nil { + r.logger.WithTags( + tag.Error(errComplete), + ).Error("newNDCWorkflowResetter unable to complete creation of new branch.") + } + } + err = rebuildMutableState.SetCurrentBranchToken(newBranchToken) + if err != nil { + completeFn() + return nil, nil, err + } + + r.newContext.setHistorySize(rebuiltHistorySize) + return rebuildMutableState, completeFn, nil +} diff --git a/service/history/nDCWorkflowResetter_mock.go b/service/history/nDCWorkflowResetter_mock.go new file mode 100644 index 00000000000..b56a096f6cf --- /dev/null +++ b/service/history/nDCWorkflowResetter_mock.go @@ -0,0 +1,74 @@ +// The MIT License (MIT) +// +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: nDCWorkflowResetter.go + +// Package history is a generated GoMock package. +package history + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MocknDCWorkflowResetter is a mock of nDCWorkflowResetter interface +type MocknDCWorkflowResetter struct { + ctrl *gomock.Controller + recorder *MocknDCWorkflowResetterMockRecorder +} + +// MocknDCWorkflowResetterMockRecorder is the mock recorder for MocknDCWorkflowResetter +type MocknDCWorkflowResetterMockRecorder struct { + mock *MocknDCWorkflowResetter +} + +// NewMocknDCWorkflowResetter creates a new mock instance +func NewMocknDCWorkflowResetter(ctrl *gomock.Controller) *MocknDCWorkflowResetter { + mock := &MocknDCWorkflowResetter{ctrl: ctrl} + mock.recorder = &MocknDCWorkflowResetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MocknDCWorkflowResetter) EXPECT() *MocknDCWorkflowResetterMockRecorder { + return m.recorder +} + +// resetWorkflow mocks base method +func (m *MocknDCWorkflowResetter) resetWorkflow(ctx context.Context, baseEventID, baseVersion int64) (mutableState, nDCWorkflowResetterCompleteFn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "resetWorkflow", ctx, baseEventID, baseVersion) + ret0, _ := ret[0].(mutableState) + ret1, _ := ret[1].(nDCWorkflowResetterCompleteFn) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// resetWorkflow indicates an expected call of resetWorkflow +func (mr *MocknDCWorkflowResetterMockRecorder) resetWorkflow(ctx, baseEventID, baseVersion interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "resetWorkflow", reflect.TypeOf((*MocknDCWorkflowResetter)(nil).resetWorkflow), ctx, baseEventID, baseVersion) +} diff --git a/service/history/nDCWorkflow_mock.go b/service/history/nDCWorkflow_mock.go index 9a7c846889c..1b080175a27 100644 --- a/service/history/nDCWorkflow_mock.go +++ b/service/history/nDCWorkflow_mock.go @@ -1,3 +1,5 @@ +// The MIT License (MIT) +// // Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy @@ -7,19 +9,20 @@ // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// // Code generated by MockGen. DO NOT EDIT. -// Source: ./service/history/nDCWorkflow.go +// Source: nDCWorkflow.go // Package history is a generated GoMock package. package history diff --git a/service/history/nDCWorkflow_test.go b/service/history/nDCWorkflow_test.go index efa7afc73b4..d491c1446a2 100644 --- a/service/history/nDCWorkflow_test.go +++ b/service/history/nDCWorkflow_test.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// +build test + package history import ( diff --git a/service/history/workflowResetor.go b/service/history/workflowResetor.go index 771c48bbcf5..18274a2f266 100644 --- a/service/history/workflowResetor.go +++ b/service/history/workflowResetor.go @@ -358,7 +358,7 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset( return } - retError = newMutableState.SetBranchToken(forkResp.NewBranchToken) + retError = newMutableState.SetCurrentBranchToken(forkResp.NewBranchToken) return } @@ -819,7 +819,7 @@ func (w *workflowResetorImpl) ApplyResetEvent( }) } }() - retError = newMsBuilder.SetBranchToken(forkResp.NewBranchToken) + retError = newMsBuilder.SetCurrentBranchToken(forkResp.NewBranchToken) if retError != nil { return retError }