diff --git a/common/persistence/serializer_test.go b/common/persistence/serializer_test.go
index 02cbd604f60..5a45f5f73c7 100644
--- a/common/persistence/serializer_test.go
+++ b/common/persistence/serializer_test.go
@@ -25,13 +25,13 @@ import (
 	"testing"
 	"time"
 
-	"github.com/uber/cadence/common/log"
-	"github.com/uber/cadence/common/log/loggerimpl"
-
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+
 	workflow "github.com/uber/cadence/.gen/go/shared"
 	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/loggerimpl"
 )
 
 type (
diff --git a/host/ndc/nDC_integration_test.go b/host/ndc/nDC_integration_test.go
index 6965ad5ad02..ef50694b019 100644
--- a/host/ndc/nDC_integration_test.go
+++ b/host/ndc/nDC_integration_test.go
@@ -132,14 +132,15 @@ func (s *nDCIntegrationTestSuite) TearDownSuite() {
 }
 
 func (s *nDCIntegrationTestSuite) TestSingleBranch() {
-	workflowID := uuid.New()
+	workflowID := "ndc-single-branch-test" + uuid.New()
 	workflowType := "event-generator-workflow-type"
 	tasklist := "event-generator-taskList"
 
+	// active has initial version 0
 	historyClient := s.active.GetHistoryClient()
 
-	versions := []int64{101, 1, 201, 301}
+	versions := []int64{101, 1, 201, 301, 401, 601, 501, 801, 1001, 901, 701, 1101}
 	for _, version := range versions {
 		runID := uuid.New()
 		historyBatch := []*shared.History{}
@@ -147,11 +148,11 @@
 		for s.generator.HasNextVertex() {
 			events := s.generator.GetNextVertices()
-			history := &shared.History{}
+			historyEvents := &shared.History{}
 			for _, event := range events {
-				history.Events = append(history.Events, event.GetData().(*shared.HistoryEvent))
+				historyEvents.Events = append(historyEvents.Events, event.GetData().(*shared.HistoryEvent))
 			}
-			historyBatch = append(historyBatch, history)
+			historyBatch = append(historyBatch, historyEvents)
 		}
 
 		// TODO temporary code to generate version history
@@ -231,14 +232,378 @@ func (s *nDCIntegrationTestSuite) TestSingleBranch() {
 	}
 }
 
+func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {
+	workflowID := "ndc-handcrafted-multiple-branches-test" + uuid.New()
+	runID := uuid.New()
+
+	workflowType := "event-generator-workflow-type"
+	tasklist := "event-generator-taskList"
+	identity := "worker-identity"
+
+	// active has initial version 0
+	historyClient := s.active.GetHistoryClient()
+
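+	// the three event slices below hand-craft three history branches: events1 is
+	// the common ancestor (version 21), while events2 and events3 diverge from it
+	// after event ID 14, with versions 31 and 30 respectively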
+	events1 := [][]*shared.HistoryEvent{
+		{
+			{
+				EventId:   common.Int64Ptr(1),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeWorkflowExecutionStarted.Ptr(),
+				WorkflowExecutionStartedEventAttributes: &shared.WorkflowExecutionStartedEventAttributes{
+					WorkflowType:                        &shared.WorkflowType{Name: common.StringPtr(workflowType)},
+					TaskList:                            &shared.TaskList{Name: common.StringPtr(tasklist)},
+					Input:                               nil,
+					ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1000),
+					TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1000),
+					FirstDecisionTaskBackoffSeconds:     common.Int32Ptr(100),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(2),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
+				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
+					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
+					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
+					Attempt:                    common.Int64Ptr(0),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(3),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
+				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(2),
+					Identity:         common.StringPtr(identity),
+					RequestId:        common.StringPtr(uuid.New()),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(4),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
+				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(2),
+					StartedEventId:   common.Int64Ptr(3),
+					Identity:         common.StringPtr(identity),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(5),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeMarkerRecorded.Ptr(),
+				MarkerRecordedEventAttributes: &shared.MarkerRecordedEventAttributes{
+					MarkerName:                   common.StringPtr("some marker name"),
+					Details:                      []byte("some marker details"),
+					DecisionTaskCompletedEventId: common.Int64Ptr(4),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(6),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeActivityTaskScheduled.Ptr(),
+				ActivityTaskScheduledEventAttributes: &shared.ActivityTaskScheduledEventAttributes{
+					DecisionTaskCompletedEventId:  common.Int64Ptr(4),
+					ActivityId:                    common.StringPtr("0"),
+					ActivityType:                  &shared.ActivityType{Name: common.StringPtr("activity-type")},
+					TaskList:                      &shared.TaskList{Name: common.StringPtr(tasklist)},
+					Input:                         nil,
+					ScheduleToCloseTimeoutSeconds: common.Int32Ptr(20),
+					ScheduleToStartTimeoutSeconds: common.Int32Ptr(20),
+					StartToCloseTimeoutSeconds:    common.Int32Ptr(20),
+					HeartbeatTimeoutSeconds:       common.Int32Ptr(20),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(7),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeActivityTaskStarted.Ptr(),
+				ActivityTaskStartedEventAttributes: &shared.ActivityTaskStartedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(6),
+					Identity:         common.StringPtr(identity),
+					RequestId:        common.StringPtr(uuid.New()),
+					Attempt:          common.Int32Ptr(0),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(8),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
+				WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
+					SignalName: common.StringPtr("some signal name 1"),
+					Input:      []byte("some signal details 1"),
+					Identity:   common.StringPtr(identity),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(9),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
+				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
+					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
+					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
+					Attempt:                    common.Int64Ptr(0),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(10),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
+				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(9),
+					Identity:         common.StringPtr(identity),
+					RequestId:        common.StringPtr(uuid.New()),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(11),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
+				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(9),
+					StartedEventId:   common.Int64Ptr(10),
+					Identity:         common.StringPtr(identity),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(12),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeWorkflowExecutionSignaled.Ptr(),
+				WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
+					SignalName: common.StringPtr("some signal name 2"),
+					Input:      []byte("some signal details 2"),
+					Identity:   common.StringPtr(identity),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(13),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
+				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
+					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
+					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
+					Attempt:                    common.Int64Ptr(0),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(14),
+				Version:   common.Int64Ptr(21),
+				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
+				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(13),
+					Identity:         common.StringPtr(identity),
+					RequestId:        common.StringPtr(uuid.New()),
+				},
+			},
+		},
+	}
+
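+	// events2: one divergent branch, a workflow timeout at event ID 15 with the
+	// higher version 31; events3 below is the sibling branch at version 30, which
+	// times out the pending decision and activity and then fails the workflow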
+	events2 := [][]*shared.HistoryEvent{
+		{
+			{
+				EventId:   common.Int64Ptr(15),
+				Version:   common.Int64Ptr(31),
+				EventType: shared.EventTypeWorkflowExecutionTimedOut.Ptr(),
+				WorkflowExecutionTimedOutEventAttributes: &shared.WorkflowExecutionTimedOutEventAttributes{
+					TimeoutType: shared.TimeoutTypeStartToClose.Ptr(),
+				},
+			},
+		},
+	}
+
+	events3 := [][]*shared.HistoryEvent{
+		{
+			{
+				EventId:   common.Int64Ptr(15),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeDecisionTaskTimedOut.Ptr(),
+				DecisionTaskTimedOutEventAttributes: &shared.DecisionTaskTimedOutEventAttributes{
+					ScheduledEventId: common.Int64Ptr(13),
+					StartedEventId:   common.Int64Ptr(14),
+					TimeoutType:      shared.TimeoutTypeStartToClose.Ptr(),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(16),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeActivityTaskTimedOut.Ptr(),
+				ActivityTaskTimedOutEventAttributes: &shared.ActivityTaskTimedOutEventAttributes{
+					ScheduledEventId: common.Int64Ptr(6),
+					StartedEventId:   common.Int64Ptr(7),
+					TimeoutType:      shared.TimeoutTypeStartToClose.Ptr(),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(17),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
+				DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{
+					TaskList:                   &shared.TaskList{Name: common.StringPtr(tasklist)},
+					StartToCloseTimeoutSeconds: common.Int32Ptr(1000),
+					Attempt:                    common.Int64Ptr(0),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(18),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
+				DecisionTaskStartedEventAttributes: &shared.DecisionTaskStartedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(17),
+					Identity:         common.StringPtr(identity),
+					RequestId:        common.StringPtr(uuid.New()),
+				},
+			},
+		},
+		{
+			{
+				EventId:   common.Int64Ptr(19),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeDecisionTaskCompleted.Ptr(),
+				DecisionTaskCompletedEventAttributes: &shared.DecisionTaskCompletedEventAttributes{
+					ScheduledEventId: common.Int64Ptr(8),
+					StartedEventId:   common.Int64Ptr(9),
+					Identity:         common.StringPtr(identity),
+				},
+			},
+			{
+				EventId:   common.Int64Ptr(20),
+				Version:   common.Int64Ptr(30),
+				EventType: shared.EventTypeWorkflowExecutionFailed.Ptr(),
+				WorkflowExecutionFailedEventAttributes: &shared.WorkflowExecutionFailedEventAttributes{
+					DecisionTaskCompletedEventId: common.Int64Ptr(19),
+					Reason:                       common.StringPtr("some random reason"),
+					Details:                      nil,
+				},
+			},
+		},
+	}
+
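+	// build one version history per branch; versionHistory2 and versionHistory3
+	// are forked from versionHistory1 at the lowest common ancestor item
+	// (event ID 14, version 21)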
+	versionHistory1 := persistence.NewVersionHistory(nil, nil)
+	for _, batch := range events1 {
+		for _, event := range batch {
+			err := versionHistory1.AddOrUpdateItem(
+				persistence.NewVersionHistoryItem(
+					event.GetEventId(),
+					event.GetVersion(),
+				))
+			s.NoError(err)
+		}
+	}
+
+	versionHistory2, err := versionHistory1.DuplicateUntilLCAItem(
+		persistence.NewVersionHistoryItem(14, 21),
+	)
+	s.NoError(err)
+	for _, batch := range events2 {
+		for _, event := range batch {
+			err := versionHistory2.AddOrUpdateItem(
+				persistence.NewVersionHistoryItem(
+					event.GetEventId(),
+					event.GetVersion(),
+				))
+			s.NoError(err)
+		}
+	}
+
+	versionHistory3, err := versionHistory1.DuplicateUntilLCAItem(
+		persistence.NewVersionHistoryItem(14, 21),
+	)
+	s.NoError(err)
+	for _, batch := range events3 {
+		for _, event := range batch {
+			err := versionHistory3.AddOrUpdateItem(
+				persistence.NewVersionHistoryItem(
+					event.GetEventId(),
+					event.GetVersion(),
+				))
+			s.NoError(err)
+		}
+	}
+
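+	// replicate the common branch first, then each divergent branch; the version
+	// history items attached to each request identify which branch the batch
+	// belongs to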
+	for _, batch := range events1 {
+		// the events batch must be serialized after any attempt to continue-as-new,
+		// since generateNewRunHistory modifies the NewExecutionRunId attribute
+		eventBlob, err := s.serializer.SerializeBatchEvents(batch, common.EncodingTypeThriftRW)
+		s.NoError(err)
+
+		err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{
+			DomainUUID: common.StringPtr(s.domainID),
+			WorkflowExecution: &shared.WorkflowExecution{
+				WorkflowId: common.StringPtr(workflowID),
+				RunId:      common.StringPtr(runID),
+			},
+			VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory1),
+			Events:              s.toThriftDataBlob(eventBlob),
+			NewRunEvents:        nil,
+			ResetWorkflow:       common.BoolPtr(false),
+		})
+		s.Nil(err, "Failed to replicate history event")
+	}
+
+	for _, batch := range events3 {
+		// the events batch must be serialized after any attempt to continue-as-new,
+		// since generateNewRunHistory modifies the NewExecutionRunId attribute
+		eventBlob, err := s.serializer.SerializeBatchEvents(batch, common.EncodingTypeThriftRW)
+		s.NoError(err)
+
+		err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{
+			DomainUUID: common.StringPtr(s.domainID),
+			WorkflowExecution: &shared.WorkflowExecution{
+				WorkflowId: common.StringPtr(workflowID),
+				RunId:      common.StringPtr(runID),
+			},
+			VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory3),
+			Events:              s.toThriftDataBlob(eventBlob),
+			NewRunEvents:        nil,
+			ResetWorkflow:       common.BoolPtr(false),
+		})
+		s.Nil(err, "Failed to replicate history event")
+	}
+
+	for _, batch := range events2 {
+		// the events batch must be serialized after any attempt to continue-as-new,
+		// since generateNewRunHistory modifies the NewExecutionRunId attribute
+		eventBlob, err := s.serializer.SerializeBatchEvents(batch, common.EncodingTypeThriftRW)
+		s.NoError(err)
+
+		err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{
+			DomainUUID: common.StringPtr(s.domainID),
+			WorkflowExecution: &shared.WorkflowExecution{
+				WorkflowId: common.StringPtr(workflowID),
+				RunId:      common.StringPtr(runID),
+			},
+			VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory2),
+			Events:              s.toThriftDataBlob(eventBlob),
+			NewRunEvents:        nil,
+			ResetWorkflow:       common.BoolPtr(false),
+		})
+		s.Nil(err, "Failed to replicate history event")
+	}
+}
+
 func (s *nDCIntegrationTestSuite) registerDomain() {
 	s.domainName = "test-simple-workflow-ndc-" + common.GenerateRandomString(5)
 	client1 := s.active.GetFrontendClient() // active
 	err := client1.RegisterDomain(s.createContext(), &shared.RegisterDomainRequest{
-		Name:              common.StringPtr(s.domainName),
-		IsGlobalDomain:    common.BoolPtr(true),
-		Clusters:          clusterReplicationConfig,
-		ActiveClusterName: common.StringPtr(clusterName[0]),
+		Name:           common.StringPtr(s.domainName),
+		IsGlobalDomain: common.BoolPtr(true),
+		Clusters:       clusterReplicationConfig,
+		// make the active cluster `standby` and replicate to `active` cluster
+		ActiveClusterName:                      common.StringPtr(clusterName[1]),
 		WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
 	})
 	s.Require().NoError(err)
diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go
index d7a169a7f5e..02e5dc48593 100644
--- a/service/history/historyTestBase.go
+++ b/service/history/historyTestBase.go
@@ -422,7 +422,7 @@ func (s *TestShardContext) UpdateWorkflowExecution(request *persistence.UpdateWo
 		if ts.Before(s.timerMaxReadLevelMap[clusterName]) {
 			// This can happen if shard move and new host have a time SKU, or there is db write delay.
 			// We generate a new timer ID using timerMaxReadLevel.
-			s.logger.Warn(fmt.Sprintf("%v: New timer generated is less than read level. timestamp: %v, timerMaxReadLevel: %v",
+			s.logger.Debug(fmt.Sprintf("%v: New timer generated is less than read level. timestamp: %v, timerMaxReadLevel: %v",
 				time.Now(), ts, s.timerMaxReadLevelMap[clusterName]), tag.ClusterName(clusterName))
 			task.SetVisibilityTimestamp(s.timerMaxReadLevelMap[clusterName].Add(time.Millisecond))
 		}
diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go
index c54f16feebc..2a00f026f7c 100644
--- a/service/history/mutableStateBuilder.go
+++ b/service/history/mutableStateBuilder.go
@@ -2406,7 +2406,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(
 		Version:               event.GetVersion(),
 		ScheduleID:            scheduleEventID,
 		ScheduledEventBatchID: firstEventID,
-		ScheduledTime:         time.Unix(0, *event.Timestamp),
+		ScheduledTime:         time.Unix(0, event.GetTimestamp()),
 		StartedID:             common.EmptyEventID,
 		StartedTime:           time.Time{},
 		ActivityID:            common.StringDefault(attributes.ActivityId),
diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go
index 42bf83de283..10ff2895eb0 100644
--- a/service/history/nDCHistoryReplicator.go
+++ b/service/history/nDCHistoryReplicator.go
@@ -31,6 +31,7 @@ import (
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/cache"
 	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/errors"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
@@ -85,6 +86,8 @@ type (
 	}
 )
 
+var errPanic = errors.NewInternalFailureError("encounter panic")
+
 func newNDCHistoryReplicator(
 	shard ShardContext,
 	historyCache *historyCache,
@@ -189,7 +192,14 @@ func (r *nDCHistoryReplicatorImpl) applyEvents(
 		// err will not be of type EntityNotExistsError
 		return err
 	}
-	defer func() { releaseFn(retError) }()
+	defer func() {
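+		// if the replication logic panics, release the workflow context with
+		// errPanic so the cached state is not reused, then re-raise the panic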
+		if rec := recover(); rec != nil {
+			releaseFn(errPanic)
+			panic(rec)
+		} else {
+			releaseFn(retError)
+		}
+	}()
 
 	switch task.getFirstEvent().GetEventType() {
 	case shared.EventTypeWorkflowExecutionStarted:
@@ -521,7 +531,11 @@ func (r *nDCHistoryReplicatorImpl) notify(
 	clusterName string,
 	now time.Time,
 ) {
-
+	if clusterName == r.clusterMetadata.GetCurrentClusterName() {
+		// this is a valid use case for testing, but not for production
+		r.logger.Warn("nDCHistoryReplicator applying events generated by current cluster")
+		return
+	}
 	now = now.Add(-r.shard.GetConfig().StandbyClusterDelay())
 	r.shard.SetCurrentTime(clusterName, now)
 }
diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go
index 33f5611357d..99ff3da42b6 100644
--- a/service/history/timerQueueProcessor_test.go
+++ b/service/history/timerQueueProcessor_test.go
@@ -147,7 +147,7 @@ func (s *timerQueueProcessorSuite) updateTimerSeqNumbers(timerTasks []persistenc
 		if ts.Before(s.engineImpl.shard.GetTimerMaxReadLevel(cluster)) {
 			// This can happen if shard move and new host have a time SKU, or there is db write delay.
 			// We generate a new timer ID using timerMaxReadLevel.
-			s.logger.Warn(fmt.Sprintf("%v: New timer generated is less than read level. timestamp: %v, timerMaxReadLevel: %v",
+			s.logger.Debug(fmt.Sprintf("%v: New timer generated is less than read level. timestamp: %v, timerMaxReadLevel: %v",
 				time.Now(), ts, s.engineImpl.shard.GetTimerMaxReadLevel(cluster)))
 			task.SetVisibilityTimestamp(s.engineImpl.shard.GetTimerMaxReadLevel(cluster).Add(time.Millisecond))
 		}