From 3a79fc039ebfe0e3729f6a5945c61e3bda155ab6 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Tue, 17 Jul 2018 19:03:15 -0700 Subject: [PATCH] Apply 5 min delay for standby task (#920) * Apply 5 min delay for standby task * When failover, should unblock existing standby task --- common/persistence/cassandraPersistence.go | 4 + .../persistence/cassandraPersistence_test.go | 16 +- common/persistence/dataInterfaces.go | 139 +++++++++++++++--- common/service/dynamicconfig/constants.go | 9 +- common/time_source.go | 15 +- schema/cadence/schema.cql | 1 + schema/cadence/versioned/v0.9/manifest.json | 5 +- .../versioned/v0.9/transfer_timestamp.cql | 1 + service/history/MockTransferQueueProcessor.go | 6 +- service/history/historyEngine.go | 16 +- service/history/historyEngineInterfaces.go | 3 +- service/history/historyReplicator.go | 18 ++- service/history/historyReplicator_test.go | 26 ++-- service/history/mutableStateBuilder.go | 7 +- service/history/queueProcessor.go | 4 +- service/history/replicatorQueueProcessor.go | 6 +- service/history/service.go | 8 +- service/history/stateBuilder.go | 2 +- service/history/timerQueueProcessor.go | 6 +- .../history/transferQueueActiveProcessor.go | 4 +- service/history/transferQueueProcessor.go | 10 +- .../history/transferQueueStandbyProcessor.go | 2 +- service/history/workflowExecutionContext.go | 11 +- 23 files changed, 235 insertions(+), 84 deletions(-) create mode 100644 schema/cadence/versioned/v0.9/transfer_timestamp.cql diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 50d6da6e0c6..3488ca2dcf7 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -162,6 +162,7 @@ const ( `domain_id: ?, ` + `workflow_id: ?, ` + `run_id: ?, ` + + `visibility_ts: ?, ` + `task_id: ?, ` + `target_domain_id: ?, ` + `target_workflow_id: ?, ` + @@ -2470,6 +2471,7 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT domainID, workflowID, runID, + task.GetVisibilityTimestamp(), task.GetTaskID(), targetDomainID, targetWorkflowID, @@ -3156,6 +3158,8 @@ func createTransferTaskInfo(result map[string]interface{}) *TransferTaskInfo { info.WorkflowID = v.(string) case "run_id": info.RunID = v.(gocql.UUID).String() + case "visibility_ts": + info.VisibilityTimestamp = v.(time.Time) case "task_id": info.TaskID = v.(int64) case "target_domain_id": diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index ea731a4da68..3ad4c7e3816 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -993,13 +993,14 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() { targetWorkflowID := "some random target domain ID" targetRunID := uuid.New() currentTransferID := s.GetTransferReadLevel() + now := time.Now() tasks := []Task{ - &ActivityTask{currentTransferID + 10001, domainID, tasklist, scheduleID, 111}, - &DecisionTask{currentTransferID + 10002, domainID, tasklist, scheduleID, 222}, - &CloseExecutionTask{currentTransferID + 10003, 333}, - &CancelExecutionTask{currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444}, - &SignalExecutionTask{currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555}, - &StartChildExecutionTask{currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666}, + &ActivityTask{now, currentTransferID + 10001, domainID, tasklist, scheduleID, 111}, + &DecisionTask{now, currentTransferID + 10002, domainID, tasklist, scheduleID, 222}, + &CloseExecutionTask{now, currentTransferID + 10003, 333}, + &CancelExecutionTask{now, currentTransferID + 10004, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 444}, + &SignalExecutionTask{now, currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555}, + &StartChildExecutionTask{now, currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666}, } err2 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, nil, int64(3), tasks) s.Nil(err2, "No error expected.") @@ -1008,6 +1009,9 @@ func (s *cassandraPersistenceSuite) TestTransferTasks() { s.Nil(err1, "No error expected.") s.NotNil(txTasks, "expected valid list of tasks.") s.Equal(len(tasks), len(txTasks)) + for index := range tasks { + s.True(timeComparator(tasks[index].GetVisibilityTimestamp(), txTasks[index].VisibilityTimestamp, timePrecision)) + } s.Equal(TransferTaskTypeActivityTask, txTasks[0].TaskType) s.Equal(TransferTaskTypeDecisionTask, txTasks[1].TaskType) s.Equal(TransferTaskTypeCloseExecution, txTasks[2].TaskType) diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index f66b12cfd2e..36fbe3cb0be 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -200,6 +200,7 @@ type ( DomainID string WorkflowID string RunID string + VisibilityTimestamp time.Time TaskID int64 TargetDomainID string TargetWorkflowID string @@ -265,30 +266,35 @@ type ( SetVersion(version int64) GetTaskID() int64 SetTaskID(id int64) + GetVisibilityTimestamp() time.Time + SetVisibilityTimestamp(timestamp time.Time) } // ActivityTask identifies a transfer task for activity ActivityTask struct { - TaskID int64 - DomainID string - TaskList string - ScheduleID int64 - Version int64 + VisibilityTimestamp time.Time + TaskID int64 + DomainID string + TaskList string + ScheduleID int64 + Version int64 } // DecisionTask identifies a transfer task for decision DecisionTask struct { - TaskID int64 - DomainID string - TaskList string - ScheduleID int64 - Version int64 + VisibilityTimestamp time.Time + TaskID int64 + DomainID string + TaskList string + ScheduleID int64 + Version int64 } // CloseExecutionTask identifies a transfer task for deletion of execution CloseExecutionTask struct { - TaskID int64 - Version int64 + VisibilityTimestamp time.Time + TaskID int64 + Version int64 } // DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution. @@ -317,6 +323,7 @@ type ( // CancelExecutionTask identifies a transfer task for cancel of execution CancelExecutionTask struct { + VisibilityTimestamp time.Time TaskID int64 TargetDomainID string TargetWorkflowID string @@ -328,6 +335,7 @@ type ( // SignalExecutionTask identifies a transfer task for signal execution SignalExecutionTask struct { + VisibilityTimestamp time.Time TaskID int64 TargetDomainID string TargetWorkflowID string @@ -339,11 +347,12 @@ type ( // StartChildExecutionTask identifies a transfer task for starting child execution StartChildExecutionTask struct { - TaskID int64 - TargetDomainID string - TargetWorkflowID string - InitiatedID int64 - Version int64 + VisibilityTimestamp time.Time + TaskID int64 + TargetDomainID string + TargetWorkflowID string + InitiatedID int64 + Version int64 } // ActivityTimeoutTask identifies a timeout task. @@ -375,6 +384,7 @@ type ( // HistoryReplicationTask is the transfer task created for shipping history replication events to other clusters HistoryReplicationTask struct { + VisibilityTimestamp time.Time TaskID int64 FirstEventID int64 NextEventID int64 @@ -1009,6 +1019,16 @@ func (a *ActivityTask) SetTaskID(id int64) { a.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (a *ActivityTask) GetVisibilityTimestamp() time.Time { + return a.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (a *ActivityTask) SetVisibilityTimestamp(timestamp time.Time) { + a.VisibilityTimestamp = timestamp +} + // GetType returns the type of the decision task func (d *DecisionTask) GetType() int { return TransferTaskTypeDecisionTask @@ -1034,6 +1054,16 @@ func (d *DecisionTask) SetTaskID(id int64) { d.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (d *DecisionTask) GetVisibilityTimestamp() time.Time { + return d.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (d *DecisionTask) SetVisibilityTimestamp(timestamp time.Time) { + d.VisibilityTimestamp = timestamp +} + // GetType returns the type of the close execution task func (a *CloseExecutionTask) GetType() int { return TransferTaskTypeCloseExecution @@ -1059,6 +1089,16 @@ func (a *CloseExecutionTask) SetTaskID(id int64) { a.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (a *CloseExecutionTask) GetVisibilityTimestamp() time.Time { + return a.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (a *CloseExecutionTask) SetVisibilityTimestamp(timestamp time.Time) { + a.VisibilityTimestamp = timestamp +} + // GetType returns the type of the delete execution task func (a *DeleteHistoryEventTask) GetType() int { return TaskTypeDeleteHistoryEvent @@ -1084,6 +1124,16 @@ func (a *DeleteHistoryEventTask) SetTaskID(id int64) { a.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (a *DeleteHistoryEventTask) GetVisibilityTimestamp() time.Time { + return a.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (a *DeleteHistoryEventTask) SetVisibilityTimestamp(timestamp time.Time) { + a.VisibilityTimestamp = timestamp +} + // GetType returns the type of the timer task func (d *DecisionTimeoutTask) GetType() int { return TaskTypeDecisionTimeout @@ -1284,6 +1334,16 @@ func (u *CancelExecutionTask) SetTaskID(id int64) { u.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (u *CancelExecutionTask) GetVisibilityTimestamp() time.Time { + return u.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (u *CancelExecutionTask) SetVisibilityTimestamp(timestamp time.Time) { + u.VisibilityTimestamp = timestamp +} + // GetType returns the type of the signal transfer task func (u *SignalExecutionTask) GetType() int { return TransferTaskTypeSignalExecution @@ -1309,6 +1369,16 @@ func (u *SignalExecutionTask) SetTaskID(id int64) { u.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (u *SignalExecutionTask) GetVisibilityTimestamp() time.Time { + return u.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (u *SignalExecutionTask) SetVisibilityTimestamp(timestamp time.Time) { + u.VisibilityTimestamp = timestamp +} + // GetType returns the type of the start child transfer task func (u *StartChildExecutionTask) GetType() int { return TransferTaskTypeStartChildExecution @@ -1334,6 +1404,16 @@ func (u *StartChildExecutionTask) SetTaskID(id int64) { u.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (u *StartChildExecutionTask) GetVisibilityTimestamp() time.Time { + return u.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (u *StartChildExecutionTask) SetVisibilityTimestamp(timestamp time.Time) { + u.VisibilityTimestamp = timestamp +} + // GetType returns the type of the history replication task func (a *HistoryReplicationTask) GetType() int { return ReplicationTaskTypeHistory @@ -1359,6 +1439,16 @@ func (a *HistoryReplicationTask) SetTaskID(id int64) { a.TaskID = id } +// GetVisibilityTimestamp get the visibility timestamp +func (a *HistoryReplicationTask) GetVisibilityTimestamp() time.Time { + return a.VisibilityTimestamp +} + +// SetVisibilityTimestamp set the visibility timestamp +func (a *HistoryReplicationTask) SetVisibilityTimestamp(timestamp time.Time) { + a.VisibilityTimestamp = timestamp +} + // GetTaskID returns the task ID for transfer task func (t *TransferTaskInfo) GetTaskID() int64 { return t.TaskID @@ -1374,6 +1464,11 @@ func (t *TransferTaskInfo) GetTaskType() int { return t.TaskType } +// GetVisibilityTimestamp returns the task type for transfer task +func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time { + return t.VisibilityTimestamp +} + // String returns string func (t *TransferTaskInfo) String() string { return fmt.Sprintf( @@ -1397,6 +1492,11 @@ func (t *ReplicationTaskInfo) GetTaskType() int { return t.TaskType } +// GetVisibilityTimestamp returns the task type for transfer task +func (t *ReplicationTaskInfo) GetVisibilityTimestamp() time.Time { + return time.Time{} +} + // GetTaskID returns the task ID for timer task func (t *TimerTaskInfo) GetTaskID() int64 { return t.TaskID @@ -1412,6 +1512,11 @@ func (t *TimerTaskInfo) GetTaskType() int { return t.TaskType } +// GetVisibilityTimestamp returns the task type for transfer task +func (t *TimerTaskInfo) GetVisibilityTimestamp() time.Time { + return t.VisibilityTimestamp +} + // GetTaskType returns the task type for timer task func (t *TimerTaskInfo) String() string { return fmt.Sprintf( diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 3483d3d06eb..9283b681177 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -78,6 +78,7 @@ var keys = map[Key]string{ HistoryCacheMaxSize: "history.cacheMaxSize", HistoryCacheTTL: "history.cacheTTL", AcquireShardInterval: "history.acquireShardInterval", + StandbyClusterDelay: "history.standbyClusterDelay", TimerTaskBatchSize: "history.timerTaskBatchSize", TimerTaskWorkerCount: "history.timerTaskWorkerCount", TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount", @@ -92,7 +93,6 @@ var keys = map[Key]string{ TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS", TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval", TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient", - TimerProcessorStandbyTaskDelay: "history.timerProcessorStandbyTaskDelay", TransferTaskBatchSize: "history.transferTaskBatchSize", TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS", TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS", @@ -106,7 +106,6 @@ var keys = map[Key]string{ TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient", TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval", TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval", - TransferProcessorStandbyTaskDelay: "history.transferProcessorStandbyTaskDelay", ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize", ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount", ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount", @@ -198,6 +197,8 @@ const ( HistoryCacheTTL // AcquireShardInterval is interval that timer used to acquire shard AcquireShardInterval + // StandbyClusterDelay is the atrificial delay added to standby cluster's view of active cluster's time + StandbyClusterDelay // TimerTaskBatchSize is batch size for timer processor to process tasks TimerTaskBatchSize // TimerTaskWorkerCount is number of task workers for timer processor @@ -226,8 +227,6 @@ const ( TimerProcessorMaxPollInterval // TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient TimerProcessorMaxPollIntervalJitterCoefficient - // TimerProcessorStandbyTaskDelay is task delay for standby task in timer processor - TimerProcessorStandbyTaskDelay // TransferTaskBatchSize is batch size for transferQueueProcessor TransferTaskBatchSize // TransferProcessorFailoverMaxPollRPS is max poll rate per second for transferQueueProcessor @@ -254,8 +253,6 @@ const ( TransferProcessorUpdateAckInterval // TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor TransferProcessorCompleteTransferInterval - // TransferProcessorStandbyTaskDelay is delay time for standby task in transferQueueProcessor - TransferProcessorStandbyTaskDelay // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize // ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor diff --git a/common/time_source.go b/common/time_source.go index b2a1a254ccf..9215ffb9093 100644 --- a/common/time_source.go +++ b/common/time_source.go @@ -33,8 +33,8 @@ type ( // RealTimeSource serves real wall-clock time RealTimeSource struct{} - // FakeTimeSource serves fake controlled time - FakeTimeSource struct { + // EventTimeSource serves fake controlled time + EventTimeSource struct { now time.Time } ) @@ -50,18 +50,19 @@ func (ts *RealTimeSource) Now() time.Time { return time.Now() } -// NewFakeTimeSource returns a time source that servers +// NewEventTimeSource returns a time source that servers // fake controlled time -func NewFakeTimeSource() *FakeTimeSource { - return &FakeTimeSource{} +func NewEventTimeSource() *EventTimeSource { + return &EventTimeSource{} } // Now return the fake current time -func (ts *FakeTimeSource) Now() time.Time { +func (ts *EventTimeSource) Now() time.Time { return ts.now } // Update update the fake current time -func (ts *FakeTimeSource) Update(now time.Time) { +func (ts *EventTimeSource) Update(now time.Time) *EventTimeSource { ts.now = now + return ts } diff --git a/schema/cadence/schema.cql b/schema/cadence/schema.cql index cfed817b166..847fe333e76 100644 --- a/schema/cadence/schema.cql +++ b/schema/cadence/schema.cql @@ -77,6 +77,7 @@ CREATE TYPE transfer_task ( workflow_id text, -- The workflow ID that this transfer task belongs to run_id uuid, -- The run ID that this transfer task belongs to task_id bigint, + visibility_ts timestamp, -- The timestamp when the transfer task is generated target_domain_id uuid, -- The external domain ID that this transfer task is doing work for. target_workflow_id text, -- The external workflow ID that this transfer task is doing work for. target_run_id uuid, -- The external run ID that this transfer task is doing work for. diff --git a/schema/cadence/versioned/v0.9/manifest.json b/schema/cadence/versioned/v0.9/manifest.json index 396dfddb6c8..d044dd08070 100644 --- a/schema/cadence/versioned/v0.9/manifest.json +++ b/schema/cadence/versioned/v0.9/manifest.json @@ -1,8 +1,9 @@ { "CurrVersion": "0.9", "MinCompatibleVersion": "0.9", - "Description": "Support domain customized key-value data.", + "Description": "Support domain customized key-value data, adding timestamp to transfer task.", "SchemaUpdateCqlFiles": [ - "domain_data.cql" + "domain_data.cql", + "transfer_timestamp.cql" ] } \ No newline at end of file diff --git a/schema/cadence/versioned/v0.9/transfer_timestamp.cql b/schema/cadence/versioned/v0.9/transfer_timestamp.cql new file mode 100644 index 00000000000..6c8964c66a5 --- /dev/null +++ b/schema/cadence/versioned/v0.9/transfer_timestamp.cql @@ -0,0 +1 @@ +ALTER TYPE transfer_task ADD visibility_ts timestamp; \ No newline at end of file diff --git a/service/history/MockTransferQueueProcessor.go b/service/history/MockTransferQueueProcessor.go index e9a1000bd1c..e9786a92b8c 100644 --- a/service/history/MockTransferQueueProcessor.go +++ b/service/history/MockTransferQueueProcessor.go @@ -21,8 +21,6 @@ package history import ( - "time" - "github.com/stretchr/testify/mock" "github.com/uber/cadence/common/persistence" ) @@ -48,6 +46,6 @@ func (_m *MockTransferQueueProcessor) FailoverDomain(domainID string) { } // NotifyNewTask is mock implementation for NotifyNewTask of Processor -func (_m *MockTransferQueueProcessor) NotifyNewTask(clusterName string, currentTime time.Time, transferTask []persistence.Task) { - _m.Called(clusterName, currentTime, transferTask) +func (_m *MockTransferQueueProcessor) NotifyNewTask(clusterName string, transferTask []persistence.Task) { + _m.Called(clusterName, transferTask) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 51e3ea31cd6..6aa69faa925 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -237,7 +237,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() { // its length > 0 and has correct timestamp, to trkgger a db scan fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}} fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{VisibilityTimestamp: now}} - e.txProcessor.NotifyNewTask(e.currentClusterName, now, fakeDecisionTask) + e.txProcessor.NotifyNewTask(e.currentClusterName, fakeDecisionTask) e.timerProcessor.NotifyNewTimers(e.currentClusterName, now, fakeDecisionTimeoutTask) }) e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion() + 1) @@ -356,7 +356,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow replicationTasks = append(replicationTasks, replicationTask) } } - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) + setTaskInfo(msBuilder.GetCurrentVersion(), time.Now(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -1998,7 +1998,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context replicationTasks = append(replicationTasks, replicationTask) } } - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) + setTaskInfo(msBuilder.GetCurrentVersion(), time.Now(), transferTasks, timerTasks) createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -2203,7 +2203,7 @@ func (e *historyEngineImpl) SyncShardStatus(ctx context.Context, request *h.Sync // 2. notify the timer gate in the timer queue standby processor // 3, notify the transfer (essentially a no op, just put it here so it looks symmetric) e.shard.SetCurrentTime(clusterName, now) - e.txProcessor.NotifyNewTask(clusterName, now, []persistence.Task{}) + e.txProcessor.NotifyNewTask(clusterName, []persistence.Task{}) e.timerProcessor.NotifyNewTimers(clusterName, now, []persistence.Task{}) return nil } @@ -2402,7 +2402,7 @@ func (e *historyEngineImpl) getTimerBuilder(we *workflow.WorkflowExecution) *tim func (s *shardContextWrapper) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error { err := s.ShardContext.UpdateWorkflowExecution(request) if err == nil { - s.txProcessor.NotifyNewTask(s.currentClusterName, s.GetCurrentTime(s.currentClusterName), request.TransferTasks) + s.txProcessor.NotifyNewTask(s.currentClusterName, request.TransferTasks) if len(request.ReplicationTasks) > 0 { s.replcatorProcessor.notifyNewTask() } @@ -2414,7 +2414,7 @@ func (s *shardContextWrapper) CreateWorkflowExecution(request *persistence.Creat *persistence.CreateWorkflowExecutionResponse, error) { resp, err := s.ShardContext.CreateWorkflowExecution(request) if err == nil { - s.txProcessor.NotifyNewTask(s.currentClusterName, s.GetCurrentTime(s.currentClusterName), request.TransferTasks) + s.txProcessor.NotifyNewTask(s.currentClusterName, request.TransferTasks) if len(request.ReplicationTasks) > 0 { s.replcatorProcessor.notifyNewTask() } @@ -2759,9 +2759,11 @@ func getStartRequest(domainID string, return startRequest } -func setTaskVersion(version int64, transferTasks []persistence.Task, timerTasks []persistence.Task) { +func setTaskInfo(version int64, timestamp time.Time, transferTasks []persistence.Task, timerTasks []persistence.Task) { + // set both the task version, as well as the timestamp on the transfer tasks for _, task := range transferTasks { task.SetVersion(version) + task.SetVisibilityTimestamp(timestamp) } for _, task := range timerTasks { task.SetVersion(version) diff --git a/service/history/historyEngineInterfaces.go b/service/history/historyEngineInterfaces.go index 75efd401776..dbdd7ac08dd 100644 --- a/service/history/historyEngineInterfaces.go +++ b/service/history/historyEngineInterfaces.go @@ -104,6 +104,7 @@ type ( GetVersion() int64 GetTaskID() int64 GetTaskType() int + GetVisibilityTimestamp() time.Time } processor interface { @@ -116,7 +117,7 @@ type ( transferQueueProcessor interface { common.Daemon FailoverDomain(domainID string) - NotifyNewTask(clusterName string, currentTime time.Time, transferTasks []persistence.Task) + NotifyNewTask(clusterName string, transferTasks []persistence.Task) } // TODO the timer quque processor and the one below, timer processor diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 18d2736cce4..9c52600653a 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -349,7 +349,10 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context *workf // so nothing on the replication state should be changed lastWriteVersion := msBuilder.GetLastWriteVersion() sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion) - return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID) + history := request.GetHistory() + lastEvent := history.Events[len(history.Events)-1] + now := time.Unix(0, lastEvent.GetTimestamp()) + return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID, now) } // Apply the replication task @@ -413,8 +416,9 @@ func (r *historyReplicator) ApplyReplicationTask(ctx context.Context, context *w if err2 != nil { return err2 } + now := time.Unix(0, lastEvent.GetTimestamp()) err = context.replicateWorkflowExecution(request, sBuilder.getTransferTasks(), sBuilder.getTimerTasks(), - lastEvent.GetEventId(), transactionID) + lastEvent.GetEventId(), transactionID, now) } if err == nil { @@ -544,7 +548,12 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex } transferTasks := sBuilder.getTransferTasks() timerTasks := sBuilder.getTimerTasks() - setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks) + setTaskInfo( + msBuilder.GetCurrentVersion(), + time.Unix(0, lastEvent.GetTimestamp()), + transferTasks, + timerTasks, + ) createWorkflow := func(isBrandNew bool, prevRunID string) error { _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ @@ -835,8 +844,9 @@ func (r *historyReplicator) terminateWorkflow(ctx context.Context, domainID stri func (r *historyReplicator) notify(clusterName string, now time.Time, transferTasks []persistence.Task, timerTasks []persistence.Task) { + now = now.Add(-r.shard.GetConfig().StandbyClusterDelay()) r.shard.SetCurrentTime(clusterName, now) - r.historyEngine.txProcessor.NotifyNewTask(clusterName, now, transferTasks) + r.historyEngine.txProcessor.NotifyNewTask(clusterName, transferTasks) r.historyEngine.timerProcessor.NotifyNewTimers(clusterName, now, timerTasks) } diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index cab022eabdd..c56bbe5cf69 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -506,7 +506,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent FirstEventId: common.Int64Ptr(incomingFirstEventID), NextEventId: common.Int64Ptr(incomingNextEventID), ForceBufferEvents: common.BoolPtr(true), - History: &shared.History{}, + History: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, } serializedHistoryBatch := &persistence.SerializedHistoryEventBatch{ @@ -626,10 +626,11 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_BrandNew() { } sBuilder := &mockStateBuilder{} requestID := uuid.New() + now := time.Now() history := &shared.History{ Events: []*shared.HistoryEvent{ - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1)}, - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2)}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2), Timestamp: common.Int64Ptr(now.UnixNano())}, }, } nextEventID := di.ScheduleID + 1 @@ -701,6 +702,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_BrandNew() { s.Nil(err) s.Equal(1, len(transferTasks)) s.Equal(version, transferTasks[0].GetVersion()) + s.True(now.Equal(transferTasks[0].GetVisibilityTimestamp())) s.Equal(1, len(timerTasks)) s.Equal(version, timerTasks[0].GetVersion()) } @@ -999,10 +1001,11 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In } sBuilder := &mockStateBuilder{} requestID := uuid.New() + now := time.Now() history := &shared.History{ Events: []*shared.HistoryEvent{ - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1)}, - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2)}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2), Timestamp: common.Int64Ptr(now.UnixNano())}, }, } nextEventID := di.ScheduleID + 1 @@ -1116,6 +1119,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In s.Nil(err) s.Equal(1, len(transferTasks)) s.Equal(version, transferTasks[0].GetVersion()) + s.True(now.Equal(transferTasks[0].GetVisibilityTimestamp())) s.Equal(1, len(timerTasks)) s.Equal(version, timerTasks[0].GetVersion()) } @@ -1151,10 +1155,11 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In } sBuilder := &mockStateBuilder{} requestID := uuid.New() + now := time.Now() history := &shared.History{ Events: []*shared.HistoryEvent{ - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1)}, - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2)}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2), Timestamp: common.Int64Ptr(now.UnixNano())}, }, } nextEventID := di.ScheduleID + 1 @@ -1268,6 +1273,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentComplete_In s.Nil(err) s.Equal(1, len(transferTasks)) s.Equal(version, transferTasks[0].GetVersion()) + s.True(now.Equal(transferTasks[0].GetVisibilityTimestamp())) s.Equal(1, len(timerTasks)) s.Equal(version, timerTasks[0].GetVersion()) } @@ -1503,10 +1509,11 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc } sBuilder := &mockStateBuilder{} requestID := uuid.New() + now := time.Now() history := &shared.History{ Events: []*shared.HistoryEvent{ - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1)}, - &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2)}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(1), Timestamp: common.Int64Ptr(now.UnixNano())}, + &shared.HistoryEvent{Version: common.Int64Ptr(version), EventId: common.Int64Ptr(2), Timestamp: common.Int64Ptr(now.UnixNano())}, }, } nextEventID := di.ScheduleID + 1 @@ -1649,6 +1656,7 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc s.Nil(err) s.Equal(1, len(transferTasks)) s.Equal(version, transferTasks[0].GetVersion()) + s.True(now.Equal(transferTasks[0].GetVisibilityTimestamp())) s.Equal(1, len(timerTasks)) s.Equal(version, timerTasks[0].GetVersion()) } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index edaf7c178f0..460dd3e67ab 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -2337,7 +2337,12 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour TaskList: newExecutionInfo.TaskList, ScheduleID: di.ScheduleID, }} - setTaskVersion(newStateBuilder.GetCurrentVersion(), newTransferTasks, nil) + setTaskInfo( + newStateBuilder.GetCurrentVersion(), + time.Unix(0, startedEvent.GetTimestamp()), + newTransferTasks, + nil, + ) e.continueAsNew = &persistence.CreateWorkflowExecutionRequest{ // NOTE: there is no replication task for the start / decision scheduled event, diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index fe79fafe00d..9acde2e4766 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -53,6 +53,7 @@ type ( } queueProcessorBase struct { + clusterName string shard ShardContext options *QueueProcessorOptions processor processor @@ -78,13 +79,14 @@ var ( errUnexpectedQueueTask = errors.New("unexpected queue task") ) -func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, processor processor, queueAckMgr queueAckMgr, logger bark.Logger) *queueProcessorBase { +func newQueueProcessorBase(clusterName string, shard ShardContext, options *QueueProcessorOptions, processor processor, queueAckMgr queueAckMgr, logger bark.Logger) *queueProcessorBase { workerNotificationChans := []chan struct{}{} for index := 0; index < options.WorkerCount(); index++ { workerNotificationChans = append(workerNotificationChans, make(chan struct{}, 1)) } p := &queueProcessorBase{ + clusterName: clusterName, shard: shard, options: options, processor: processor, diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index 29c2ea820b5..37b20a1de42 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -64,6 +64,8 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc executionMgr persistence.ExecutionManager, historyMgr persistence.HistoryManager, hSerializerFactory persistence.HistorySerializerFactory, logger bark.Logger) queueProcessor { + currentClusterNamer := shard.GetService().GetClusterMetadata().GetCurrentClusterName() + config := shard.GetConfig() options := &QueueProcessorOptions{ StartDelay: config.ReplicatorProcessorStartDelay, @@ -82,7 +84,7 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc }) processor := &replicatorQueueProcessorImpl{ - currentClusterNamer: shard.GetService().GetClusterMetadata().GetCurrentClusterName(), + currentClusterNamer: currentClusterNamer, shard: shard, executionMgr: executionMgr, historyMgr: historyMgr, @@ -94,7 +96,7 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc } queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetReplicatorAckLevel(), logger) - queueProcessorBase := newQueueProcessorBase(shard, options, processor, queueAckMgr, logger) + queueProcessorBase := newQueueProcessorBase(currentClusterNamer, shard, options, processor, queueAckMgr, logger) processor.queueAckMgr = queueAckMgr processor.queueProcessorBase = queueProcessorBase diff --git a/service/history/service.go b/service/history/service.go index cfa07cfc665..101150ccd4f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -49,6 +49,9 @@ type Config struct { RangeSizeBits uint AcquireShardInterval dynamicconfig.DurationPropertyFn + // the atrificial delay added to standby cluster's view of active cluster's time + StandbyClusterDelay dynamicconfig.DurationPropertyFn + // TimerQueueProcessor settings TimerTaskBatchSize dynamicconfig.IntPropertyFn TimerTaskWorkerCount dynamicconfig.IntPropertyFn @@ -63,7 +66,6 @@ type Config struct { TimerProcessorMaxPollRPS dynamicconfig.IntPropertyFn TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - TimerProcessorStandbyTaskDelay dynamicconfig.DurationPropertyFn // TransferQueueProcessor settings TransferTaskBatchSize dynamicconfig.IntPropertyFn @@ -78,7 +80,6 @@ type Config struct { TransferProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn - TransferProcessorStandbyTaskDelay dynamicconfig.DurationPropertyFn // ReplicatorQueueProcessor settings ReplicatorTaskBatchSize dynamicconfig.IntPropertyFn @@ -117,6 +118,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), + StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, 5*time.Minute), TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10), TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100), @@ -130,7 +132,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { TimerProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollRPS, 20), TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute), TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15), - TimerProcessorStandbyTaskDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorStandbyTaskDelay, 0*time.Minute), TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100), TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1), TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20), @@ -143,7 +144,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { TransferProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15), TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 5*time.Second), TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 3*time.Second), - TransferProcessorStandbyTaskDelay: dc.GetDurationProperty(dynamicconfig.TransferProcessorStandbyTaskDelay, 0*time.Minute), ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100), ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10), ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100), diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index 51fb5fde5cc..d0648e83a4a 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -488,7 +488,7 @@ func (b *stateBuilderImpl) getTaskList(msBuilder mutableState) string { } func (b *stateBuilderImpl) getTimerBuilder(event *shared.HistoryEvent) *timerBuilder { - timeSource := common.NewFakeTimeSource() + timeSource := common.NewEventTimeSource() now := time.Unix(0, event.GetTimestamp()) timeSource.Update(now) return newTimerBuilder(b.shard.GetConfig(), b.logger, timeSource) diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index b5636a39f85..4bc1e0c4ea2 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -120,7 +120,7 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(clusterName string, currentTim if !ok { panic(fmt.Sprintf("Cannot find timer processor for %s.", clusterName)) } - standbyTimerProcessor.setCurrentTime(currentTime.Add(-t.config.TimerProcessorStandbyTaskDelay())) + standbyTimerProcessor.setCurrentTime(currentTime) standbyTimerProcessor.notifyNewTimers(timerTasks) standbyTimerProcessor.retryTasks() } @@ -143,6 +143,10 @@ func (t *timerQueueProcessorImpl) FailoverDomain(domainID string) { failoverTimerProcessor := newTimerQueueFailoverProcessor(t.shard, t.historyService, domainID, standbyClusterName, minLevel, maxLevel, t.matchingClient, t.logger) + for _, standbyTimerProcessor := range t.standbyTimerProcessors { + standbyTimerProcessor.retryTasks() + } + failoverTimerProcessor.Start() } diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 03267af6930..1c8f035f8de 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -113,7 +113,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history } queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetTransferClusterAckLevel(currentClusterName), logger) - queueProcessorBase := newQueueProcessorBase(shard, options, processor, queueAckMgr, logger) + queueProcessorBase := newQueueProcessorBase(currentClusterName, shard, options, processor, queueAckMgr, logger) processor.queueAckMgr = queueAckMgr processor.queueProcessorBase = queueProcessorBase @@ -170,7 +170,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo transferQueueProcessorBase: newTransferQueueProcessorBase(shard, options, maxReadAckLevel, updateClusterAckLevel), } queueAckMgr := newQueueFailoverAckMgr(shard, options, processor, minLevel, logger) - queueProcessorBase := newQueueProcessorBase(shard, options, processor, queueAckMgr, logger) + queueProcessorBase := newQueueProcessorBase(currentClusterName, shard, options, processor, queueAckMgr, logger) processor.queueAckMgr = queueAckMgr processor.queueProcessorBase = queueProcessorBase diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 198ec5861d8..de4168edba0 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -117,7 +117,7 @@ func (t *transferQueueProcessorImpl) Stop() { // NotifyNewTask - Notify the processor about the new active / standby transfer task arrival. // This should be called each time new transfer task arrives, otherwise tasks maybe delayed. -func (t *transferQueueProcessorImpl) NotifyNewTask(clusterName string, currentTime time.Time, transferTasks []persistence.Task) { +func (t *transferQueueProcessorImpl) NotifyNewTask(clusterName string, transferTasks []persistence.Task) { if clusterName == t.currentClusterName { // we will ignore the current time passed in, since the active processor process task immediately if len(transferTasks) != 0 { @@ -130,8 +130,7 @@ func (t *transferQueueProcessorImpl) NotifyNewTask(clusterName string, currentTi if !ok { panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName)) } - currentClusterTime := t.shard.GetCurrentTime(t.currentClusterName) - if currentClusterTime.Sub(currentTime) >= t.config.TransferProcessorStandbyTaskDelay() && len(transferTasks) != 0 { + if len(transferTasks) != 0 { standbyTaskProcessor.notifyNewTask() } standbyTaskProcessor.retryTasks() @@ -156,6 +155,11 @@ func (t *transferQueueProcessorImpl) FailoverDomain(domainID string) { t.shard, t.historyService, t.visibilityMgr, t.matchingClient, t.historyClient, domainID, standbyClusterName, minLevel, maxLevel, t.logger, ) + + for _, standbyTaskProcessor := range t.standbyTaskProcessors { + standbyTaskProcessor.retryTasks() + } + failoverTaskProcessor.Start() } diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index e32a3fdc52b..28473e7ed76 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -89,7 +89,7 @@ func newTransferQueueStandbyProcessor(clusterName string, shard ShardContext, hi transferQueueProcessorBase: newTransferQueueProcessorBase(shard, options, maxReadAckLevel, updateClusterAckLevel), } queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetTransferClusterAckLevel(clusterName), logger) - queueProcessorBase := newQueueProcessorBase(shard, options, processor, queueAckMgr, logger) + queueProcessorBase := newQueueProcessorBase(clusterName, shard, options, processor, queueAckMgr, logger) processor.queueAckMgr = queueAckMgr processor.queueProcessorBase = queueProcessorBase diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index 80fad8568c1..fb8d1e50534 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -141,13 +141,13 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithDeleteTask(transfe } func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.ReplicateEventsRequest, - transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID, transactionID int64) error { + transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID, transactionID int64, now time.Time) error { nextEventID := lastEventID + 1 c.msBuilder.GetExecutionInfo().NextEventID = nextEventID builder := newHistoryBuilderFromEvents(request.History.Events, c.logger) return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(), - transactionID) + transactionID, now) } func (c *workflowExecutionContext) updateVersion() error { @@ -187,12 +187,13 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi c.msBuilder.GetExecutionInfo().LastFirstEventID, c.msBuilder.GetExecutionInfo().NextEventID) } - return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID) + now := time.Now() + return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID, now) } func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task, timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64, - transactionID int64) (errRet error) { + transactionID int64, now time.Time) (errRet error) { defer func() { if errRet != nil { @@ -266,7 +267,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe replicationTasks = append(replicationTasks, c.msBuilder.CreateReplicationTask()) } - setTaskVersion(c.msBuilder.GetCurrentVersion(), transferTasks, timerTasks) + setTaskInfo(c.msBuilder.GetCurrentVersion(), now, transferTasks, timerTasks) if err1 := c.updateWorkflowExecutionWithRetry(&persistence.UpdateWorkflowExecutionRequest{ ExecutionInfo: executionInfo,