Skip to content

Commit

Permalink
Add nDC support for workflow execution reset (cadence-workflow#2428)
Browse files Browse the repository at this point in the history
* New nDCWorkflowResetter handling workflow execution reset
* Add handling of workflow reset in nDCHistoryReplicator
* Add ContainsItem API to version history
* Rename nDCStateRebuilder to nDCConflictResolver
* Move common mutable state rebuild logic into new nDCStateRebuilder
* Add '+build test' for all NDC related test files
* Add //go:generate mockgen for automated mock file generation
  • Loading branch information
wxing1292 authored Aug 24, 2019
1 parent f5fbb20 commit adf16be
Show file tree
Hide file tree
Showing 30 changed files with 1,250 additions and 401 deletions.
47 changes: 40 additions & 7 deletions common/persistence/versionHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,28 +235,47 @@ func (v *VersionHistory) AddOrUpdateItem(
return nil
}

// ContainsItem check whether given version history item is included
func (v *VersionHistory) ContainsItem(
item *VersionHistoryItem,
) bool {

prevEventID := common.FirstEventID - 1
for _, currentItem := range v.items {
if item.GetVersion() == currentItem.GetVersion() {
if prevEventID < item.GetEventID() && item.GetEventID() <= currentItem.GetEventID() {
return true
}
} else if item.GetVersion() < currentItem.GetVersion() {
return false
}
prevEventID = currentItem.GetEventID()
}
return false
}

// FindLCAItem returns the lowest common ancestor version history item
func (v *VersionHistory) FindLCAItem(
remote *VersionHistory,
) (*VersionHistoryItem, error) {

localIdx := len(v.items) - 1
remoteIdx := len(remote.items) - 1
localIndex := len(v.items) - 1
remoteIndex := len(remote.items) - 1

for localIdx >= 0 && remoteIdx >= 0 {
localVersionItem := v.items[localIdx]
remoteVersionItem := remote.items[remoteIdx]
for localIndex >= 0 && remoteIndex >= 0 {
localVersionItem := v.items[localIndex]
remoteVersionItem := remote.items[remoteIndex]

if localVersionItem.version == remoteVersionItem.version {
if localVersionItem.eventID > remoteVersionItem.eventID {
return remoteVersionItem.Duplicate(), nil
}
return localVersionItem.Duplicate(), nil
} else if localVersionItem.version > remoteVersionItem.version {
localIdx--
localIndex--
} else {
// localVersionItem.version < remoteVersionItem.version
remoteIdx--
remoteIndex--
}
}

Expand Down Expand Up @@ -489,6 +508,20 @@ func (h *VersionHistories) FindLCAVersionHistoryIndexAndItem(
return versionHistoryIndex, versionHistoryItem, nil
}

// FindFirstVersionHistoryIndexByItem find the first version history index which
// contains the given version history item
func (h *VersionHistories) FindFirstVersionHistoryIndexByItem(
item *VersionHistoryItem,
) (int, error) {

for index, localHistory := range h.histories {
if localHistory.ContainsItem(item) {
return index, nil
}
}
return 0, &shared.BadRequestError{Message: "version histories does not contains given item."}
}

// IsRebuilt returns true if the current branch index's last write version is not the largest
// among all branches' last write version
func (h *VersionHistories) IsRebuilt() (bool, error) {
Expand Down
72 changes: 67 additions & 5 deletions common/persistence/versionHistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"

"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (s *versionHistorySuite) TestSetBranchToken() {
s.NoError(err)
}

func (s *versionHistorySuite) TestUpdateItem_VersionIncrease() {
func (s *versionHistorySuite) TestAddOrUpdateItem_VersionIncrease() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand All @@ -178,7 +179,7 @@ func (s *versionHistorySuite) TestUpdateItem_VersionIncrease() {

}

func (s *versionHistorySuite) TestUpdateItem_EventIDIncrease() {
func (s *versionHistorySuite) TestAddOrUpdateItem_EventIDIncrease() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand All @@ -202,7 +203,7 @@ func (s *versionHistorySuite) TestUpdateItem_EventIDIncrease() {
), history)
}

func (s *versionHistorySuite) TestUpdateItem_Failed_LowerVersion() {
func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_LowerVersion() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand All @@ -214,7 +215,7 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_LowerVersion() {
s.Error(err)
}

func (s *versionHistorySuite) TestUpdateItem_Failed_SameVersion_EventIDNotIncreasing() {
func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_SameVersion_EventIDNotIncreasing() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand All @@ -229,7 +230,7 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_SameVersion_EventIDNotIncrea
s.Error(err)
}

func (s *versionHistorySuite) TestUpdateItem_Failed_VersionNoIncreasing() {
func (s *versionHistorySuite) TestAddOrUpdateItem_Failed_VersionNoIncreasing() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand All @@ -247,6 +248,38 @@ func (s *versionHistorySuite) TestUpdateItem_Failed_VersionNoIncreasing() {
s.Error(err)
}

func (s *versionHistoriesSuite) TestContainsItem_True() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
{eventID: 6, version: 4},
}
history := NewVersionHistory(branchToken, items)

prevEventID := common.FirstEventID - 1
for _, item := range items {
for eventID := prevEventID + 1; eventID <= item.GetEventID(); eventID++ {
s.True(history.ContainsItem(NewVersionHistoryItem(eventID, item.GetVersion())))
}
prevEventID = item.GetEventID()
}
}

func (s *versionHistoriesSuite) TestContainsItem_False() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
{eventID: 3, version: 0},
{eventID: 6, version: 4},
}
history := NewVersionHistory(branchToken, items)

s.False(history.ContainsItem(NewVersionHistoryItem(4, 0)))
s.False(history.ContainsItem(NewVersionHistoryItem(3, 1)))

s.False(history.ContainsItem(NewVersionHistoryItem(7, 4)))
s.False(history.ContainsItem(NewVersionHistoryItem(6, 5)))
}

func (s *versionHistorySuite) TestIsLCAAppendable_True() {
branchToken := []byte("some random branch token")
items := []*VersionHistoryItem{
Expand Down Expand Up @@ -551,6 +584,35 @@ func (s *versionHistoriesSuite) TestFindLCAVersionHistoryIndexAndItem_SameEventI
s.Equal(NewVersionHistoryItem(7, 6), item)
}

func (s *versionHistoriesSuite) TestFindFirstVersionHistoryIndexByItem() {
versionHistory1 := NewVersionHistory([]byte("branch token 1"), []*VersionHistoryItem{
{eventID: 3, version: 0},
{eventID: 5, version: 4},
{eventID: 7, version: 6},
})
versionHistory2 := NewVersionHistory([]byte("branch token 2"), []*VersionHistoryItem{
{eventID: 3, version: 0},
{eventID: 5, version: 4},
{eventID: 7, version: 6},
{eventID: 9, version: 10},
})

histories := NewVersionHistories(versionHistory1)
_, _, err := histories.AddVersionHistory(versionHistory2)
s.Nil(err)

index, err := histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(8, 10))
s.NoError(err)
s.Equal(1, index)

index, err = histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(4, 4))
s.NoError(err)
s.Equal(0, index)

index, err = histories.FindFirstVersionHistoryIndexByItem(NewVersionHistoryItem(41, 4))
s.Error(err)
}

func (s *versionHistoriesSuite) TestCurrentVersionHistoryIndexIsInReplay() {
versionHistory1 := NewVersionHistory([]byte("branch token 1"), []*VersionHistoryItem{
{eventID: 3, version: 0},
Expand Down
4 changes: 2 additions & 2 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -2731,8 +2731,8 @@ func (_m *mockMutableState) ReplicateWorkflowExecutionTimedoutEvent(_a0 int64, _
return r0
}

// SetHistoryBuilder provides a mock function with given fields: _a0
func (_m *mockMutableState) SetBranchToken(_a0 []byte) error {
// SetCurrentBranchToken provides a mock function with given fields: _a0
func (_m *mockMutableState) SetCurrentBranchToken(_a0 []byte) error {
ret := _m.Called(_a0)

var r0 error
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ type (
ReplicateWorkflowExecutionStartedEvent(*cache.DomainCacheEntry, *string, workflow.WorkflowExecution, string, *workflow.HistoryEvent) error
ReplicateWorkflowExecutionTerminatedEvent(int64, *workflow.HistoryEvent) error
ReplicateWorkflowExecutionTimedoutEvent(int64, *workflow.HistoryEvent) error
SetBranchToken(branchToken []byte) error
SetCurrentBranchToken(branchToken []byte) error
SetHistoryBuilder(hBuilder *historyBuilder)
SetHistoryTree(treeID string) error
SetVersionHistories(*persistence.VersionHistories) error
Expand Down
4 changes: 2 additions & 2 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ func (e *mutableStateBuilder) SetHistoryTree(
if err != nil {
return err
}
return e.SetBranchToken(initialBranchToken)
return e.SetCurrentBranchToken(initialBranchToken)
}

func (e *mutableStateBuilder) SetBranchToken(
func (e *mutableStateBuilder) SetCurrentBranchToken(
branchToken []byte,
) error {

Expand Down
5 changes: 3 additions & 2 deletions service/history/nDCBranchMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCBranchMgr_mock.go

package history

import (
Expand Down Expand Up @@ -56,7 +58,6 @@ var _ nDCBranchMgr = (*nDCBranchMgrImpl)(nil)

func newNDCBranchMgr(
shard ShardContext,

context workflowExecutionContext,
mutableState mutableState,
logger log.Logger,
Expand Down Expand Up @@ -139,7 +140,7 @@ func (r *nDCBranchMgrImpl) createNewBranch(
defer func() {
if errComplete := r.historyV2Mgr.CompleteForkBranch(&persistence.CompleteForkBranchRequest{
BranchToken: newBranchToken,
Success: retError == nil || persistence.IsTimeoutError(retError),
Success: true, // past lessons learnt from Cassandra & gocql tells that we cannot possibly find all timeout errors
ShardID: common.IntPtr(shardID),
}); errComplete != nil {
r.logger.WithTags(
Expand Down
13 changes: 8 additions & 5 deletions service/history/nDCBranchMgr_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions service/history/nDCBranchMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// +build test

package history

import (
Expand Down
Loading

0 comments on commit adf16be

Please sign in to comment.