Skip to content

Commit

Permalink
Wire up cross-cluster operation implementation (cadence-workflow#4524)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Oct 6, 2021
1 parent 3cd5166 commit 041061c
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 237 deletions.
188 changes: 106 additions & 82 deletions common/dynamicconfig/constants.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,7 @@ const (
ShardInfoTransferStandbyPendingTasksTimer
ShardInfoTimerActivePendingTasksTimer
ShardInfoTimerStandbyPendingTasksTimer
ShardInfoCrossClusterPendingTasksTimer
ShardInfoReplicationLagTimer
ShardInfoTransferLagTimer
ShardInfoTimerLagTimer
Expand Down Expand Up @@ -2491,6 +2492,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ShardInfoTransferStandbyPendingTasksTimer: {metricName: "shardinfo_transfer_standby_pending_task", metricType: Timer},
ShardInfoTimerActivePendingTasksTimer: {metricName: "shardinfo_timer_active_pending_task", metricType: Timer},
ShardInfoTimerStandbyPendingTasksTimer: {metricName: "shardinfo_timer_standby_pending_task", metricType: Timer},
ShardInfoCrossClusterPendingTasksTimer: {metricName: "shardinfo_cross_cluster_pending_task", metricType: Timer},
ShardInfoReplicationLagTimer: {metricName: "shardinfo_replication_lag", metricType: Timer},
ShardInfoTransferLagTimer: {metricName: "shardinfo_transfer_lag", metricType: Timer},
ShardInfoTimerLagTimer: {metricName: "shardinfo_timer_lag", metricType: Timer},
Expand Down
14 changes: 14 additions & 0 deletions host/ndc/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,25 @@ func (s *NDCIntegrationTestSuite) SetupSuite() {
controller := gomock.NewController(s.T())
mockStandbyClient := adminClient.NewMockClient(controller)
mockStandbyClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).DoAndReturn(s.GetReplicationMessagesMock).AnyTimes()
mockStandbyClient.EXPECT().GetCrossClusterTasks(gomock.Any(), gomock.Any()).Return(
&types.GetCrossClusterTasksResponse{
TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest),
FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
},
nil,
).AnyTimes()
mockOtherClient := adminClient.NewMockClient(controller)
mockOtherClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).Return(
&types.GetReplicationMessagesResponse{
MessagesByShard: make(map[int32]*types.ReplicationMessages),
}, nil).AnyTimes()
mockOtherClient.EXPECT().GetCrossClusterTasks(gomock.Any(), gomock.Any()).Return(
&types.GetCrossClusterTasksResponse{
TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest),
FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
},
nil,
).AnyTimes()
s.mockAdminClient["standby"] = mockStandbyClient
s.mockAdminClient["other"] = mockOtherClient
s.clusterConfigs[0].MockAdminClient = s.mockAdminClient
Expand Down
84 changes: 46 additions & 38 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ type Config struct {
// TimerQueueProcessor settings
TimerTaskBatchSize dynamicconfig.IntPropertyFn
TimerTaskDeleteBatchSize dynamicconfig.IntPropertyFn
TimerTaskWorkerCount dynamicconfig.IntPropertyFn
TimerProcessorGetFailureRetryCount dynamicconfig.IntPropertyFn
TimerProcessorCompleteTimerFailureRetryCount dynamicconfig.IntPropertyFn
TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
Expand All @@ -140,7 +139,6 @@ type Config struct {
// TransferQueueProcessor settings
TransferTaskBatchSize dynamicconfig.IntPropertyFn
TransferTaskDeleteBatchSize dynamicconfig.IntPropertyFn
TransferTaskWorkerCount dynamicconfig.IntPropertyFn
TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn
TransferProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn
TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn
Expand All @@ -157,23 +155,30 @@ type Config struct {
TransferProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn

// CrossClusterQueueProcessor settings
CrossClusterTaskBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskDeleteBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskFetchBatchSize dynamicconfig.IntPropertyFnWithShardIDFilter
CrossClusterTaskWorkerCount dynamicconfig.IntPropertyFn
CrossClusterProcessorCompleteTaskFailureRetryCount dynamicconfig.IntPropertyFn
CrossClusterProcessorMaxPollRPS dynamicconfig.IntPropertyFn
CrossClusterProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
CrossClusterProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterProcessorSplitQueueInterval dynamicconfig.DurationPropertyFn
CrossClusterProcessorSplitQueueIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
CrossClusterProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterProcessorCompleteTaskInterval dynamicconfig.DurationPropertyFn
CrossClusterProcessorMaxRedispatchQueueSize dynamicconfig.IntPropertyFn
CrossClusterProcessorEnableValidator dynamicconfig.BoolPropertyFn
CrossClusterProcessorValidationInterval dynamicconfig.DurationPropertyFn
CrossClusterProcessorValidationIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterTaskBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskDeleteBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskFetchBatchSize dynamicconfig.IntPropertyFnWithShardIDFilter
CrossClusterSourceProcessorMaxPollRPS dynamicconfig.IntPropertyFn
CrossClusterSourceProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
CrossClusterSourceProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterSourceProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
CrossClusterSourceProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
CrossClusterSourceProcessorMaxRedispatchQueueSize dynamicconfig.IntPropertyFn
CrossClusterSourceProcessorMaxPendingTaskSize dynamicconfig.IntPropertyFn

// CrossClusterTargetTaskProcessor settings
CrossClusterTargetProcessorMaxPendingTasks dynamicconfig.IntPropertyFn
CrossClusterTargetProcessorMaxRetryCount dynamicconfig.IntPropertyFn
CrossClusterTargetProcessorTaskWaitInterval dynamicconfig.DurationPropertyFn
CrossClusterTargetProcessorServiceBusyBackoffInterval dynamicconfig.DurationPropertyFn
CrossClusterTargetProcessorJitterCoefficient dynamicconfig.FloatPropertyFn

// CrossClusterTaskFetcher settings
CrossClusterFetcherParallelism dynamicconfig.IntPropertyFn
CrossClusterFetcherAggregationInterval dynamicconfig.DurationPropertyFn
CrossClusterFetcherServiceBusyBackoffInterval dynamicconfig.DurationPropertyFn
CrossClusterFetcherErrorBackoffInterval dynamicconfig.DurationPropertyFn
CrossClusterFetcherJitterCoefficient dynamicconfig.FloatPropertyFn

// ReplicatorQueueProcessor settings
ReplicatorTaskBatchSize dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -400,7 +405,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA

TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskDeleteBatchSize, 4000),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5),
TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10),
TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 30*time.Second),
Expand All @@ -421,7 +425,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TransferTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskDeleteBatchSize, 4000),
TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1),
TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20),
TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10),
TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10),
TransferProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorMaxPollInterval, 1*time.Minute),
TransferProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
Expand All @@ -435,23 +438,28 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TransferProcessorValidationInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorValidationInterval, 30*time.Second),
TransferProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TransferProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),

CrossClusterTaskBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskBatchSize, 100),
CrossClusterTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskDeleteBatchSize, 4000),
CrossClusterTaskFetchBatchSize: dc.GetIntPropertyFilteredByShardID(dynamicconfig.CrossClusterTaskFetchBatchSize, 100),
CrossClusterProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorMaxPollRPS, 20),
CrossClusterTaskWorkerCount: dc.GetIntProperty(dynamicconfig.CrossClusterTaskWorkerCount, 10),
CrossClusterProcessorCompleteTaskFailureRetryCount: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorCompleteTaskFailureRetryCount, 10),
CrossClusterProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorMaxPollInterval, 1*time.Minute),
CrossClusterProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterProcessorMaxPollIntervalJitterCoefficient, 0.15),
CrossClusterProcessorSplitQueueInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorSplitQueueInterval, 1*time.Minute),
CrossClusterProcessorSplitQueueIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterProcessorSplitQueueIntervalJitterCoefficient, 0.15),
CrossClusterProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorUpdateAckInterval, 30*time.Second),
CrossClusterProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterProcessorUpdateAckIntervalJitterCoefficient, 0.15),
CrossClusterProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorCompleteTaskInterval, 60*time.Second),
CrossClusterProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorMaxRedispatchQueueSize, 10000),
CrossClusterProcessorEnableValidator: dc.GetBoolProperty(dynamicconfig.CrossClusterProcessorEnableValidator, false),
CrossClusterProcessorValidationInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorValidationInterval, 30*time.Second),
CrossClusterProcessorValidationIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterProcessorValidationIntervalJitterCoefficient, 0.15),
CrossClusterTaskBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskBatchSize, 100),
CrossClusterTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskDeleteBatchSize, 4000),
CrossClusterTaskFetchBatchSize: dc.GetIntPropertyFilteredByShardID(dynamicconfig.CrossClusterTaskFetchBatchSize, 100),
CrossClusterSourceProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.CrossClusterSourceProcessorMaxPollRPS, 20),
CrossClusterSourceProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterSourceProcessorMaxPollInterval, 1*time.Minute),
CrossClusterSourceProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterSourceProcessorMaxPollIntervalJitterCoefficient, 0.15),
CrossClusterSourceProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterSourceProcessorUpdateAckInterval, 30*time.Second),
CrossClusterSourceProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterSourceProcessorUpdateAckIntervalJitterCoefficient, 0.15),
CrossClusterSourceProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicconfig.CrossClusterSourceProcessorMaxRedispatchQueueSize, 10000),
CrossClusterSourceProcessorMaxPendingTaskSize: dc.GetIntProperty(dynamicconfig.CrossClusterSourceProcessorMaxPendingTaskSize, 500),

CrossClusterTargetProcessorMaxPendingTasks: dc.GetIntProperty(dynamicconfig.CrossClusterTargetProcessorMaxPendingTasks, 200),
CrossClusterTargetProcessorMaxRetryCount: dc.GetIntProperty(dynamicconfig.CrossClusterTargetProcessorMaxRetryCount, 20),
CrossClusterTargetProcessorTaskWaitInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterTargetProcessorTaskWaitInterval, 3*time.Second),
CrossClusterTargetProcessorServiceBusyBackoffInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterTargetProcessorServiceBusyBackoffInterval, 5*time.Second),
CrossClusterTargetProcessorJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterTargetProcessorJitterCoefficient, 0.15),

CrossClusterFetcherParallelism: dc.GetIntProperty(dynamicconfig.CrossClusterFetcherParallelism, 1),
CrossClusterFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterFetcherAggregationInterval, 2*time.Second),
CrossClusterFetcherServiceBusyBackoffInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterFetcherServiceBusyBackoffInterval, 5*time.Second),
CrossClusterFetcherErrorBackoffInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterFetcherServiceBusyBackoffInterval, time.Second),
CrossClusterFetcherJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterFetcherJitterCoefficient, 0.15),

ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100),
ReplicatorTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskDeleteBatchSize, 4000),
Expand Down
37 changes: 27 additions & 10 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ type (
handlerImpl struct {
resource.Resource

shuttingDown int32
controller shard.Controller
tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *config.Config
historyEventNotifier events.Notifier
rateLimiter quotas.Limiter
replicationTaskFetchers replication.TaskFetchers
queueTaskProcessor task.Processor
failoverCoordinator failover.Coordinator
shuttingDown int32
controller shard.Controller
tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *config.Config
historyEventNotifier events.Notifier
rateLimiter quotas.Limiter
crossClusterTaskFetchers task.Fetchers
replicationTaskFetchers replication.TaskFetchers
queueTaskProcessor task.Processor
failoverCoordinator failover.Coordinator
}
)

Expand Down Expand Up @@ -161,6 +162,20 @@ func NewHandler(

// Start starts the handler
func (h *handlerImpl) Start() {
h.crossClusterTaskFetchers = task.NewCrossClusterTaskFetchers(
h.GetClusterMetadata(),
h.GetClientBean(),
&task.FetcherOptions{
Parallelism: h.config.CrossClusterFetcherParallelism,
AggregationInterval: h.config.CrossClusterFetcherAggregationInterval,
ServiceBusyBackoffInterval: h.config.CrossClusterFetcherServiceBusyBackoffInterval,
ErrorRetryInterval: h.config.CrossClusterFetcherErrorBackoffInterval,
TimerJitterCoefficient: h.config.CrossClusterFetcherJitterCoefficient,
},
h.GetMetricsClient(),
h.GetLogger(),
)
h.crossClusterTaskFetchers.Start()

h.replicationTaskFetchers = replication.NewTaskFetchers(
h.GetLogger(),
Expand Down Expand Up @@ -221,6 +236,7 @@ func (h *handlerImpl) Start() {
// Stop stops the handler
func (h *handlerImpl) Stop() {
h.prepareToShutDown()
h.crossClusterTaskFetchers.Stop()
h.replicationTaskFetchers.Stop()
h.queueTaskProcessor.Stop()
h.controller.Stop()
Expand Down Expand Up @@ -258,6 +274,7 @@ func (h *handlerImpl) CreateEngine(
h.GetSDKClient(),
h.historyEventNotifier,
h.config,
h.crossClusterTaskFetchers,
h.replicationTaskFetchers,
h.GetMatchingRawClient(),
h.queueTaskProcessor,
Expand Down
Loading

0 comments on commit 041061c

Please sign in to comment.