Skip to content

Commit

Permalink
Limit batch size for fetching cross cluster tasks (cadence-workflow#4487
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yycptt authored Sep 17, 2021
1 parent 5846821 commit 52c8acc
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 12 deletions.
9 changes: 7 additions & 2 deletions common/dynamicconfig/config_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func GetIntPropertyFilteredByTaskListInfo(value int) func(domain string, taskLis
return func(domain string, taskList string, taskType int) int { return value }
}

// GetIntPropertyFilteredByShardID returns values as IntPropertyFnWithShardIDFilter
func GetIntPropertyFilteredByShardID(value int) func(shardID int) int {
return func(shardID int) int { return value }
}

// GetFloatPropertyFn returns value as FloatPropertyFn
func GetFloatPropertyFn(value float64) func(opts ...FilterOption) float64 {
return func(...FilterOption) float64 { return value }
Expand Down Expand Up @@ -74,8 +79,8 @@ func GetDurationPropertyFnFilteredByTaskListInfo(value time.Duration) func(domai
return func(domain string, taskList string, taskType int) time.Duration { return value }
}

// GetDurationPropertyFnFilteredByTShardID returns value as DurationPropertyFnWithTaskListInfoFilters
func GetDurationPropertyFnFilteredByTShardID(value time.Duration) func(shardID int) time.Duration {
// GetDurationPropertyFnFilteredByShardID returns value as DurationPropertyFnWithShardIDFilter
func GetDurationPropertyFnFilteredByShardID(value time.Duration) func(shardID int) time.Duration {
return func(shardID int) time.Duration { return value }
}

Expand Down
11 changes: 9 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,18 +1129,24 @@ const (
// Allowed filters: N/A
TransferProcessorVisibilityArchivalTimeLimit

// CrossClusterTaskBatchSize is batch size for crossClusterQueueProcessor
// CrossClusterTaskBatchSize is the batch size for loading cross cluster tasks from persistence in crossClusterQueueProcessor
// KeyName: history.crossClusterTaskBatchSize
// Value type: Int
// Default value: 100
// Allowed filters: N/A
CrossClusterTaskBatchSize
// CrossClusterTaskDeleteBatchSize is batch size for crossClusterQueueProcessor to delete cross cluster tasks
// CrossClusterTaskDeleteBatchSize is the batch size for deleting cross cluster tasks from persistence in crossClusterQueueProcessor
// KeyName: history.crossClusterTaskDeleteBatchSize
// Value type: Int
// Default value: 4000
// Allowed filters: N/A
CrossClusterTaskDeleteBatchSize
// CrossClusterTaskFetchBatchSize is batch size for dispatching cross cluster tasks to target cluster in crossClusterQueueProcessor
// KeyName: history.crossClusterTaskFetchBatchSize
// Value type: Int
// Default value: 100
// Allowed filters: ShardID
CrossClusterTaskFetchBatchSize
// CrossClusterProcessorMaxPollRPS is max poll rate per second for crossClusterQueueProcessor
// KeyName: history.crossClusterProcessorMaxPollRPS
// Value type: Int
Expand Down Expand Up @@ -2186,6 +2192,7 @@ var Keys = map[Key]string{

CrossClusterTaskBatchSize: "history.crossClusterTaskBatchSize",
CrossClusterTaskDeleteBatchSize: "history.crossClusterTaskDeleteBatchSize",
CrossClusterTaskFetchBatchSize: "history.crossClusterTaskFetchBatchSize",
CrossClusterProcessorMaxPollRPS: "history.crossClusterProcessorMaxPollRPS",
CrossClusterTaskWorkerCount: "history.crossClusterTaskWorkerCount",
CrossClusterProcessorCompleteTaskFailureRetryCount: "history.crossClusterProcessorCompleteTaskFailureRetryCount",
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type Config struct {
// CrossClusterQueueProcessor settings
CrossClusterTaskBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskDeleteBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskFetchBatchSize dynamicconfig.IntPropertyFnWithShardIDFilter
CrossClusterTaskWorkerCount dynamicconfig.IntPropertyFn
CrossClusterProcessorCompleteTaskFailureRetryCount dynamicconfig.IntPropertyFn
CrossClusterProcessorMaxPollRPS dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -436,6 +437,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA

CrossClusterTaskBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskBatchSize, 100),
CrossClusterTaskDeleteBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskDeleteBatchSize, 4000),
CrossClusterTaskFetchBatchSize: dc.GetIntPropertyFilteredByShardID(dynamicconfig.CrossClusterTaskFetchBatchSize, 100),
CrossClusterProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorMaxPollRPS, 20),
CrossClusterTaskWorkerCount: dc.GetIntProperty(dynamicconfig.CrossClusterTaskWorkerCount, 10),
CrossClusterProcessorCompleteTaskFailureRetryCount: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorCompleteTaskFailureRetryCount, 10),
Expand Down
6 changes: 5 additions & 1 deletion service/history/queue/cross_cluster_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (c *crossClusterQueueProcessorBase) readTasks(

func (c *crossClusterQueueProcessorBase) pollTasks() []*types.CrossClusterTaskRequest {

// TODO: control the number of tasks returned
batchSize := c.shard.GetConfig().CrossClusterTaskFetchBatchSize(c.shard.GetShardID())
var result []*types.CrossClusterTaskRequest
for _, queueTask := range c.getTasks() {
crossClusterTask, ok := queueTask.(task.CrossClusterTask)
Expand All @@ -374,6 +374,9 @@ func (c *crossClusterQueueProcessorBase) pollTasks() []*types.CrossClusterTaskRe
// if request is nil, nothing need to be done for the task,
// task already acked in GetCrossClusterRequest()
result = append(result, request)
if len(result) >= batchSize {
break
}
}
}
}
Expand Down Expand Up @@ -511,6 +514,7 @@ func (c *crossClusterQueueProcessorBase) setupBackoffTimer(level int) {
})
}

// TODO: we should pass in a context for handling actions
func (c *crossClusterQueueProcessorBase) handleActionNotification(notification actionNotification) {
switch notification.action.ActionType {
case ActionTypeGetTasks:
Expand Down
42 changes: 36 additions & 6 deletions service/history/queue/cross_cluster_queue_processor_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,6 @@ func (s *crossClusterQueueProcessorBaseSuite) TestPollTasks_OneTaskReady_ReturnO
readyTask := task.NewMockCrossClusterTask(s.controller)
readyTask.EXPECT().GetDomainID().Return(uuid.New()).AnyTimes()
readyTask.EXPECT().IsReadyForPoll().Return(true)
readyTask.EXPECT().GetVisibilityTimestamp().Return(time.Time{}).AnyTimes()
readyTask.EXPECT().GetWorkflowID().Return(uuid.New()).AnyTimes()
readyTask.EXPECT().GetRunID().Return(uuid.New()).AnyTimes()
readyTask.EXPECT().GetTaskType().Return(1).AnyTimes()
readyTask.EXPECT().State().Return(ctask.State(0)).AnyTimes()
readyTask.EXPECT().GetTaskID().Return(int64(0)).AnyTimes()
readyTask.EXPECT().GetCrossClusterRequest().Return(&types.CrossClusterTaskRequest{}, nil).AnyTimes()
notReadyTask := task.NewMockCrossClusterTask(s.controller)
notReadyTask.EXPECT().GetDomainID().Return(uuid.New()).AnyTimes()
Expand All @@ -310,6 +304,42 @@ func (s *crossClusterQueueProcessorBaseSuite) TestPollTasks_OneTaskReady_ReturnO
s.Equal(1, len(tasks))
}

func (s *crossClusterQueueProcessorBaseSuite) TestPollTasks_FetchBatchSizeLimit() {
fetchBatchSize := 10
s.mockShard.GetConfig().CrossClusterTaskFetchBatchSize = dynamicconfig.GetIntPropertyFilteredByShardID(fetchBatchSize)
numReadyTasks := fetchBatchSize * 2

clusterName := "test"
processingQueueState := newProcessingQueueState(
0,
testKey{ID: 0},
testKey{ID: 0},
testKey{ID: numReadyTasks},
NewDomainFilter(map[string]struct{}{}, true),
)

processorBase := s.newTestCrossClusterQueueProcessorBase(
[]ProcessingQueueState{processingQueueState},
clusterName,
nil,
nil,
nil,
)

readyTasksMap := make(map[task.Key]task.Task)
for i := 0; i != numReadyTasks; i++ {
readyTask := task.NewMockCrossClusterTask(s.controller)
readyTask.EXPECT().GetDomainID().Return(uuid.New()).AnyTimes()
readyTask.EXPECT().IsReadyForPoll().Return(true).AnyTimes()
readyTask.EXPECT().GetCrossClusterRequest().Return(&types.CrossClusterTaskRequest{}, nil).AnyTimes()
readyTasksMap[testKey{ID: i + 1}] = readyTask
}
processorBase.processingQueueCollections[0].AddTasks(readyTasksMap, testKey{ID: numReadyTasks})

tasks := processorBase.pollTasks()
s.Len(tasks, fetchBatchSize)
}

func (s *crossClusterQueueProcessorBaseSuite) TestUpdateTask_Success() {
clusterName := "test"
processingQueueState := newProcessingQueueState(
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *taskProcessorSuite) SetupTest() {

s.mockEngine = engine.NewMockEngine(s.controller)
s.config = config.NewForTest()
s.config.ReplicationTaskProcessorNoTaskRetryWait = dynamicconfig.GetDurationPropertyFnFilteredByTShardID(1 * time.Millisecond)
s.config.ReplicationTaskProcessorNoTaskRetryWait = dynamicconfig.GetDurationPropertyFnFilteredByShardID(1 * time.Millisecond)
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.requestChan = make(chan *request, 10)

Expand Down

0 comments on commit 52c8acc

Please sign in to comment.