Skip to content

Commit

Permalink
Wire NDC implementation end to end (cadence-workflow#2521)
Browse files Browse the repository at this point in the history
* Refactor replication queue, add handling of NDC replication task
* Fix issue that branch token should be set before task refreshment during mutable state rebuild
* Fix deadlock issue when performing NDC workflow reset
* Reorder task ID generation in ConflictResolveWorkflowExecution, ResetWorkflowExecution, making sure current workflow tasks are handled before new workflow tasks
* Add NDC handling for continue as new workflow in state builder, history engine, workflow resetter
* Fix cassandra schema
  • Loading branch information
wxing1292 authored Sep 9, 2019
1 parent 4d77673 commit d8a87a4
Show file tree
Hide file tree
Showing 45 changed files with 1,629 additions and 667 deletions.
110 changes: 56 additions & 54 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

598 changes: 592 additions & 6 deletions .gen/go/replicator/replicator.go

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*.iml
*.cov
*.html
.gobincache/
.tmp/
.vscode/
/vendor
Expand Down
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ COVER_ROOT := $(BUILD)/coverage
UNIT_COVER_FILE := $(COVER_ROOT)/unit_cover.out
INTEG_COVER_FILE := $(COVER_ROOT)/integ_$(PERSISTENCE_TYPE)$(EV2_TEST)_cover.out
INTEG_XDC_COVER_FILE := $(COVER_ROOT)/integ_xdc_$(PERSISTENCE_TYPE)_cover.out

INTEG_CASS_COVER_FILE := $(COVER_ROOT)/integ_cassandra_cover.out
INTEG_CASS_EV2_COVER_FILE := $(COVER_ROOT)/integ_cassandra_ev2_cover.out
INTEG_XDC_CASS_COVER_FILE := $(COVER_ROOT)/integ_xdc_cassandra_cover.out
Expand All @@ -97,9 +96,9 @@ INTEG_XDC_SQL_COVER_FILE := $(COVER_ROOT)/integ_xdc_sql_cover.out
GOCOVERPKG_ARG := -coverpkg="$(PROJECT_ROOT)/common/...,$(PROJECT_ROOT)/service/...,$(PROJECT_ROOT)/client/...,$(PROJECT_ROOT)/tools/..."

yarpc-install:
go mod vendor
go get './vendor/go.uber.org/thriftrw'
go get './vendor/go.uber.org/yarpc/encoding/thrift/thriftrw-plugin-yarpc'
GO111MODULE=off go get -u github.com/myitcv/gobin
gobin -mod=readonly go.uber.org/thriftrw
gobin -mod=readonly go.uber.org/yarpc/encoding/thrift/thriftrw-plugin-yarpc

clean_thrift:
rm -rf .gen
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,8 @@ const (
HistoryReplicationTaskScope
// HistoryMetadataReplicationTaskScope is the scope used by history metadata task replication processing
HistoryMetadataReplicationTaskScope
// HistoryReplicationTaskV2Scope is the scope used by history task replication processing
HistoryReplicationTaskV2Scope
// SyncShardTaskScope is the scope used by sync shard information processing
SyncShardTaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
Expand Down Expand Up @@ -1269,6 +1271,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
DomainReplicationTaskScope: {operation: "DomainReplicationTask"},
HistoryReplicationTaskScope: {operation: "HistoryReplicationTask"},
HistoryMetadataReplicationTaskScope: {operation: "HistoryMetadataReplicationTask"},
HistoryReplicationTaskV2Scope: {operation: "HistoryReplicationTaskV2"},
SyncShardTaskScope: {operation: "SyncShardTask"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
ESProcessorScope: {operation: "ESProcessor"},
Expand Down
28 changes: 15 additions & 13 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,21 +359,23 @@ type (

// ReplicationTaskInfo describes the replication task created for replication of history events
ReplicationTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
FirstEventID int64
NextEventID int64
Version int64
LastReplicationInfo map[string]*ReplicationInfo
ScheduledID int64
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
FirstEventID int64
NextEventID int64
Version int64
ScheduledID int64
BranchToken []byte
NewRunBranchToken []byte
ResetWorkflow bool

// TODO deprecate when NDC is fully released && migrated
EventStoreVersion int32
BranchToken []byte
NewRunEventStoreVersion int32
NewRunBranchToken []byte
ResetWorkflow bool
LastReplicationInfo map[string]*ReplicationInfo
}

// TimerTaskInfo describes a timer task.
Expand Down
36 changes: 36 additions & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,39 @@ func (d *DataBlob) GetEncoding() common.EncodingType {
return common.EncodingTypeUnknown
}
}

// ToThrift converts the DataBlob to its thrift (workflow.DataBlob) representation.
// Only JSON and ThriftRW encodings are supported; any other encoding indicates a
// programming error and causes a panic.
func (d *DataBlob) ToThrift() *workflow.DataBlob {
	switch d.Encoding {
	case common.EncodingTypeJSON:
		return &workflow.DataBlob{
			EncodingType: workflow.EncodingTypeJSON.Ptr(),
			Data:         d.Data,
		}
	case common.EncodingTypeThriftRW:
		return &workflow.DataBlob{
			EncodingType: workflow.EncodingTypeThriftRW.Ptr(),
			Data:         d.Data,
		}
	default:
		// Fix typo in panic message: "enconding" -> "encoding".
		panic(fmt.Sprintf("DataBlob seeing unsupported encoding type: %v", d.Encoding))
	}
}

// NewDataBlobFromThrift converts a thrift workflow.DataBlob into the persistence
// DataBlob representation. Only JSON and ThriftRW encodings are supported; any
// other encoding indicates a programming error and causes a panic.
func NewDataBlobFromThrift(blob *workflow.DataBlob) *DataBlob {
	switch blob.GetEncodingType() {
	case workflow.EncodingTypeJSON:
		return &DataBlob{
			Encoding: common.EncodingTypeJSON,
			Data:     blob.Data,
		}
	case workflow.EncodingTypeThriftRW:
		return &DataBlob{
			Encoding: common.EncodingTypeThriftRW,
			Data:     blob.Data,
		}
	default:
		// Fix typo in panic message: "enconding" -> "encoding".
		panic(fmt.Sprintf("NewDataBlobFromThrift seeing unsupported encoding type: %v", blob.GetEncodingType()))
	}
}
190 changes: 90 additions & 100 deletions host/ndc/nDC_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package ndc

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -64,6 +63,7 @@ type (

domainName string
domainID string
version int64
}
)

Expand Down Expand Up @@ -113,125 +113,122 @@ func (s *nDCIntegrationTestSuite) SetupSuite() {

s.registerDomain()

s.generator = test.InitializeHistoryEventGenerator(s.domainName, 101)
s.version = 101
s.generator = test.InitializeHistoryEventGenerator(s.domainName, s.version)
}

func (s *nDCIntegrationTestSuite) SetupTest() {
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
s.Assertions = require.New(s.T())
s.generator = test.InitializeHistoryEventGenerator(s.domainName, s.version)
}

func (s *nDCIntegrationTestSuite) TearDownSuite() {
if s.generator != nil {
s.generator.Reset()
}
s.active.TearDownCluster()
s.passive.TearDownCluster()
}

func (s *nDCIntegrationTestSuite) TestSingleBranch() {

workflowID := uuid.New()

workflowType := "event-generator-workflow-type"
tasklist := "event-generator-taskList"

historyClient := s.active.GetHistoryClient()
print := func(value interface{}) string {
bytes, _ := json.MarshalIndent(value, "", " ")
return string(bytes)
}
//versions := []int64{101, 1, 201}
//for _, version := range versions {
runID := uuid.New()
historyBatch := []*shared.History{}
s.generator.Reset()
fmt.Printf("##########\n")
for s.generator.HasNextVertex() {
events := s.generator.GetNextVertices()
history := &shared.History{}
for _, event := range events {
history.Events = append(history.Events, event.GetData().(*shared.HistoryEvent))

versions := []int64{101, 1, 201, 301}
for _, version := range versions {
runID := uuid.New()
historyBatch := []*shared.History{}
s.generator = test.InitializeHistoryEventGenerator(s.domainName, version)

for s.generator.HasNextVertex() {
events := s.generator.GetNextVertices()
history := &shared.History{}
for _, event := range events {
history.Events = append(history.Events, event.GetData().(*shared.HistoryEvent))
}
historyBatch = append(historyBatch, history)
}
historyBatch = append(historyBatch, history)
}

// TODO temporary code to generate version history
// we should generate version as part of modeled based testing
versionHistory := persistence.NewVersionHistory(nil, nil)
for _, batch := range historyBatch {
for _, event := range batch.Events {
fmt.Printf("++++++++++\n")
fmt.Printf("## SEEING:\n%v\n.", print(event))
fmt.Printf("++++++++++\n")
err := versionHistory.AddOrUpdateItem(
persistence.NewVersionHistoryItem(
event.GetEventId(),
event.GetVersion(),
))
s.NoError(err)
// TODO temporary code to generate version history
// we should generate version as part of modeled based testing
versionHistory := persistence.NewVersionHistory(nil, nil)
for _, batch := range historyBatch {
for _, event := range batch.Events {
err := versionHistory.AddOrUpdateItem(
persistence.NewVersionHistoryItem(
event.GetEventId(),
event.GetVersion(),
))
s.NoError(err)
}
}
}

for _, batch := range historyBatch {
for _, batch := range historyBatch {

// TODO temporary code to generate first event & version history
// we should generate these as part of modeled based testing
lastEvent := batch.Events[len(batch.Events)-1]
newRunEventBlob, newRunVersionHistory := s.generateNewRunHistory(
lastEvent, s.domainName, workflowID, runID, 101, workflowType, tasklist,
)
// TODO temporary code to generate next run first event
// we should generate these as part of modeled based testing
lastEvent := batch.Events[len(batch.Events)-1]
newRunEventBlob := s.generateNewRunHistory(
lastEvent, s.domainName, workflowID, runID, version, workflowType, tasklist,
)

// must serialize events batch after attempt on continue as new as generateNewRunHistory will
// modify the NewExecutionRunId attr
eventBlob, err := s.serializer.SerializeBatchEvents(batch.Events, common.EncodingTypeThriftRW)
s.NoError(err)
// must serialize events batch after attempt on continue as new as generateNewRunHistory will
// modify the NewExecutionRunId attr
eventBlob, err := s.serializer.SerializeBatchEvents(batch.Events, common.EncodingTypeThriftRW)
s.NoError(err)

err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{
DomainUUID: common.StringPtr(s.domainID),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory),
Events: s.toThriftDataBlob(eventBlob),
NewRunVersionHistoryItems: s.toThriftVersionHistoryItems(newRunVersionHistory),
NewRunEvents: s.toThriftDataBlob(newRunEventBlob),
ResetWorkflow: common.BoolPtr(false),
})
s.Nil(err, "Failed to replicate history event")
}
err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{
DomainUUID: common.StringPtr(s.domainID),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory),
Events: s.toThriftDataBlob(eventBlob),
NewRunEvents: s.toThriftDataBlob(newRunEventBlob),
ResetWorkflow: common.BoolPtr(false),
})
s.Nil(err, "Failed to replicate history event")
}

// get replicated history events from passive side
passiveClient := s.active.GetFrontendClient()
replicatedHistory, err := passiveClient.GetWorkflowExecutionHistory(
s.createContext(),
&shared.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(s.domainName),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
// get replicated history events from passive side
passiveClient := s.active.GetFrontendClient()
replicatedHistory, err := passiveClient.GetWorkflowExecutionHistory(
s.createContext(),
&shared.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(s.domainName),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
MaximumPageSize: common.Int32Ptr(1000),
NextPageToken: nil,
WaitForNewEvent: common.BoolPtr(false),
HistoryEventFilterType: shared.HistoryEventFilterTypeAllEvent.Ptr(),
},
MaximumPageSize: common.Int32Ptr(1000),
NextPageToken: nil,
WaitForNewEvent: common.BoolPtr(false),
HistoryEventFilterType: shared.HistoryEventFilterTypeAllEvent.Ptr(),
},
)
s.Nil(err, "Failed to get history event from passive side")

// compare origin events with replicated events
batchIndex := 0
batch := historyBatch[batchIndex].Events
eventIndex := 0
for _, event := range replicatedHistory.GetHistory().GetEvents() {
if eventIndex >= len(batch) {
batchIndex++
batch = historyBatch[batchIndex].Events
eventIndex = 0
)
s.Nil(err, "Failed to get history event from passive side")

// compare origin events with replicated events
batchIndex := 0
batch := historyBatch[batchIndex].Events
eventIndex := 0
for _, event := range replicatedHistory.GetHistory().GetEvents() {
if eventIndex >= len(batch) {
batchIndex++
batch = historyBatch[batchIndex].Events
eventIndex = 0
}
originEvent := batch[eventIndex]
eventIndex++
s.Equal(originEvent.GetEventType().String(), event.GetEventType().String(), "The replicated event and the origin event are not the same")
}
originEvent := batch[eventIndex]
eventIndex++
s.Equal(originEvent.GetEventType().String(), event.GetEventType().String(), "The replicated event and the origin event are not the same")
}
//}
}

func (s *nDCIntegrationTestSuite) registerDomain() {
Expand Down Expand Up @@ -267,13 +264,13 @@ func (s *nDCIntegrationTestSuite) generateNewRunHistory(
version int64,
workflowType string,
taskList string,
) (*persistence.DataBlob, *persistence.VersionHistory) {
) *persistence.DataBlob {

// TODO temporary code to generate first event & version history
// we should generate these as part of modeled based testing

if event.GetWorkflowExecutionContinuedAsNewEventAttributes() == nil {
return nil, nil
return nil
}

event.WorkflowExecutionContinuedAsNewEventAttributes.NewExecutionRunId = common.StringPtr(uuid.New())
Expand Down Expand Up @@ -311,14 +308,7 @@ func (s *nDCIntegrationTestSuite) generateNewRunHistory(
eventBlob, err := s.serializer.SerializeBatchEvents([]*shared.HistoryEvent{newRunFirstEvent}, common.EncodingTypeThriftRW)
s.NoError(err)

newRunVersionHistory := persistence.NewVersionHistory(nil, nil)
err = newRunVersionHistory.AddOrUpdateItem(persistence.NewVersionHistoryItem(
newRunFirstEvent.GetEventId(),
newRunFirstEvent.GetVersion(),
))
s.NoError(err)

return eventBlob, newRunVersionHistory
return eventBlob
}

func (s *nDCIntegrationTestSuite) toThriftDataBlob(
Expand Down
3 changes: 2 additions & 1 deletion idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ struct ReplicateEventsRequest {
110: optional i32 eventStoreVersion
120: optional i32 newRunEventStoreVersion
130: optional bool resetWorkflow
140: optional bool newRunNDC
}

struct ReplicateRawEventsRequest {
Expand All @@ -304,7 +305,7 @@ struct ReplicateEventsV2Request {
20: optional shared.WorkflowExecution workflowExecution
30: optional list<shared.VersionHistoryItem> versionHistoryItems
40: optional shared.DataBlob events
50: optional list<shared.VersionHistoryItem> newRunVersionHistoryItems
// new run events does not need version history since there is no prior events
60: optional shared.DataBlob newRunEvents
70: optional bool resetWorkflow
}
Expand Down
Loading

0 comments on commit d8a87a4

Please sign in to comment.