diff --git a/common/testing/event_generator.go b/common/testing/event_generator.go index ce13a97a6e3..7d99aaca312 100644 --- a/common/testing/event_generator.go +++ b/common/testing/event_generator.go @@ -23,11 +23,11 @@ package testing import ( "math/rand" "strings" - "time" ) const ( emptyCandidateIndex = -1 + versionBumpGap = int64(100) ) var ( @@ -42,6 +42,8 @@ type ( name string isStrictOnNextVertex bool maxNextGeneration int + dataFunc func(...interface{}) interface{} + data interface{} } // HistoryEventModel is a graph represents relationships among history event types @@ -60,8 +62,10 @@ type ( exitVertices map[Vertex]bool randomEntryVertices []Vertex dice *rand.Rand + seed int64 canDoBatch func([]Vertex) bool resetPoints []ResetPoint + resetCount int64 } // ResetPoint is a mark in the generated event history that generator can be reset to @@ -90,6 +94,7 @@ type ( func NewEventGenerator( seed int64, ) Generator { + return &EventGenerator{ connections: make(map[Vertex][]Edge), previousVertices: make([]Vertex, 0), @@ -98,8 +103,10 @@ func NewEventGenerator( exitVertices: make(map[Vertex]bool), randomEntryVertices: make([]Vertex, 0), dice: rand.New(rand.NewSource(seed)), + seed: seed, canDoBatch: defaultBatchFunc, resetPoints: make([]ResetPoint, 0), + resetCount: 0, } } @@ -184,7 +191,6 @@ func (g *EventGenerator) Reset() { g.leafVertices = make([]Vertex, 0) g.previousVertices = make([]Vertex, 0) - g.dice = rand.New(rand.NewSource(time.Now().Unix())) } // ListResetPoint returns a list of available point to reset the event generator @@ -195,27 +201,45 @@ func (g *EventGenerator) ListResetPoint() []ResetPoint { } // RandomResetToResetPoint randomly pick a reset point and reset the event generator to the point -func (g *EventGenerator) RandomResetToResetPoint() int { +func (g *EventGenerator) RandomResetToResetPoint() Generator { // Random reset does not reset to index 0 nextIdx := g.dice.Intn(len(g.resetPoints)-1) + 1 - g.ResetToResetPoint(nextIdx) - return nextIdx + return g.ResetToResetPoint(nextIdx) } // ResetToResetPoint resets to the corresponding reset point based on the input reset point index func (g *EventGenerator) ResetToResetPoint( index int, -) { +) Generator { if index >= len(g.resetPoints) { panic("The reset point does not exist.") } toReset := g.resetPoints[index] - g.previousVertices = toReset.previousVertices - g.leafVertices = toReset.leafVertices - g.resetPoints = g.resetPoints[index:] - g.dice = rand.New(rand.NewSource(time.Now().Unix())) + previousVertices := make([]Vertex, len(toReset.previousVertices)) + copy(previousVertices, toReset.previousVertices) + leafVertices := make([]Vertex, len(toReset.leafVertices)) + copy(leafVertices, toReset.leafVertices) + entryVertices := make([]Vertex, len(g.entryVertices)) + copy(entryVertices, g.entryVertices) + randomEntryVertices := make([]Vertex, len(g.randomEntryVertices)) + copy(randomEntryVertices, g.randomEntryVertices) + resetPoints := make([]ResetPoint, len(g.resetPoints[index:])) + copy(resetPoints, g.resetPoints[index:]) + return &EventGenerator{ + connections: copyConnections(g.connections), + previousVertices: previousVertices, + leafVertices: leafVertices, + entryVertices: entryVertices, + exitVertices: copyExitVertices(g.exitVertices), + randomEntryVertices: randomEntryVertices, + dice: rand.New(rand.NewSource(g.seed)), + seed: g.seed, + canDoBatch: g.canDoBatch, + resetPoints: resetPoints, + resetCount: g.resetCount + 1, + } } // SetBatchGenerationRule sets a function to determine next generated batch of history events @@ -274,6 +298,7 @@ func (g *EventGenerator) getEntryVertex() Vertex { nextRange := len(g.entryVertices) nextIdx := g.dice.Intn(nextRange) vertex := g.entryVertices[nextIdx] + vertex.GenerateData(nil) return vertex } @@ -285,6 +310,13 @@ func (g *EventGenerator) getRandomVertex() Vertex { nextRange := len(g.randomEntryVertices) nextIdx := g.dice.Intn(nextRange) vertex := g.randomEntryVertices[nextIdx] + lastEvent := g.previousVertices[len(g.previousVertices)-1] + + versionBump := int64(0) + if g.shouldBumpVersion() { + versionBump = versionBumpGap + } + vertex.GenerateData(lastEvent.GetData(), int64(1), versionBump, g.resetCount) return vertex } @@ -335,10 +367,16 @@ func (g *EventGenerator) randomNextVertex( ) []Vertex { nextVertex := g.leafVertices[nextVertexIdx] - count := g.dice.Intn(nextVertex.GetMaxNextVertex()) + 1 + versionBump := int64(0) + if g.shouldBumpVersion() { + versionBump = versionBumpGap + } + + count := g.dice.Intn(nextVertex.GetMaxNextVertex()) + 2 res := make([]Vertex, 0) - for i := 0; i < count; i++ { + for i := 1; i < count; i++ { endVertex := g.pickRandomVertex(nextVertex) + endVertex.GenerateData(nextVertex.GetData(), int64(i), versionBump, g.resetCount) res = append(res, endVertex) if _, ok := g.exitVertices[endVertex]; ok { res = []Vertex{endVertex} @@ -366,6 +404,11 @@ func (g *EventGenerator) pickRandomVertex( return endVertex } +func (g *EventGenerator) shouldBumpVersion() bool { + // 1//1000 to bump the version + return g.dice.Intn(1000) == 500 +} + // NewHistoryEventEdge initials a new edge between two HistoryEventVertexes func NewHistoryEventEdge( start Vertex, @@ -495,6 +538,39 @@ func (he HistoryEventVertex) GetMaxNextVertex() int { return he.maxNextGeneration } +// SetDataFunc sets the data generation function +func (he *HistoryEventVertex) SetDataFunc( + dataFunc func(...interface{}) interface{}, +) { + + he.dataFunc = dataFunc +} + +// GetDataFunc returns the data generation function +func (he HistoryEventVertex) GetDataFunc() func(...interface{}) interface{} { + + return he.dataFunc +} + +// GenerateData generates the data and return +func (he *HistoryEventVertex) GenerateData( + input ...interface{}, +) interface{} { + + if he.dataFunc == nil { + return nil + } + + he.data = he.dataFunc(input...) + return he.data +} + +// GetData returns the vertex data +func (he HistoryEventVertex) GetData() interface{} { + + return he.data +} + // NewHistoryEventModel initials new history event model func NewHistoryEventModel() Model { diff --git a/common/testing/generator_interface.go b/common/testing/generator_interface.go index 3147a0dbfa6..8811b935e51 100644 --- a/common/testing/generator_interface.go +++ b/common/testing/generator_interface.go @@ -55,9 +55,9 @@ type ( // ListResetPoint lists all available reset points ListResetPoint() []ResetPoint // RandomResetToResetPoint randomly pick a reset point to reset and return the reset point index - RandomResetToResetPoint() int + RandomResetToResetPoint() Generator // ResetToResetPoint resets the generator to a reset point - ResetToResetPoint(int) + ResetToResetPoint(int) Generator // SetBatchGenerationRule sets a function that used in GetNextVertex to return batch result SetBatchGenerationRule(func([]Vertex) bool) } @@ -75,6 +75,12 @@ type ( // MaxNextVertex means the max neighbors can branch out from this vertex SetMaxNextVertex(int) GetMaxNextVertex() int + + // SetVertexDataFunc sets a function to generate end vertex data + SetDataFunc(func(...interface{}) interface{}) + GetDataFunc() func(...interface{}) interface{} + GenerateData(...interface{}) interface{} + GetData() interface{} } // Edge is the connection between two vertices diff --git a/common/testing/history_event_test.go b/common/testing/history_event_test.go index 89730657e5e..30b21bbda51 100644 --- a/common/testing/history_event_test.go +++ b/common/testing/history_event_test.go @@ -24,8 +24,8 @@ import ( "fmt" "testing" - "github.com/pborman/uuid" "github.com/stretchr/testify/suite" + "github.com/uber/cadence/.gen/go/shared" ) type ( @@ -40,7 +40,7 @@ func TestHistoryEventTestSuite(t *testing.T) { } func (s *historyEventTestSuit) SetupSuite() { - s.generator = InitializeHistoryEventGenerator() + s.generator = InitializeHistoryEventGenerator("domain") } func (s *historyEventTestSuit) SetupTest() { @@ -49,56 +49,22 @@ func (s *historyEventTestSuit) SetupTest() { // This is a sample about how to use the generator func (s *historyEventTestSuit) Test_HistoryEvent_Generator() { - - totalBranchNumber := 2 - currentBranch := totalBranchNumber - root := &NDCTestBranch{ - Batches: make([]NDCTestBatch, 0), - } - curr := root - // eventRanches := make([][]Vertex, 0, totalBranchNumber) - for currentBranch > 0 { - for s.generator.HasNextVertex() { - events := s.generator.GetNextVertices() - newBatch := NDCTestBatch{ - Events: events, - } - curr.Batches = append(curr.Batches, newBatch) - for _, e := range events { - fmt.Println(e.GetName()) - } - } - currentBranch-- - if currentBranch > 0 { - resetIdx := s.generator.RandomResetToResetPoint() - curr = root.Split(resetIdx) + for s.generator.HasNextVertex() { + events := s.generator.GetNextVertices() + for _, e := range events { + fmt.Println(e.GetName()) + fmt.Println(e.GetData().(*shared.HistoryEvent).GetEventId()) } } s.NotEmpty(s.generator.ListGeneratedVertices()) - queue := []*NDCTestBranch{root} - for len(queue) > 0 { - b := queue[0] - queue = queue[1:] - for _, batch := range b.Batches { - for _, event := range batch.Events { - fmt.Println(event.GetName()) - } + + newGenerator := s.generator.RandomResetToResetPoint() + for newGenerator.HasNextVertex() { + events := newGenerator.GetNextVertices() + for _, e := range events { + fmt.Println(e.GetName()) + fmt.Println(e.GetData().(*shared.HistoryEvent).GetEventId()) } - queue = append(queue, b.Next...) } - - // Generator one branch of history events - batches := []NDCTestBatch{} - batches = append(batches, root.Batches...) - batches = append(batches, root.Next[0].Batches...) - identity := "test-event-generator" - wid := uuid.New() - rid := uuid.New() - wt := "event-generator-workflow-type" - tl := "event-generator-taskList" - domain := "event-generator" - domainID := uuid.New() - attributeGenerator := NewHistoryAttributesGenerator(wid, rid, tl, wt, domain, domainID, identity) - history := attributeGenerator.GenerateHistoryEvents(batches, 1, 100) - s.NotEmpty(history) + s.NotEmpty(newGenerator.ListGeneratedVertices()) } diff --git a/common/testing/history_event_util.go b/common/testing/history_event_util.go index 240d275fdc6..b473c4af15f 100644 --- a/common/testing/history_event_util.go +++ b/common/testing/history_event_util.go @@ -21,690 +21,35 @@ package testing import ( + "time" + "github.com/pborman/uuid" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" - - "reflect" - "time" ) const ( - timeout = int32(1) - cause = "NDC test" - signal = "NDC signal" - checksum = "NDC checksum" - childWorkflowPrefix = "child-" - externalWorkflowPrefix = "external-" - reason = "NDC reason" -) - -type ( - // HistoryAttributesGenerator is to generator history attribute in history events - HistoryAttributesGenerator interface { - GenerateHistoryEvents([]NDCTestBatch, int64, int64) []*shared.History - } - - // HistoryAttributesGeneratorImpl is to generator history attribute in history events - HistoryAttributesGeneratorImpl struct { - decisionTaskAttempts int64 - timerIDs map[string]bool - timerStartEventIDs map[int64]bool - childWorkflowInitialEventIDs map[int64]int64 - childWorkflowStartEventIDs map[int64]int64 - childWorkflowRunIDs map[int64]string - signalExternalWorkflowEventIDs map[int64]int64 - requestExternalWorkflowCanceledEventIDs map[int64]int64 - activityScheduleEventIDs map[int64]bool - activityStartEventIDs map[int64]int64 - activityCancelRequestEventIDs map[int64]int64 - decisionTaskScheduleEventID int64 - decisionTaskStartEventID int64 - decisionTaskCompleteEventID int64 - - identity string - workflowID string - runID string - taskList string - workflowType string - domainID string - domain string - } + timeout = int32(1) + cause = "NDC test" + signal = "NDC signal" + checksum = "NDC checksum" + childWorkflowPrefix = "child-" + reason = "NDC reason" + defaultVersion = 1 + workflowType = "test-workflow-type" + taskList = "taskList" + identity = "identity" + decisionTaskAttempts = 0 + childWorkflowID = "child-workflowID" + externalWorkflowID = "external-workflowID" ) -// NewHistoryAttributesGenerator is initial a generator -func NewHistoryAttributesGenerator( - workflowID, - runID, - taskList, - workflowType, - domainID, - domain, - identity string, -) HistoryAttributesGenerator { - - return &HistoryAttributesGeneratorImpl{ - decisionTaskAttempts: int64(0), - timerIDs: make(map[string]bool), - timerStartEventIDs: make(map[int64]bool), - childWorkflowInitialEventIDs: make(map[int64]int64), - childWorkflowStartEventIDs: make(map[int64]int64), - childWorkflowRunIDs: make(map[int64]string), - signalExternalWorkflowEventIDs: make(map[int64]int64), - requestExternalWorkflowCanceledEventIDs: make(map[int64]int64), - activityScheduleEventIDs: make(map[int64]bool), - activityStartEventIDs: make(map[int64]int64), - activityCancelRequestEventIDs: make(map[int64]int64), - decisionTaskScheduleEventID: -1, - decisionTaskStartEventID: -1, - decisionTaskCompleteEventID: -1, - identity: identity, - workflowID: workflowID, - runID: runID, - taskList: taskList, - workflowType: workflowType, - domainID: domainID, - domain: domain, - } -} - -// GenerateHistoryEvents is to generator batches of history events -func (h *HistoryAttributesGeneratorImpl) GenerateHistoryEvents( - batches []NDCTestBatch, - startEventID int64, - version int64, -) []*shared.History { - - history := make([]*shared.History, 0, len(batches)) - eventID := startEventID - - // TODO: Marker and EventTypeUpsertWorkflowSearchAttributes need to be added to the model and also to generate event attributes - for _, batch := range batches { - historyEvents := make([]*shared.HistoryEvent, 0) - for _, event := range batch.Events { - historyEvent := h.generateEventAttribute(event, eventID, version) - eventID++ - historyEvents = append(historyEvents, historyEvent) - } - nextHistoryBatch := &shared.History{ - Events: historyEvents, - } - history = append(history, nextHistoryBatch) - } - return history -} - -func (h *HistoryAttributesGeneratorImpl) generateEventAttribute(vertex Vertex, eventID, version int64) *shared.HistoryEvent { - historyEvent := getDefaultHistoryEvent(eventID, version) - switch vertex.GetName() { - case shared.EventTypeWorkflowExecutionStarted.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionStarted.Ptr() - historyEvent.WorkflowExecutionStartedEventAttributes = &shared.WorkflowExecutionStartedEventAttributes{ - WorkflowType: &shared.WorkflowType{ - Name: common.StringPtr(h.workflowType), - }, - TaskList: &shared.TaskList{ - Name: common.StringPtr(h.taskList), - Kind: shared.TaskListKindNormal.Ptr(), - }, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - Identity: common.StringPtr(h.identity), - FirstExecutionRunId: common.StringPtr(h.runID), - } - case shared.EventTypeWorkflowExecutionCompleted.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionCompleted.Ptr() - historyEvent.WorkflowExecutionCompletedEventAttributes = &shared.WorkflowExecutionCompletedEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - } - case shared.EventTypeWorkflowExecutionFailed.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionFailed.Ptr() - historyEvent.WorkflowExecutionFailedEventAttributes = &shared.WorkflowExecutionFailedEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - } - case shared.EventTypeWorkflowExecutionTerminated.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionTerminated.Ptr() - historyEvent.WorkflowExecutionTerminatedEventAttributes = &shared.WorkflowExecutionTerminatedEventAttributes{ - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeWorkflowExecutionTimedOut.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionTimedOut.Ptr() - historyEvent.WorkflowExecutionTimedOutEventAttributes = &shared.WorkflowExecutionTimedOutEventAttributes{ - TimeoutType: shared.TimeoutTypeStartToClose.Ptr(), - } - case shared.EventTypeWorkflowExecutionCancelRequested.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionCancelRequested.Ptr() - historyEvent.WorkflowExecutionCancelRequestedEventAttributes = &shared.WorkflowExecutionCancelRequestedEventAttributes{ - Cause: common.StringPtr(cause), - ExternalInitiatedEventId: common.Int64Ptr(10), - ExternalWorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(h.workflowID), - RunId: common.StringPtr(h.runID), - }, - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeWorkflowExecutionCanceled.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionCanceled.Ptr() - historyEvent.WorkflowExecutionCanceledEventAttributes = &shared.WorkflowExecutionCanceledEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(eventID - 1), - } - case shared.EventTypeWorkflowExecutionContinuedAsNew.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionContinuedAsNew.Ptr() - historyEvent.WorkflowExecutionContinuedAsNewEventAttributes = &shared.WorkflowExecutionContinuedAsNewEventAttributes{ - NewExecutionRunId: common.StringPtr(h.runID), - WorkflowType: &shared.WorkflowType{ - Name: common.StringPtr(h.workflowType), - }, - TaskList: &shared.TaskList{ - Name: common.StringPtr(h.taskList), - Kind: shared.TaskListKindNormal.Ptr(), - }, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - DecisionTaskCompletedEventId: common.Int64Ptr(eventID - 1), - Initiator: shared.ContinueAsNewInitiatorDecider.Ptr(), - } - case shared.EventTypeWorkflowExecutionSignaled.String(): - historyEvent.EventType = shared.EventTypeWorkflowExecutionSignaled.Ptr() - historyEvent.WorkflowExecutionSignaledEventAttributes = &shared.WorkflowExecutionSignaledEventAttributes{ - SignalName: common.StringPtr(signal), - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeDecisionTaskScheduled.String(): - historyEvent.EventType = shared.EventTypeDecisionTaskScheduled.Ptr() - historyEvent.DecisionTaskScheduledEventAttributes = &shared.DecisionTaskScheduledEventAttributes{ - TaskList: &shared.TaskList{ - Name: common.StringPtr(h.taskList), - Kind: shared.TaskListKindNormal.Ptr(), - }, - StartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - Attempt: common.Int64Ptr(h.decisionTaskAttempts), - } - h.decisionTaskScheduleEventID = eventID - case shared.EventTypeDecisionTaskStarted.String(): - historyEvent.EventType = shared.EventTypeDecisionTaskStarted.Ptr() - historyEvent.DecisionTaskStartedEventAttributes = &shared.DecisionTaskStartedEventAttributes{ - ScheduledEventId: common.Int64Ptr(h.decisionTaskScheduleEventID), - Identity: common.StringPtr(h.identity), - RequestId: common.StringPtr(uuid.New()), - } - h.decisionTaskStartEventID = eventID - case shared.EventTypeDecisionTaskTimedOut.String(): - historyEvent.EventType = shared.EventTypeDecisionTaskTimedOut.Ptr() - historyEvent.DecisionTaskTimedOutEventAttributes = &shared.DecisionTaskTimedOutEventAttributes{ - ScheduledEventId: common.Int64Ptr(h.decisionTaskScheduleEventID), - StartedEventId: common.Int64Ptr(h.decisionTaskStartEventID), - TimeoutType: shared.TimeoutTypeScheduleToStart.Ptr(), - } - h.decisionTaskAttempts++ - case shared.EventTypeDecisionTaskFailed.String(): - historyEvent.EventType = shared.EventTypeDecisionTaskFailed.Ptr() - historyEvent.DecisionTaskFailedEventAttributes = &shared.DecisionTaskFailedEventAttributes{ - ScheduledEventId: common.Int64Ptr(h.decisionTaskScheduleEventID), - StartedEventId: common.Int64Ptr(h.decisionTaskStartEventID), - Cause: common.DecisionTaskFailedCausePtr(shared.DecisionTaskFailedCauseUnhandledDecision), - Identity: common.StringPtr(h.identity), - BaseRunId: common.StringPtr(h.runID), - } - h.decisionTaskAttempts++ - case shared.EventTypeDecisionTaskCompleted.String(): - historyEvent.EventType = shared.EventTypeDecisionTaskCompleted.Ptr() - h.decisionTaskAttempts = 0 - historyEvent.DecisionTaskCompletedEventAttributes = &shared.DecisionTaskCompletedEventAttributes{ - ScheduledEventId: common.Int64Ptr(h.decisionTaskScheduleEventID), - StartedEventId: common.Int64Ptr(h.decisionTaskStartEventID), - Identity: common.StringPtr(h.identity), - BinaryChecksum: common.StringPtr(checksum), - } - h.decisionTaskCompleteEventID = eventID - case shared.EventTypeActivityTaskScheduled.String(): - historyEvent.EventType = shared.EventTypeActivityTaskScheduled.Ptr() - historyEvent.ActivityTaskScheduledEventAttributes = &shared.ActivityTaskScheduledEventAttributes{ - ActivityId: common.StringPtr(uuid.New()), - ActivityType: common.ActivityTypePtr(shared.ActivityType{Name: common.StringPtr("activity")}), - Domain: common.StringPtr(h.domain), - TaskList: common.TaskListPtr(shared.TaskList{Name: common.StringPtr(h.taskList), Kind: common.TaskListKindPtr(shared.TaskListKindNormal)}), - ScheduleToCloseTimeoutSeconds: common.Int32Ptr(timeout), - ScheduleToStartTimeoutSeconds: common.Int32Ptr(timeout), - StartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - } - h.activityScheduleEventIDs[eventID] = true - case shared.EventTypeActivityTaskStarted.String(): - if len(h.activityScheduleEventIDs) == 0 { - panic("No activity scheduled") - } - activityScheduleEventID := reflect.ValueOf(h.activityScheduleEventIDs).MapKeys()[0].Int() - delete(h.activityScheduleEventIDs, activityScheduleEventID) - - historyEvent.EventType = shared.EventTypeActivityTaskStarted.Ptr() - historyEvent.ActivityTaskStartedEventAttributes = &shared.ActivityTaskStartedEventAttributes{ - ScheduledEventId: common.Int64Ptr(activityScheduleEventID), - Identity: common.StringPtr(h.identity), - RequestId: common.StringPtr(uuid.New()), - Attempt: common.Int32Ptr(0), - } - h.activityStartEventIDs[eventID] = activityScheduleEventID - case shared.EventTypeActivityTaskCompleted.String(): - if len(h.activityStartEventIDs) == 0 { - panic("No activity started before complete") - } - activityStartEventID := reflect.ValueOf(h.activityStartEventIDs).MapKeys()[0].Int() - activityScheduleEventID := h.activityStartEventIDs[activityStartEventID] - delete(h.activityStartEventIDs, activityStartEventID) - - historyEvent.EventType = shared.EventTypeActivityTaskCompleted.Ptr() - historyEvent.ActivityTaskCompletedEventAttributes = &shared.ActivityTaskCompletedEventAttributes{ - ScheduledEventId: common.Int64Ptr(activityScheduleEventID), - StartedEventId: common.Int64Ptr(activityStartEventID), - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeActivityTaskTimedOut.String(): - if len(h.activityStartEventIDs) == 0 { - panic("No activity started before timeout") - } - activityStartEventID := reflect.ValueOf(h.activityStartEventIDs).MapKeys()[0].Int() - activityScheduleEventID := h.activityStartEventIDs[activityStartEventID] - delete(h.activityStartEventIDs, activityStartEventID) - - historyEvent.EventType = shared.EventTypeActivityTaskTimedOut.Ptr() - historyEvent.ActivityTaskTimedOutEventAttributes = &shared.ActivityTaskTimedOutEventAttributes{ - ScheduledEventId: common.Int64Ptr(activityScheduleEventID), - StartedEventId: common.Int64Ptr(activityStartEventID), - TimeoutType: common.TimeoutTypePtr(shared.TimeoutTypeScheduleToClose), - } - case shared.EventTypeActivityTaskFailed.String(): - if len(h.activityStartEventIDs) == 0 { - panic("No activity started before fail") - } - activityStartEventID := reflect.ValueOf(h.activityStartEventIDs).MapKeys()[0].Int() - activityScheduleEventID := h.activityStartEventIDs[activityStartEventID] - delete(h.activityStartEventIDs, activityStartEventID) - - historyEvent.EventType = shared.EventTypeActivityTaskFailed.Ptr() - historyEvent.ActivityTaskFailedEventAttributes = &shared.ActivityTaskFailedEventAttributes{ - ScheduledEventId: common.Int64Ptr(activityScheduleEventID), - StartedEventId: common.Int64Ptr(activityStartEventID), - Identity: common.StringPtr(h.identity), - Reason: common.StringPtr(reason), - } - case shared.EventTypeActivityTaskCancelRequested.String(): - historyEvent.EventType = shared.EventTypeActivityTaskCancelRequested.Ptr() - historyEvent.ActivityTaskCancelRequestedEventAttributes = &shared.ActivityTaskCancelRequestedEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - ActivityId: common.StringPtr(uuid.New()), - } - h.activityCancelRequestEventIDs[eventID] = h.decisionTaskCompleteEventID - case shared.EventTypeActivityTaskCanceled.String(): - if len(h.activityStartEventIDs) == 0 { - panic("No activity started before canceled") - } - activityStartEventID := reflect.ValueOf(h.activityStartEventIDs).MapKeys()[0].Int() - activityScheduleEventID := h.activityStartEventIDs[activityStartEventID] - delete(h.activityStartEventIDs, activityStartEventID) - delete(h.activityScheduleEventIDs, activityScheduleEventID) - if len(h.activityCancelRequestEventIDs) == 0 { - panic("No activity cancel requested before canceled") - } - activityCancelRequestEventID := reflect.ValueOf(h.activityCancelRequestEventIDs).MapKeys()[0].Int() - delete(h.activityCancelRequestEventIDs, activityCancelRequestEventID) - - historyEvent.EventType = shared.EventTypeActivityTaskCanceled.Ptr() - historyEvent.ActivityTaskCanceledEventAttributes = &shared.ActivityTaskCanceledEventAttributes{ - LatestCancelRequestedEventId: common.Int64Ptr(activityCancelRequestEventID), - ScheduledEventId: common.Int64Ptr(activityScheduleEventID), - StartedEventId: common.Int64Ptr(activityStartEventID), - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeRequestCancelActivityTaskFailed.String(): - if len(h.activityCancelRequestEventIDs) == 0 { - panic("No activity cancel requested before failed") - } - activityCancelRequestEventID := reflect.ValueOf(h.activityCancelRequestEventIDs).MapKeys()[0].Int() - completeEventID := h.activityCancelRequestEventIDs[activityCancelRequestEventID] - delete(h.activityCancelRequestEventIDs, activityCancelRequestEventID) - - historyEvent.EventType = shared.EventTypeRequestCancelActivityTaskFailed.Ptr() - historyEvent.RequestCancelActivityTaskFailedEventAttributes = &shared.RequestCancelActivityTaskFailedEventAttributes{ - ActivityId: common.StringPtr(uuid.New()), - DecisionTaskCompletedEventId: common.Int64Ptr(completeEventID), - } - case shared.EventTypeTimerStarted.String(): - historyEvent.EventType = shared.EventTypeTimerStarted.Ptr() - timerID := uuid.New() - historyEvent.TimerStartedEventAttributes = &shared.TimerStartedEventAttributes{ - TimerId: common.StringPtr(timerID), - StartToFireTimeoutSeconds: common.Int64Ptr(10), - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - } - h.timerStartEventIDs[eventID] = true - h.timerIDs[timerID] = true - case shared.EventTypeTimerFired.String(): - historyEvent.EventType = shared.EventTypeTimerFired.Ptr() - if len(h.timerIDs) == 0 { - panic("Timer fired before timer started") - } - timerID := reflect.ValueOf(h.timerIDs).MapKeys()[0].String() - delete(h.timerIDs, timerID) - - if len(h.timerStartEventIDs) == 0 { - panic("Timer fired before timer started") - } - timerStartEventID := reflect.ValueOf(h.timerStartEventIDs).MapKeys()[0].Int() - delete(h.timerStartEventIDs, timerStartEventID) - - historyEvent.TimerFiredEventAttributes = &shared.TimerFiredEventAttributes{ - TimerId: common.StringPtr(timerID), - StartedEventId: common.Int64Ptr(timerStartEventID), - } - case shared.EventTypeTimerCanceled.String(): - historyEvent.EventType = shared.EventTypeTimerCanceled.Ptr() - if len(h.timerIDs) == 0 { - panic("Timer fired before timer started") - } - timerID := reflect.ValueOf(h.timerIDs).MapKeys()[0].String() - delete(h.timerIDs, timerID) - if len(h.timerStartEventIDs) == 0 { - panic("Timer fired before timer started") - } - timerStartEventID := reflect.ValueOf(h.timerStartEventIDs).MapKeys()[0].Int() - delete(h.timerStartEventIDs, timerStartEventID) - - historyEvent.TimerCanceledEventAttributes = &shared.TimerCanceledEventAttributes{ - TimerId: common.StringPtr(timerID), - StartedEventId: common.Int64Ptr(timerStartEventID), - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - Identity: common.StringPtr(h.identity), - } - case shared.EventTypeStartChildWorkflowExecutionInitiated.String(): - historyEvent.EventType = shared.EventTypeStartChildWorkflowExecutionInitiated.Ptr() - historyEvent.StartChildWorkflowExecutionInitiatedEventAttributes = &shared.StartChildWorkflowExecutionInitiatedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - TaskList: common.TaskListPtr(shared.TaskList{Name: common.StringPtr(h.taskList), Kind: common.TaskListKindPtr(shared.TaskListKindNormal)}), - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - WorkflowIdReusePolicy: shared.WorkflowIdReusePolicyRejectDuplicate.Ptr(), - } - h.childWorkflowInitialEventIDs[eventID] = h.decisionTaskCompleteEventID - case shared.EventTypeStartChildWorkflowExecutionFailed.String(): - if len(h.childWorkflowInitialEventIDs) == 0 { - panic("Child workflow did not initial before failed") - } - childWorkflowInitialEventID := reflect.ValueOf(h.childWorkflowInitialEventIDs).MapKeys()[0].Int() - completeEventID := h.childWorkflowInitialEventIDs[childWorkflowInitialEventID] - delete(h.childWorkflowInitialEventIDs, childWorkflowInitialEventID) - - historyEvent.EventType = shared.EventTypeStartChildWorkflowExecutionFailed.Ptr() - historyEvent.StartChildWorkflowExecutionFailedEventAttributes = &shared.StartChildWorkflowExecutionFailedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - Cause: shared.ChildWorkflowExecutionFailedCauseWorkflowAlreadyRunning.Ptr(), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - DecisionTaskCompletedEventId: common.Int64Ptr(completeEventID), - } - case shared.EventTypeChildWorkflowExecutionStarted.String(): - if len(h.childWorkflowInitialEventIDs) == 0 { - panic("Child workflow did not initial before start") - } - childWorkflowInitialEventID := reflect.ValueOf(h.childWorkflowInitialEventIDs).MapKeys()[0].Int() - delete(h.childWorkflowInitialEventIDs, childWorkflowInitialEventID) - - runID := uuid.New() - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionStarted.Ptr() - historyEvent.ChildWorkflowExecutionStartedEventAttributes = &shared.ChildWorkflowExecutionStartedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - } - h.childWorkflowStartEventIDs[eventID] = childWorkflowInitialEventID - h.childWorkflowRunIDs[eventID] = runID - case shared.EventTypeChildWorkflowExecutionCompleted.String(): - if len(h.childWorkflowStartEventIDs) == 0 { - panic("Child workflow did not start before complete") - } - childWorkflowStartEventID := reflect.ValueOf(h.childWorkflowStartEventIDs).MapKeys()[0].Int() - childWorkflowInitialEventID := h.childWorkflowStartEventIDs[childWorkflowStartEventID] - delete(h.childWorkflowStartEventIDs, childWorkflowStartEventID) - - runID := h.childWorkflowRunIDs[childWorkflowStartEventID] - if runID == "" { - panic("child run id is not set with the start event") - } - delete(h.childWorkflowRunIDs, childWorkflowStartEventID) - - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionCompleted.Ptr() - historyEvent.ChildWorkflowExecutionCompletedEventAttributes = &shared.ChildWorkflowExecutionCompletedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - StartedEventId: common.Int64Ptr(childWorkflowStartEventID), - } - case shared.EventTypeChildWorkflowExecutionTimedOut.String(): - if len(h.childWorkflowStartEventIDs) == 0 { - panic("Child workflow did not start before timeout") - } - childWorkflowStartEventID := reflect.ValueOf(h.childWorkflowStartEventIDs).MapKeys()[0].Int() - childWorkflowInitialEventID := h.childWorkflowStartEventIDs[childWorkflowStartEventID] - delete(h.childWorkflowStartEventIDs, childWorkflowStartEventID) - - runID := h.childWorkflowRunIDs[childWorkflowStartEventID] - if runID == "" { - panic("child run id is not set with the start event") - } - delete(h.childWorkflowRunIDs, childWorkflowStartEventID) - - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionTimedOut.Ptr() - historyEvent.ChildWorkflowExecutionTimedOutEventAttributes = &shared.ChildWorkflowExecutionTimedOutEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - StartedEventId: common.Int64Ptr(childWorkflowStartEventID), - TimeoutType: common.TimeoutTypePtr(shared.TimeoutTypeScheduleToClose), - } - case shared.EventTypeChildWorkflowExecutionTerminated.String(): - - if len(h.childWorkflowStartEventIDs) == 0 { - panic("Child workflow did not start before terminate ") - } - childWorkflowStartEventID := reflect.ValueOf(h.childWorkflowStartEventIDs).MapKeys()[0].Int() - childWorkflowInitialEventID := h.childWorkflowStartEventIDs[childWorkflowStartEventID] - delete(h.childWorkflowStartEventIDs, childWorkflowStartEventID) - - runID := h.childWorkflowRunIDs[childWorkflowStartEventID] - if runID == "" { - panic("child run id is not set with the start event") - } - delete(h.childWorkflowRunIDs, childWorkflowStartEventID) - - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionTerminated.Ptr() - historyEvent.ChildWorkflowExecutionTerminatedEventAttributes = &shared.ChildWorkflowExecutionTerminatedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - StartedEventId: common.Int64Ptr(childWorkflowStartEventID), - } - case shared.EventTypeChildWorkflowExecutionFailed.String(): - if len(h.childWorkflowStartEventIDs) == 0 { - panic("Child workflow did not start before fail") - } - childWorkflowStartEventID := reflect.ValueOf(h.childWorkflowStartEventIDs).MapKeys()[0].Int() - childWorkflowInitialEventID := h.childWorkflowStartEventIDs[childWorkflowStartEventID] - delete(h.childWorkflowStartEventIDs, childWorkflowStartEventID) - - runID := h.childWorkflowRunIDs[childWorkflowStartEventID] - if runID == "" { - panic("child run id is not set with the start event") - } - delete(h.childWorkflowRunIDs, childWorkflowStartEventID) - - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionFailed.Ptr() - historyEvent.ChildWorkflowExecutionFailedEventAttributes = &shared.ChildWorkflowExecutionFailedEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - StartedEventId: common.Int64Ptr(childWorkflowStartEventID), - } - case shared.EventTypeChildWorkflowExecutionCanceled.String(): - if len(h.childWorkflowStartEventIDs) == 0 { - panic("Child workflow did not start before cancel") - } - childWorkflowStartEventID := reflect.ValueOf(h.childWorkflowStartEventIDs).MapKeys()[0].Int() - childWorkflowInitialEventID := h.childWorkflowStartEventIDs[childWorkflowStartEventID] - delete(h.childWorkflowStartEventIDs, childWorkflowStartEventID) - - runID := h.childWorkflowRunIDs[childWorkflowStartEventID] - if runID == "" { - panic("child run id is not set with the start event") - } - delete(h.childWorkflowRunIDs, childWorkflowStartEventID) - - historyEvent.EventType = shared.EventTypeChildWorkflowExecutionCanceled.Ptr() - historyEvent.ChildWorkflowExecutionCanceledEventAttributes = &shared.ChildWorkflowExecutionCanceledEventAttributes{ - Domain: common.StringPtr(h.domain), - WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(childWorkflowPrefix + h.workflowType)}), - InitiatedEventId: common.Int64Ptr(childWorkflowInitialEventID), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(childWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(runID), - }, - StartedEventId: common.Int64Ptr(childWorkflowStartEventID), - } - case shared.EventTypeSignalExternalWorkflowExecutionInitiated.String(): - historyEvent.EventType = shared.EventTypeSignalExternalWorkflowExecutionInitiated.Ptr() - historyEvent.SignalExternalWorkflowExecutionInitiatedEventAttributes = &shared.SignalExternalWorkflowExecutionInitiatedEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - SignalName: common.StringPtr("signal"), - ChildWorkflowOnly: common.BoolPtr(false), - } - h.signalExternalWorkflowEventIDs[eventID] = h.decisionTaskCompleteEventID - case shared.EventTypeSignalExternalWorkflowExecutionFailed.String(): - if len(h.signalExternalWorkflowEventIDs) == 0 { - panic("No external workflow signaled") - } - signalExternalWorkflowEventID := reflect.ValueOf(h.signalExternalWorkflowEventIDs).MapKeys()[0].Int() - completeEventID := h.signalExternalWorkflowEventIDs[signalExternalWorkflowEventID] - delete(h.signalExternalWorkflowEventIDs, signalExternalWorkflowEventID) - - historyEvent.EventType = shared.EventTypeSignalExternalWorkflowExecutionFailed.Ptr() - historyEvent.SignalExternalWorkflowExecutionFailedEventAttributes = &shared.SignalExternalWorkflowExecutionFailedEventAttributes{ - Cause: common.SignalExternalWorkflowExecutionFailedCausePtr(shared.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution), - DecisionTaskCompletedEventId: common.Int64Ptr(completeEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - InitiatedEventId: common.Int64Ptr(signalExternalWorkflowEventID), - } - case shared.EventTypeExternalWorkflowExecutionSignaled.String(): - if len(h.signalExternalWorkflowEventIDs) == 0 { - panic("No external workflow signaled") - } - signalExternalWorkflowEventID := reflect.ValueOf(h.signalExternalWorkflowEventIDs).MapKeys()[0].Int() - delete(h.signalExternalWorkflowEventIDs, signalExternalWorkflowEventID) - - historyEvent.EventType = shared.EventTypeExternalWorkflowExecutionSignaled.Ptr() - historyEvent.ExternalWorkflowExecutionSignaledEventAttributes = &shared.ExternalWorkflowExecutionSignaledEventAttributes{ - InitiatedEventId: common.Int64Ptr(signalExternalWorkflowEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - } - case shared.EventTypeRequestCancelExternalWorkflowExecutionInitiated.String(): - historyEvent.EventType = shared.EventTypeRequestCancelExternalWorkflowExecutionInitiated.Ptr() - historyEvent.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes = &shared.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{ - DecisionTaskCompletedEventId: common.Int64Ptr(h.decisionTaskCompleteEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - ChildWorkflowOnly: common.BoolPtr(false), - } - h.requestExternalWorkflowCanceledEventIDs[eventID] = h.decisionTaskCompleteEventID - case shared.EventTypeRequestCancelExternalWorkflowExecutionFailed.String(): - if len(h.requestExternalWorkflowCanceledEventIDs) == 0 { - panic("No cancel request external workflow") - } - requestExternalWorkflowCanceledEventID := reflect.ValueOf(h.requestExternalWorkflowCanceledEventIDs).MapKeys()[0].Int() - completeEventID := h.requestExternalWorkflowCanceledEventIDs[requestExternalWorkflowCanceledEventID] - delete(h.requestExternalWorkflowCanceledEventIDs, requestExternalWorkflowCanceledEventID) - - historyEvent.EventType = shared.EventTypeRequestCancelExternalWorkflowExecutionFailed.Ptr() - historyEvent.RequestCancelExternalWorkflowExecutionFailedEventAttributes = &shared.RequestCancelExternalWorkflowExecutionFailedEventAttributes{ - Cause: common.CancelExternalWorkflowExecutionFailedCausePtr(shared.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution), - DecisionTaskCompletedEventId: common.Int64Ptr(completeEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - InitiatedEventId: common.Int64Ptr(requestExternalWorkflowCanceledEventID), - } - case shared.EventTypeExternalWorkflowExecutionCancelRequested.String(): - if len(h.requestExternalWorkflowCanceledEventIDs) == 0 { - panic("No cancel request external workflow") - } - requestExternalWorkflowCanceledEventID := reflect.ValueOf(h.requestExternalWorkflowCanceledEventIDs).MapKeys()[0].Int() - delete(h.requestExternalWorkflowCanceledEventIDs, requestExternalWorkflowCanceledEventID) - - historyEvent.EventType = shared.EventTypeExternalWorkflowExecutionCancelRequested.Ptr() - historyEvent.ExternalWorkflowExecutionCancelRequestedEventAttributes = &shared.ExternalWorkflowExecutionCancelRequestedEventAttributes{ - InitiatedEventId: common.Int64Ptr(requestExternalWorkflowCanceledEventID), - Domain: common.StringPtr(h.domain), - WorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(externalWorkflowPrefix + h.workflowID), - RunId: common.StringPtr(uuid.New()), - }, - } - } - return historyEvent -} - -func getDefaultHistoryEvent(eventID, version int64) *shared.HistoryEvent { - return &shared.HistoryEvent{ - EventId: common.Int64Ptr(eventID), - Timestamp: common.Int64Ptr(time.Now().Unix()), - TaskId: common.Int64Ptr(common.EmptyEventTaskID), - Version: common.Int64Ptr(version), - } -} - // InitializeHistoryEventGenerator initializes the history event generator -func InitializeHistoryEventGenerator() Generator { - generator := NewEventGenerator(time.Now().UnixNano()) +func InitializeHistoryEventGenerator( + domain string, +) Generator { + generator := NewEventGenerator(time.Now().UnixNano()) // Functions notPendingDecisionTask := func() bool { count := 0 @@ -728,19 +73,6 @@ func InitializeHistoryEventGenerator() Generator { } return false } - hasPendingTimer := func() bool { - count := 0 - for _, e := range generator.ListGeneratedVertices() { - switch e.GetName() { - case shared.EventTypeTimerStarted.String(): - count++ - case shared.EventTypeTimerFired.String(), - shared.EventTypeTimerCanceled.String(): - count-- - } - } - return count > 0 - } hasPendingActivity := func() bool { count := 0 for _, e := range generator.ListGeneratedVertices() { @@ -756,21 +88,6 @@ func InitializeHistoryEventGenerator() Generator { } return count > 0 } - hasPendingStartActivity := func() bool { - count := 0 - for _, e := range generator.ListGeneratedVertices() { - switch e.GetName() { - case shared.EventTypeActivityTaskStarted.String(): - count++ - case shared.EventTypeActivityTaskCanceled.String(), - shared.EventTypeActivityTaskFailed.String(), - shared.EventTypeActivityTaskTimedOut.String(), - shared.EventTypeActivityTaskCompleted.String(): - count-- - } - } - return count > 0 - } canDoBatch := func(history []Vertex) bool { if len(history) == 0 { return true @@ -802,11 +119,92 @@ func InitializeHistoryEventGenerator() Generator { // Setup decision task model decisionModel := NewHistoryEventModel() decisionSchedule := NewHistoryEventVertex(shared.EventTypeDecisionTaskScheduled.String()) + decisionSchedule.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeDecisionTaskScheduled.Ptr() + historyEvent.DecisionTaskScheduledEventAttributes = &shared.DecisionTaskScheduledEventAttributes{ + TaskList: &shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: shared.TaskListKindNormal.Ptr(), + }, + StartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + Attempt: common.Int64Ptr(decisionTaskAttempts), + } + return historyEvent + }) decisionStart := NewHistoryEventVertex(shared.EventTypeDecisionTaskStarted.String()) decisionStart.SetIsStrictOnNextVertex(true) + decisionStart.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeDecisionTaskStarted.Ptr() + historyEvent.DecisionTaskStartedEventAttributes = &shared.DecisionTaskStartedEventAttributes{ + ScheduledEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + RequestId: common.StringPtr(uuid.New()), + } + return historyEvent + }) decisionFail := NewHistoryEventVertex(shared.EventTypeDecisionTaskFailed.String()) + decisionFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeDecisionTaskFailed.Ptr() + historyEvent.DecisionTaskFailedEventAttributes = &shared.DecisionTaskFailedEventAttributes{ + ScheduledEventId: lastEvent.GetDecisionTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + Cause: common.DecisionTaskFailedCausePtr(shared.DecisionTaskFailedCauseUnhandledDecision), + Identity: common.StringPtr(identity), + ForkEventVersion: common.Int64Ptr(defaultVersion), + } + return historyEvent + }) decisionTimedOut := NewHistoryEventVertex(shared.EventTypeDecisionTaskTimedOut.String()) + decisionTimedOut.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeDecisionTaskTimedOut.Ptr() + historyEvent.DecisionTaskTimedOutEventAttributes = &shared.DecisionTaskTimedOutEventAttributes{ + ScheduledEventId: lastEvent.GetDecisionTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + TimeoutType: shared.TimeoutTypeScheduleToStart.Ptr(), + } + return historyEvent + }) decisionComplete := NewHistoryEventVertex(shared.EventTypeDecisionTaskCompleted.String()) + decisionComplete.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeDecisionTaskCompleted.Ptr() + historyEvent.DecisionTaskCompletedEventAttributes = &shared.DecisionTaskCompletedEventAttributes{ + ScheduledEventId: lastEvent.GetDecisionTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + BinaryChecksum: common.StringPtr(checksum), + } + return historyEvent + }) decisionComplete.SetIsStrictOnNextVertex(true) decisionComplete.SetMaxNextVertex(2) decisionScheduleToStart := NewHistoryEventEdge(decisionSchedule, decisionStart) @@ -822,15 +220,157 @@ func InitializeHistoryEventGenerator() Generator { // Setup workflow model workflowModel := NewHistoryEventModel() + workflowStart := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionStarted.String()) + workflowStart.SetDataFunc(func(input ...interface{}) interface{} { + historyEvent := getDefaultHistoryEvent(1, defaultVersion) + historyEvent.EventType = shared.EventTypeWorkflowExecutionStarted.Ptr() + historyEvent.WorkflowExecutionStartedEventAttributes = &shared.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &shared.WorkflowType{ + Name: common.StringPtr(workflowType), + }, + TaskList: &shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: shared.TaskListKindNormal.Ptr(), + }, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + Identity: common.StringPtr(identity), + FirstExecutionRunId: common.StringPtr(uuid.New()), + } + return historyEvent + }) workflowSignal := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionSignaled.String()) + workflowSignal.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionSignaled.Ptr() + historyEvent.WorkflowExecutionSignaledEventAttributes = &shared.WorkflowExecutionSignaledEventAttributes{ + SignalName: common.StringPtr(signal), + Identity: common.StringPtr(identity), + } + return historyEvent + }) workflowComplete := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCompleted.String()) + workflowComplete.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionCompleted.Ptr() + historyEvent.WorkflowExecutionCompletedEventAttributes = &shared.WorkflowExecutionCompletedEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.EventId, + } + return historyEvent + }) continueAsNew := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionContinuedAsNew.String()) + continueAsNew.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionContinuedAsNew.Ptr() + historyEvent.WorkflowExecutionContinuedAsNewEventAttributes = &shared.WorkflowExecutionContinuedAsNewEventAttributes{ + NewExecutionRunId: common.StringPtr(uuid.New()), + WorkflowType: &shared.WorkflowType{ + Name: common.StringPtr(workflowType), + }, + TaskList: &shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: shared.TaskListKindNormal.Ptr(), + }, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + DecisionTaskCompletedEventId: common.Int64Ptr(eventID - 1), + Initiator: shared.ContinueAsNewInitiatorDecider.Ptr(), + } + return historyEvent + }) workflowFail := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionFailed.String()) + workflowFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionFailed.Ptr() + historyEvent.WorkflowExecutionFailedEventAttributes = &shared.WorkflowExecutionFailedEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.EventId, + } + return historyEvent + }) workflowCancel := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCanceled.String()) + workflowCancel.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionCanceled.Ptr() + historyEvent.WorkflowExecutionCanceledEventAttributes = &shared.WorkflowExecutionCanceledEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.EventId, + } + return historyEvent + }) workflowCancelRequest := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCancelRequested.String()) + workflowCancelRequest.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionCancelRequested.Ptr() + historyEvent.WorkflowExecutionCancelRequestedEventAttributes = &shared.WorkflowExecutionCancelRequestedEventAttributes{ + Cause: common.StringPtr(""), + ExternalInitiatedEventId: common.Int64Ptr(1), + ExternalWorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(externalWorkflowID), + RunId: common.StringPtr(uuid.New()), + }, + Identity: common.StringPtr(identity), + } + return historyEvent + }) workflowTerminate := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionTerminated.String()) + workflowTerminate.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionTerminated.Ptr() + historyEvent.WorkflowExecutionTerminatedEventAttributes = &shared.WorkflowExecutionTerminatedEventAttributes{ + Identity: common.StringPtr(identity), + Reason: common.StringPtr(reason), + } + return historyEvent + }) workflowTimedOut := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionTimedOut.String()) + workflowTimedOut.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeWorkflowExecutionTimedOut.Ptr() + historyEvent.WorkflowExecutionTimedOutEventAttributes = &shared.WorkflowExecutionTimedOutEventAttributes{ + TimeoutType: shared.TimeoutTypeStartToClose.Ptr(), + } + return historyEvent + }) workflowStartToSignal := NewHistoryEventEdge(workflowStart, workflowSignal) workflowStartToDecisionSchedule := NewHistoryEventEdge(workflowStart, decisionSchedule) workflowStartToDecisionSchedule.SetCondition(notPendingDecisionTask) @@ -849,13 +389,144 @@ func InitializeHistoryEventGenerator() Generator { // Setup activity model activityModel := NewHistoryEventModel() activitySchedule := NewHistoryEventVertex(shared.EventTypeActivityTaskScheduled.String()) + activitySchedule.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskScheduled.Ptr() + historyEvent.ActivityTaskScheduledEventAttributes = &shared.ActivityTaskScheduledEventAttributes{ + ActivityId: common.StringPtr(uuid.New()), + ActivityType: common.ActivityTypePtr(shared.ActivityType{ + Name: common.StringPtr("activity"), + }), + Domain: common.StringPtr(domain), + TaskList: common.TaskListPtr(shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: common.TaskListKindPtr(shared.TaskListKindNormal), + }), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(timeout), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(timeout), + StartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + DecisionTaskCompletedEventId: lastEvent.EventId, + } + return historyEvent + }) activityStart := NewHistoryEventVertex(shared.EventTypeActivityTaskStarted.String()) + activityStart.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskStarted.Ptr() + historyEvent.ActivityTaskStartedEventAttributes = &shared.ActivityTaskStartedEventAttributes{ + ScheduledEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + RequestId: common.StringPtr(uuid.New()), + Attempt: common.Int32Ptr(0), + } + return historyEvent + }) activityComplete := NewHistoryEventVertex(shared.EventTypeActivityTaskCompleted.String()) + activityComplete.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskCompleted.Ptr() + historyEvent.ActivityTaskCompletedEventAttributes = &shared.ActivityTaskCompletedEventAttributes{ + ScheduledEventId: lastEvent.GetActivityTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + } + return historyEvent + }) activityFail := NewHistoryEventVertex(shared.EventTypeActivityTaskFailed.String()) + activityFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskFailed.Ptr() + historyEvent.ActivityTaskFailedEventAttributes = &shared.ActivityTaskFailedEventAttributes{ + ScheduledEventId: lastEvent.GetActivityTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + Reason: common.StringPtr(reason), + } + return historyEvent + }) activityTimedOut := NewHistoryEventVertex(shared.EventTypeActivityTaskTimedOut.String()) + activityTimedOut.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskTimedOut.Ptr() + historyEvent.ActivityTaskTimedOutEventAttributes = &shared.ActivityTaskTimedOutEventAttributes{ + ScheduledEventId: lastEvent.GetActivityTaskStartedEventAttributes().ScheduledEventId, + StartedEventId: lastEvent.EventId, + TimeoutType: common.TimeoutTypePtr(shared.TimeoutTypeScheduleToClose), + } + return historyEvent + }) activityCancelRequest := NewHistoryEventVertex(shared.EventTypeActivityTaskCancelRequested.String()) + activityCancelRequest.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskCancelRequested.Ptr() + historyEvent.ActivityTaskCancelRequestedEventAttributes = &shared.ActivityTaskCancelRequestedEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.GetActivityTaskScheduledEventAttributes().DecisionTaskCompletedEventId, + ActivityId: lastEvent.GetActivityTaskScheduledEventAttributes().ActivityId, + } + return historyEvent + }) activityCancel := NewHistoryEventVertex(shared.EventTypeActivityTaskCanceled.String()) + activityCancel.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeActivityTaskCanceled.Ptr() + historyEvent.ActivityTaskCanceledEventAttributes = &shared.ActivityTaskCanceledEventAttributes{ + LatestCancelRequestedEventId: lastEvent.EventId, + ScheduledEventId: lastEvent.EventId, + StartedEventId: lastEvent.EventId, + Identity: common.StringPtr(identity), + } + return historyEvent + }) activityCancelRequestFail := NewHistoryEventVertex(shared.EventTypeRequestCancelActivityTaskFailed.String()) + activityCancelRequestFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeRequestCancelActivityTaskFailed.Ptr() + historyEvent.RequestCancelActivityTaskFailedEventAttributes = &shared.RequestCancelActivityTaskFailedEventAttributes{ + ActivityId: common.StringPtr(uuid.New()), + DecisionTaskCompletedEventId: lastEvent.GetActivityTaskCancelRequestedEventAttributes().DecisionTaskCompletedEventId, + } + return historyEvent + }) decisionCompleteToATSchedule := NewHistoryEventEdge(decisionComplete, activitySchedule) activityScheduleToStart := NewHistoryEventEdge(activitySchedule, activityStart) @@ -879,47 +550,268 @@ func InitializeHistoryEventGenerator() Generator { activityCancelToDecisionSchedule := NewHistoryEventEdge(activityCancel, decisionSchedule) activityCancelToDecisionSchedule.SetCondition(notPendingDecisionTask) + // TODO: bypass activity cancel request event. Support this event later. + //activityScheduleToActivityCancelRequest := NewHistoryEventEdge(activitySchedule, activityCancelRequest) + //activityScheduleToActivityCancelRequest.SetCondition(hasPendingActivity) activityCancelReqToCancel := NewHistoryEventEdge(activityCancelRequest, activityCancel) activityCancelReqToCancel.SetCondition(hasPendingActivity) activityCancelReqToCancelFail := NewHistoryEventEdge(activityCancelRequest, activityCancelRequestFail) activityCancelRequestFailToDecisionSchedule := NewHistoryEventEdge(activityCancelRequestFail, decisionSchedule) activityCancelRequestFailToDecisionSchedule.SetCondition(notPendingDecisionTask) - decisionCompleteToActivityCancelRequest := NewHistoryEventEdge(decisionComplete, activityCancelRequest) - decisionCompleteToActivityCancelRequest.SetCondition(hasPendingStartActivity) activityModel.AddEdge(decisionCompleteToATSchedule, activityScheduleToStart, activityStartToComplete, activityStartToFail, activityStartToTimedOut, decisionCompleteToATSchedule, activityCompleteToDecisionSchedule, - activityFailToDecisionSchedule, activityTimedOutToDecisionSchedule, activityCancelReqToCancel, activityCancelReqToCancelFail, - activityCancelToDecisionSchedule, decisionCompleteToActivityCancelRequest, activityCancelRequestFailToDecisionSchedule) + activityFailToDecisionSchedule, activityTimedOutToDecisionSchedule, activityCancelReqToCancel, + activityCancelReqToCancelFail, activityCancelToDecisionSchedule, activityCancelRequestFailToDecisionSchedule) // Setup timer model timerModel := NewHistoryEventModel() timerStart := NewHistoryEventVertex(shared.EventTypeTimerStarted.String()) + timerStart.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeTimerStarted.Ptr() + historyEvent.TimerStartedEventAttributes = &shared.TimerStartedEventAttributes{ + TimerId: common.StringPtr(uuid.New()), + StartToFireTimeoutSeconds: common.Int64Ptr(10), + DecisionTaskCompletedEventId: lastEvent.EventId, + } + return historyEvent + }) timerFired := NewHistoryEventVertex(shared.EventTypeTimerFired.String()) + timerFired.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeTimerFired.Ptr() + historyEvent.TimerFiredEventAttributes = &shared.TimerFiredEventAttributes{ + TimerId: lastEvent.GetTimerStartedEventAttributes().TimerId, + StartedEventId: lastEvent.EventId, + } + return historyEvent + }) timerCancel := NewHistoryEventVertex(shared.EventTypeTimerCanceled.String()) + timerCancel.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeTimerCanceled.Ptr() + historyEvent.TimerCanceledEventAttributes = &shared.TimerCanceledEventAttributes{ + TimerId: lastEvent.GetTimerStartedEventAttributes().TimerId, + StartedEventId: lastEvent.EventId, + DecisionTaskCompletedEventId: lastEvent.GetTimerStartedEventAttributes().DecisionTaskCompletedEventId, + Identity: common.StringPtr(identity), + } + return historyEvent + }) timerStartToFire := NewHistoryEventEdge(timerStart, timerFired) - timerStartToFire.SetCondition(hasPendingTimer) - decisionCompleteToCancel := NewHistoryEventEdge(decisionComplete, timerCancel) - decisionCompleteToCancel.SetCondition(hasPendingTimer) + timerStartToCancel := NewHistoryEventEdge(timerStart, timerCancel) decisionCompleteToTimerStart := NewHistoryEventEdge(decisionComplete, timerStart) timerFiredToDecisionSchedule := NewHistoryEventEdge(timerFired, decisionSchedule) timerFiredToDecisionSchedule.SetCondition(notPendingDecisionTask) timerCancelToDecisionSchedule := NewHistoryEventEdge(timerCancel, decisionSchedule) timerCancelToDecisionSchedule.SetCondition(notPendingDecisionTask) - timerModel.AddEdge(timerStartToFire, decisionCompleteToCancel, decisionCompleteToTimerStart, timerFiredToDecisionSchedule, timerCancelToDecisionSchedule) + timerModel.AddEdge(timerStartToFire, timerStartToCancel, decisionCompleteToTimerStart, timerFiredToDecisionSchedule, timerCancelToDecisionSchedule) // Setup child workflow model childWorkflowModel := NewHistoryEventModel() childWorkflowInitial := NewHistoryEventVertex(shared.EventTypeStartChildWorkflowExecutionInitiated.String()) + childWorkflowInitial.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeStartChildWorkflowExecutionInitiated.Ptr() + historyEvent.StartChildWorkflowExecutionInitiatedEventAttributes = &shared.StartChildWorkflowExecutionInitiatedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowId: common.StringPtr(childWorkflowID), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + TaskList: common.TaskListPtr(shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: common.TaskListKindPtr(shared.TaskListKindNormal), + }), + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(timeout), + DecisionTaskCompletedEventId: lastEvent.EventId, + WorkflowIdReusePolicy: shared.WorkflowIdReusePolicyRejectDuplicate.Ptr(), + } + return historyEvent + }) childWorkflowInitialFail := NewHistoryEventVertex(shared.EventTypeStartChildWorkflowExecutionFailed.String()) + childWorkflowInitialFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeStartChildWorkflowExecutionFailed.Ptr() + historyEvent.StartChildWorkflowExecutionFailedEventAttributes = &shared.StartChildWorkflowExecutionFailedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowId: common.StringPtr(childWorkflowID), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + Cause: shared.ChildWorkflowExecutionFailedCauseWorkflowAlreadyRunning.Ptr(), + InitiatedEventId: lastEvent.EventId, + DecisionTaskCompletedEventId: lastEvent.GetStartChildWorkflowExecutionInitiatedEventAttributes().DecisionTaskCompletedEventId, + } + return historyEvent + }) childWorkflowStart := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionStarted.String()) + childWorkflowStart.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionStarted.Ptr() + historyEvent.ChildWorkflowExecutionStartedEventAttributes = &shared.ChildWorkflowExecutionStartedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.EventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: common.StringPtr(uuid.New()), + }, + } + return historyEvent + }) childWorkflowCancel := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionCanceled.String()) + childWorkflowCancel.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionCanceled.Ptr() + historyEvent.ChildWorkflowExecutionCanceledEventAttributes = &shared.ChildWorkflowExecutionCanceledEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().InitiatedEventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().GetWorkflowExecution().RunId, + }, + StartedEventId: lastEvent.EventId, + } + return historyEvent + }) childWorkflowComplete := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionCompleted.String()) + childWorkflowComplete.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionCompleted.Ptr() + historyEvent.ChildWorkflowExecutionCompletedEventAttributes = &shared.ChildWorkflowExecutionCompletedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().InitiatedEventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().GetWorkflowExecution().RunId, + }, + StartedEventId: lastEvent.EventId, + } + return historyEvent + }) childWorkflowFail := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionFailed.String()) + childWorkflowFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionFailed.Ptr() + historyEvent.ChildWorkflowExecutionFailedEventAttributes = &shared.ChildWorkflowExecutionFailedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().InitiatedEventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().GetWorkflowExecution().RunId, + }, + StartedEventId: lastEvent.EventId, + } + return historyEvent + }) childWorkflowTerminate := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionTerminated.String()) + childWorkflowTerminate.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionTerminated.Ptr() + historyEvent.ChildWorkflowExecutionTerminatedEventAttributes = &shared.ChildWorkflowExecutionTerminatedEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().InitiatedEventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().GetWorkflowExecution().RunId, + }, + StartedEventId: lastEvent.EventId, + } + return historyEvent + }) childWorkflowTimedOut := NewHistoryEventVertex(shared.EventTypeChildWorkflowExecutionTimedOut.String()) + childWorkflowTimedOut.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeChildWorkflowExecutionTimedOut.Ptr() + historyEvent.ChildWorkflowExecutionTimedOutEventAttributes = &shared.ChildWorkflowExecutionTimedOutEventAttributes{ + Domain: common.StringPtr(domain), + WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{ + Name: common.StringPtr(childWorkflowPrefix + workflowType), + }), + InitiatedEventId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().InitiatedEventId, + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(childWorkflowID), + RunId: lastEvent.GetChildWorkflowExecutionStartedEventAttributes().GetWorkflowExecution().RunId, + }, + StartedEventId: lastEvent.EventId, + TimeoutType: common.TimeoutTypePtr(shared.TimeoutTypeScheduleToClose), + } + return historyEvent + }) decisionCompleteToChildWorkflowInitial := NewHistoryEventEdge(decisionComplete, childWorkflowInitial) childWorkflowInitialToFail := NewHistoryEventEdge(childWorkflowInitial, childWorkflowInitialFail) childWorkflowInitialToStart := NewHistoryEventEdge(childWorkflowInitial, childWorkflowStart) @@ -949,11 +841,127 @@ func InitializeHistoryEventGenerator() Generator { // Setup external workflow model externalWorkflowModel := NewHistoryEventModel() externalWorkflowSignal := NewHistoryEventVertex(shared.EventTypeSignalExternalWorkflowExecutionInitiated.String()) + externalWorkflowSignal.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeSignalExternalWorkflowExecutionInitiated.Ptr() + historyEvent.SignalExternalWorkflowExecutionInitiatedEventAttributes = &shared.SignalExternalWorkflowExecutionInitiatedEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.EventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(externalWorkflowID), + RunId: common.StringPtr(uuid.New()), + }, + SignalName: common.StringPtr("signal"), + ChildWorkflowOnly: common.BoolPtr(false), + } + return historyEvent + }) externalWorkflowSignalFailed := NewHistoryEventVertex(shared.EventTypeSignalExternalWorkflowExecutionFailed.String()) + externalWorkflowSignalFailed.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeSignalExternalWorkflowExecutionFailed.Ptr() + historyEvent.SignalExternalWorkflowExecutionFailedEventAttributes = &shared.SignalExternalWorkflowExecutionFailedEventAttributes{ + Cause: common.SignalExternalWorkflowExecutionFailedCausePtr(shared.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution), + DecisionTaskCompletedEventId: lastEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().DecisionTaskCompletedEventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: lastEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().WorkflowId, + RunId: lastEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().RunId, + }, + InitiatedEventId: lastEvent.EventId, + } + return historyEvent + }) externalWorkflowSignaled := NewHistoryEventVertex(shared.EventTypeExternalWorkflowExecutionSignaled.String()) + externalWorkflowSignaled.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeExternalWorkflowExecutionSignaled.Ptr() + historyEvent.ExternalWorkflowExecutionSignaledEventAttributes = &shared.ExternalWorkflowExecutionSignaledEventAttributes{ + InitiatedEventId: lastEvent.EventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: lastEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().WorkflowId, + RunId: lastEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().RunId, + }, + } + return historyEvent + }) externalWorkflowCancel := NewHistoryEventVertex(shared.EventTypeRequestCancelExternalWorkflowExecutionInitiated.String()) + externalWorkflowCancel.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeRequestCancelExternalWorkflowExecutionInitiated.Ptr() + historyEvent.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes = + &shared.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{ + DecisionTaskCompletedEventId: lastEvent.EventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(externalWorkflowID), + RunId: common.StringPtr(uuid.New()), + }, + ChildWorkflowOnly: common.BoolPtr(false), + } + return historyEvent + }) externalWorkflowCancelFail := NewHistoryEventVertex(shared.EventTypeRequestCancelExternalWorkflowExecutionFailed.String()) + externalWorkflowCancelFail.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeRequestCancelExternalWorkflowExecutionFailed.Ptr() + historyEvent.RequestCancelExternalWorkflowExecutionFailedEventAttributes = &shared.RequestCancelExternalWorkflowExecutionFailedEventAttributes{ + Cause: common.CancelExternalWorkflowExecutionFailedCausePtr(shared.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution), + DecisionTaskCompletedEventId: lastEvent.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes().DecisionTaskCompletedEventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: lastEvent.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().WorkflowId, + RunId: lastEvent.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().RunId, + }, + InitiatedEventId: lastEvent.EventId, + } + return historyEvent + }) externalWorkflowCanceled := NewHistoryEventVertex(shared.EventTypeExternalWorkflowExecutionCancelRequested.String()) + externalWorkflowCanceled.SetDataFunc(func(input ...interface{}) interface{} { + lastEvent := input[0].(*shared.HistoryEvent) + eventID := lastEvent.GetEventId() + input[1].(int64) + versionBump := input[2].(int64) + subVersion := input[3].(int64) + version := lastEvent.GetVersion() + versionBump + subVersion + historyEvent := getDefaultHistoryEvent(eventID, version) + historyEvent.EventType = shared.EventTypeExternalWorkflowExecutionCancelRequested.Ptr() + historyEvent.ExternalWorkflowExecutionCancelRequestedEventAttributes = &shared.ExternalWorkflowExecutionCancelRequestedEventAttributes{ + InitiatedEventId: lastEvent.EventId, + Domain: common.StringPtr(domain), + WorkflowExecution: &shared.WorkflowExecution{ + WorkflowId: lastEvent.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().WorkflowId, + RunId: lastEvent.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes().GetWorkflowExecution().RunId, + }, + } + return historyEvent + }) decisionCompleteToExternalWorkflowSignal := NewHistoryEventEdge(decisionComplete, externalWorkflowSignal) decisionCompleteToExternalWorkflowCancel := NewHistoryEventEdge(decisionComplete, externalWorkflowCancel) externalWorkflowSignalToFail := NewHistoryEventEdge(externalWorkflowSignal, externalWorkflowSignalFailed) @@ -986,3 +994,38 @@ func InitializeHistoryEventGenerator() Generator { generator.AddModel(externalWorkflowModel) return generator } + +func getDefaultHistoryEvent( + eventID int64, + version int64, +) *shared.HistoryEvent { + + return &shared.HistoryEvent{ + EventId: common.Int64Ptr(eventID), + Timestamp: common.Int64Ptr(time.Now().Unix()), + TaskId: common.Int64Ptr(common.EmptyEventTaskID), + Version: common.Int64Ptr(version), + } +} + +func copyConnections( + originalMap map[Vertex][]Edge, +) map[Vertex][]Edge { + + newMap := make(map[Vertex][]Edge) + for key, value := range originalMap { + newMap[key] = value + } + return newMap +} + +func copyExitVertices( + originalMap map[Vertex]bool, +) map[Vertex]bool { + + newMap := make(map[Vertex]bool) + for key, value := range originalMap { + newMap[key] = value + } + return newMap +} diff --git a/host/ndc/nDC_integration_test.go b/host/ndc/nDC_integration_test.go index 374709a3b08..376f3622ec7 100644 --- a/host/ndc/nDC_integration_test.go +++ b/host/ndc/nDC_integration_test.go @@ -105,7 +105,6 @@ func (s *nDCIntegrationTestSuite) SetupSuite() { c, err = host.NewCluster(clusterConfigs[1], s.logger.WithTags(tag.ClusterName(clusterName[1]))) s.Require().NoError(err) s.passive = c - s.generator = test.InitializeHistoryEventGenerator() } func (s *nDCIntegrationTestSuite) SetupTest() { @@ -146,6 +145,14 @@ func (s *nDCIntegrationTestSuite) TestSimpleNDC() { root := &test.NDCTestBranch{ Batches: make([]test.NDCTestBatch, 0), } + wid := uuid.New() + rid := uuid.New() + wt := "event-generator-workflow-type" + tl := "event-generator-taskList" + domain := *resp.DomainInfo.Name + version := int64(100) + + s.generator = test.InitializeHistoryEventGenerator(domain) for s.generator.HasNextVertex() { events := s.generator.GetNextVertices() newBatch := test.NDCTestBatch{ @@ -154,17 +161,6 @@ func (s *nDCIntegrationTestSuite) TestSimpleNDC() { root.Batches = append(root.Batches, newBatch) } - identity := "test-event-generator" - wid := uuid.New() - rid := uuid.New() - wt := "event-generator-workflow-type" - tl := "event-generator-taskList" - domain := *resp.DomainInfo.Name - domainID := *resp.DomainInfo.UUID - version := int64(100) - attributeGenerator := test.NewHistoryAttributesGenerator(wid, rid, tl, wt, domainID, domain, identity) - historyBatch := attributeGenerator.GenerateHistoryEvents(root.Batches, 1, version) - historyClient := s.passive.GetHistoryClient() replicationInfo := make(map[string]*shared.ReplicationInfo) replicationInfo["active"] = &shared.ReplicationInfo{ @@ -177,9 +173,14 @@ func (s *nDCIntegrationTestSuite) TestSimpleNDC() { LastEventId: common.Int64Ptr(0), } - for _, batch := range historyBatch { + for _, batch := range root.Batches { // Generate a new run history only when the last event is continue as new - newRunHistory := generateNewRunHistory(batch.Events[len(batch.Events)-1], version, domain, wid, rid, wt, tl) + newRunHistory := generateNewRunHistory(batch.Events[len(batch.Events)-1].GetData().(shared.HistoryEvent), version, domain, wid, rid, wt, tl) + batchHistory := shared.History{} + for _, event := range batch.Events { + historyEvent := event.GetData().(shared.HistoryEvent) + batchHistory.Events = append(batchHistory.Events, &historyEvent) + } err = historyClient.ReplicateEvents(createContext(), &history.ReplicateEventsRequest{ SourceCluster: common.StringPtr("active"), DomainUUID: resp.DomainInfo.UUID, @@ -187,10 +188,10 @@ func (s *nDCIntegrationTestSuite) TestSimpleNDC() { WorkflowId: common.StringPtr(wid), RunId: common.StringPtr(rid), }, - FirstEventId: batch.Events[0].EventId, - NextEventId: common.Int64Ptr(*batch.Events[len(batch.Events)-1].EventId + int64(1)), + FirstEventId: batch.Events[0].GetData().(shared.HistoryEvent).EventId, + NextEventId: common.Int64Ptr(*batch.Events[len(batch.Events)-1].GetData().(shared.HistoryEvent).EventId + int64(1)), Version: common.Int64Ptr(version), - History: batch, + History: &batchHistory, NewRunHistory: newRunHistory, ForceBufferEvents: common.BoolPtr(false), EventStoreVersion: common.Int32Ptr(persistence.EventStoreVersionV2), @@ -217,21 +218,29 @@ func (s *nDCIntegrationTestSuite) TestSimpleNDC() { // compare origin events with replicated events batchIndex := 0 - batch := historyBatch[batchIndex].GetEvents() + batch := root.Batches[batchIndex].Events eventIndex := 0 for _, event := range replicatedHistory.GetHistory().GetEvents() { if eventIndex >= len(batch) { batchIndex++ - batch = historyBatch[batchIndex].GetEvents() + batch = root.Batches[batchIndex].Events eventIndex = 0 } - originEvent := batch[eventIndex] + originEvent := batch[eventIndex].GetData().(shared.HistoryEvent) eventIndex++ s.Equal(originEvent.GetEventType().String(), event.GetEventType().String(), "The replicated event and the origin event are not the same") } } -func generateNewRunHistory(event *shared.HistoryEvent, version int64, domain, wid, rid, workflowType, taskList string) *shared.History { +func generateNewRunHistory( + event shared.HistoryEvent, + version int64, + domain string, + workflowID string, + runID string, + workflowType string, + taskList string, +) *shared.History { if event.GetWorkflowExecutionContinuedAsNewEventAttributes() != nil { event.WorkflowExecutionContinuedAsNewEventAttributes.NewExecutionRunId = common.StringPtr(uuid.New()) @@ -247,8 +256,8 @@ func generateNewRunHistory(event *shared.HistoryEvent, version int64, domain, wi WorkflowType: common.WorkflowTypePtr(shared.WorkflowType{Name: common.StringPtr(workflowType)}), ParentWorkflowDomain: common.StringPtr(domain), ParentWorkflowExecution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(wid), - RunId: common.StringPtr(rid), + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), }, ParentInitiatedEventId: common.Int64Ptr(event.GetEventId()), TaskList: common.TaskListPtr(shared.TaskList{ @@ -257,11 +266,11 @@ func generateNewRunHistory(event *shared.HistoryEvent, version int64, domain, wi }), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), - ContinuedExecutionRunId: common.StringPtr(rid), + ContinuedExecutionRunId: common.StringPtr(runID), Initiator: shared.ContinueAsNewInitiatorCronSchedule.Ptr(), - OriginalExecutionRunId: common.StringPtr(rid), + OriginalExecutionRunId: common.StringPtr(runID), Identity: common.StringPtr("NDC-test"), - FirstExecutionRunId: common.StringPtr(rid), + FirstExecutionRunId: common.StringPtr(runID), Attempt: common.Int32Ptr(0), ExpirationTimestamp: common.Int64Ptr(time.Now().Add(time.Minute).UnixNano()), },