Skip to content

Commit

Permalink
Automatically adjust task priority and redispatch interval based on a…
Browse files Browse the repository at this point in the history
…ttempts (cadence-workflow#4378)

- Automatically adjust task priority to low when task is keep retrying
- Assign different redispatch backoff interval for different tasks based on number of attempts
  • Loading branch information
yycptt authored Aug 25, 2021
1 parent 59c8f0e commit fb10abe
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 123 deletions.
38 changes: 10 additions & 28 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,15 @@ const (
// Default value: common.ConvertIntMapToDynamicConfigMapProperty(DefaultTaskPriorityWeight)
// Allowed filters: N/A
TaskSchedulerRoundRobinWeights
// TaskCriticalRetryCount is the critical retry count for background tasks
// when task attempt exceeds this threshold:
// - task attempt metrics and additional error logs will be emitted
// - task priority will be lowered
// KeyName: history.taskCriticalRetryCount
// Value type: Int
// Default value: 20
// Allowed filters: N/A
TaskCriticalRetryCount
// ActiveTaskRedispatchInterval is the active task redispatch interval
// KeyName: history.activeTaskRedispatchInterval
// Value type: Duration
Expand Down Expand Up @@ -908,12 +917,6 @@ const (
// Default value: 10
// Allowed filters: N/A
TimerTaskWorkerCount
// TimerTaskMaxRetryCount is max retry count for timer processor
// KeyName: history.timerTaskMaxRetryCount
// Value type: Int
// Default value: 100
// Allowed filters: N/A
TimerTaskMaxRetryCount
// TimerProcessorGetFailureRetryCount is retry count for timer processor get failure operation
// KeyName: history.timerProcessorGetFailureRetryCount
// Value type: Int
Expand Down Expand Up @@ -1029,12 +1032,6 @@ const (
// Default value: 10
// Allowed filters: N/A
TransferTaskWorkerCount
// TransferTaskMaxRetryCount is max times of retry for transferQueueProcessor
// KeyName: history.transferTaskMaxRetryCount
// Value type: Int
// Default value: 100
// Allowed filters: N/A
TransferTaskMaxRetryCount
// TransferProcessorCompleteTransferFailureRetryCount is times of retry for failure
// KeyName: history.transferProcessorCompleteTransferFailureRetryCount
// Value type: Int
Expand Down Expand Up @@ -1126,12 +1123,6 @@ const (
// Default value: 10
// Allowed filters: N/A
CrossClusterTaskWorkerCount
// CrossClusterTaskMaxRetryCount is max times of retry for crossClusterQueueProcessor
// KeyName: history.crossClusterTaskMaxRetryCount
// Value type: Int
// Default value: 100
// Allowed filters: N/A
CrossClusterTaskMaxRetryCount
// CrossClusterProcessorCompleteTaskFailureRetryCount is times of retry for failure
// KeyName: history.crossClusterProcessorCompleteTaskFailureRetryCount
// Value type: Int
Expand Down Expand Up @@ -1223,12 +1214,6 @@ const (
// Default value: 3
// Allowed filters: N/A
ReplicatorReadTaskMaxRetryCount
// ReplicatorTaskMaxRetryCount is max times of retry for ReplicatorProcessor
// KeyName: history.replicatorTaskMaxRetryCount
// Value type: Int
// Default value: 100
// Allowed filters: N/A
ReplicatorTaskMaxRetryCount
// ReplicatorProcessorMaxPollRPS is max poll rate per second for ReplicatorProcessor
// KeyName: history.replicatorProcessorMaxPollRPS
// Value type: Int
Expand Down Expand Up @@ -2094,6 +2079,7 @@ var Keys = map[Key]string{
TaskSchedulerShardQueueSize: "history.taskSchedulerShardQueueSize",
TaskSchedulerDispatcherCount: "history.taskSchedulerDispatcherCount",
TaskSchedulerRoundRobinWeights: "history.taskSchedulerRoundRobinWeight",
TaskCriticalRetryCount: "history.taskCriticalRetryCount",
ActiveTaskRedispatchInterval: "history.activeTaskRedispatchInterval",
StandbyTaskRedispatchInterval: "history.standbyTaskRedispatchInterval",
TaskRedispatchIntervalJitterCoefficient: "history.taskRedispatchIntervalJitterCoefficient",
Expand All @@ -2115,7 +2101,6 @@ var Keys = map[Key]string{

TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount",
TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
Expand All @@ -2136,7 +2121,6 @@ var Keys = map[Key]string{
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
TransferTaskWorkerCount: "history.transferTaskWorkerCount",
TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount",
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount",
TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval",
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
Expand All @@ -2153,7 +2137,6 @@ var Keys = map[Key]string{
CrossClusterTaskBatchSize: "history.crossClusterTaskBatchSize",
CrossClusterProcessorMaxPollRPS: "history.crossClusterProcessorMaxPollRPS",
CrossClusterTaskWorkerCount: "history.crossClusterTaskWorkerCount",
CrossClusterTaskMaxRetryCount: "history.crossClusterTaskMaxRetryCount",
CrossClusterProcessorCompleteTaskFailureRetryCount: "history.crossClusterProcessorCompleteTaskFailureRetryCount",
CrossClusterProcessorMaxPollInterval: "history.crossClusterProcessorMaxPollInterval",
CrossClusterProcessorMaxPollIntervalJitterCoefficient: "history.crossClusterProcessorMaxPollIntervalJitterCoefficient",
Expand All @@ -2170,7 +2153,6 @@ var Keys = map[Key]string{
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorReadTaskMaxRetryCount: "history.replicatorReadTaskMaxRetryCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS",
ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval",
ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient",
Expand Down
10 changes: 2 additions & 8 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Config struct {
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
Expand All @@ -119,7 +120,6 @@ type Config struct {
// TimerQueueProcessor settings
TimerTaskBatchSize dynamicconfig.IntPropertyFn
TimerTaskWorkerCount dynamicconfig.IntPropertyFn
TimerTaskMaxRetryCount dynamicconfig.IntPropertyFn
TimerProcessorGetFailureRetryCount dynamicconfig.IntPropertyFn
TimerProcessorCompleteTimerFailureRetryCount dynamicconfig.IntPropertyFn
TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
Expand All @@ -139,7 +139,6 @@ type Config struct {
// TransferQueueProcessor settings
TransferTaskBatchSize dynamicconfig.IntPropertyFn
TransferTaskWorkerCount dynamicconfig.IntPropertyFn
TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn
TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn
TransferProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn
TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn
Expand All @@ -158,7 +157,6 @@ type Config struct {
// CrossClusterQueueProcessor settings
CrossClusterTaskBatchSize dynamicconfig.IntPropertyFn
CrossClusterTaskWorkerCount dynamicconfig.IntPropertyFn
CrossClusterTaskMaxRetryCount dynamicconfig.IntPropertyFn
CrossClusterProcessorCompleteTaskFailureRetryCount dynamicconfig.IntPropertyFn
CrossClusterProcessorMaxPollRPS dynamicconfig.IntPropertyFn
CrossClusterProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
Expand All @@ -176,7 +174,6 @@ type Config struct {
// ReplicatorQueueProcessor settings
ReplicatorTaskBatchSize dynamicconfig.IntPropertyFn
ReplicatorTaskWorkerCount dynamicconfig.IntPropertyFn
ReplicatorTaskMaxRetryCount dynamicconfig.IntPropertyFn
ReplicatorReadTaskMaxRetryCount dynamicconfig.IntPropertyFn
ReplicatorProcessorMaxPollRPS dynamicconfig.IntPropertyFn
ReplicatorProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -374,6 +371,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize, 200),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount, 1),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights, common.ConvertIntMapToDynamicConfigMapProperty(DefaultTaskPriorityWeight)),
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount, 50),
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval, 5*time.Second),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval, 30*time.Second),
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 0.15),
Expand All @@ -397,7 +395,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA

TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100),
TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5),
TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10),
TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 30*time.Second),
Expand All @@ -418,7 +415,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1),
TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20),
TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10),
TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100),
TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10),
TransferProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorMaxPollInterval, 1*time.Minute),
TransferProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
Expand All @@ -435,7 +431,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
CrossClusterTaskBatchSize: dc.GetIntProperty(dynamicconfig.CrossClusterTaskBatchSize, 100),
CrossClusterProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorMaxPollRPS, 20),
CrossClusterTaskWorkerCount: dc.GetIntProperty(dynamicconfig.CrossClusterTaskWorkerCount, 10),
CrossClusterTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.CrossClusterTaskMaxRetryCount, 100),
CrossClusterProcessorCompleteTaskFailureRetryCount: dc.GetIntProperty(dynamicconfig.CrossClusterProcessorCompleteTaskFailureRetryCount, 10),
CrossClusterProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.CrossClusterProcessorMaxPollInterval, 1*time.Minute),
CrossClusterProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.CrossClusterProcessorMaxPollIntervalJitterCoefficient, 0.15),
Expand All @@ -451,7 +446,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA

ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100),
ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10),
ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100),
ReplicatorReadTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorReadTaskMaxRetryCount, 3),
ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 20),
ReplicatorProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newCrossClusterQueueProcessorBaseHelper(
func(t task.Task) {
_, _ = base.submitTask(t)
},
shard.GetConfig().CrossClusterTaskMaxRetryCount,
shard.GetConfig().TaskCriticalRetryCount,
)
}
return base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (s *crossClusterQueueProcessorBaseSuite) TestUpdateTask_SubmitTask_Redispat
crossClusterTask.EXPECT().GetDomainID().Return(uuid.New()).AnyTimes()
crossClusterTask.EXPECT().Update(gomock.Any()).Return(nil).Times(1)
crossClusterTask.EXPECT().Priority().Return(0).AnyTimes()
crossClusterTask.EXPECT().GetAttempt().Return(0).Times(1)
crossClusterTask.EXPECT().GetTaskType().Return(1).AnyTimes()
s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(false, errors.New("test")).Times(1)
newTaskMap := map[task.Key]task.Task{newCrossClusterTaskKey(2): crossClusterTask}
Expand Down
1 change: 1 addition & 0 deletions service/history/queue/processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func newProcessorBase(
taskProcessor: taskProcessor,
redispatcher: task.NewRedispatcher(
taskProcessor,
shard.GetTimeSource(),
&task.RedispatcherOptions{
TaskRedispatchInterval: options.RedispatchInterval,
TaskRedispatchIntervalJitterCoefficient: options.RedispatchIntervalJitterCoefficient,
Expand Down
2 changes: 1 addition & 1 deletion service/history/queue/timer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func newTimerQueueProcessorBase(
taskExecutor,
taskProcessor,
processorBase.redispatcher.AddTask,
shard.GetConfig().TimerTaskMaxRetryCount,
shard.GetConfig().TaskCriticalRetryCount,
)
},

Expand Down
2 changes: 1 addition & 1 deletion service/history/queue/transfer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newTransferQueueProcessorBase(
taskExecutor,
taskProcessor,
processorBase.redispatcher.AddTask,
shard.GetConfig().TransferTaskMaxRetryCount,
shard.GetConfig().TaskCriticalRetryCount,
)
},

Expand Down
1 change: 1 addition & 0 deletions service/history/task/cross_cluster_task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func newCrossClusterTaskProcessor(
taskFetcher: taskFetcher,
redispatcher: NewRedispatcher(
taskProcessor,
shard.GetTimeSource(),
&RedispatcherOptions{
TaskRedispatchInterval: options.TaskRedispatchInterval,
TaskRedispatchIntervalJitterCoefficient: options.TimerJitterCoefficient,
Expand Down
22 changes: 16 additions & 6 deletions service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
"github.com/uber/cadence/service/history/config"
)

var (
highTaskPriority = task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)
defaultTaskPriority = task.GetTaskPriority(task.DefaultPriorityClass, task.DefaultPrioritySubclass)
lowTaskPriority = task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass)
)

type (
priorityAssignerImpl struct {
sync.RWMutex
Expand Down Expand Up @@ -69,19 +75,23 @@ func NewPriorityAssigner(
func (a *priorityAssignerImpl) Assign(
queueTask Task,
) error {
if queueTask.Priority() != task.NoPriority {
if priority := queueTask.Priority(); priority != task.NoPriority {
if priority != lowTaskPriority && queueTask.GetAttempt() > a.config.TaskCriticalRetryCount() {
// automatically lower the priority if task attempt exceeds certain threshold
queueTask.SetPriority(lowTaskPriority)
}
return nil
}

queueType := queueTask.GetQueueType()

if queueType == QueueTypeReplication {
queueTask.SetPriority(task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass))
queueTask.SetPriority(lowTaskPriority)
return nil
}

// timer or transfer task, first check if task is active or not and if domain is active or not
isActiveTask := queueType == QueueTypeActiveTimer || queueType == QueueTypeActiveTransfer
isActiveTask := queueType == QueueTypeActiveTimer || queueType == QueueTypeActiveTransfer || queueType == QueueTypeCrossCluster
domainName, isActiveDomain, err := a.getDomainInfo(queueTask.GetDomainID())
if err != nil {
return err
Expand All @@ -95,7 +105,7 @@ func (a *priorityAssignerImpl) Assign(

if !isActiveTask && !isActiveDomain {
// only assign low priority to tasks in the fourth case
queueTask.SetPriority(task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass))
queueTask.SetPriority(lowTaskPriority)
return nil
}

Expand All @@ -104,7 +114,7 @@ func (a *priorityAssignerImpl) Assign(
// it can be quickly verified/acked and won't prevent the ack level in the processor from advancing
// (especially for active processor)
if !a.getRateLimiter(domainName).Allow() {
queueTask.SetPriority(task.GetTaskPriority(task.DefaultPriorityClass, task.DefaultPrioritySubclass))
queueTask.SetPriority(defaultTaskPriority)
taggedScope := a.scope.Tagged(metrics.DomainTag(domainName))
if queueType == QueueTypeActiveTransfer || queueType == QueueTypeStandbyTransfer {
taggedScope.IncCounter(metrics.TransferTaskThrottledCounter)
Expand All @@ -114,7 +124,7 @@ func (a *priorityAssignerImpl) Assign(
return nil
}

queueTask.SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass))
queueTask.SetPriority(highTaskPriority)
return nil
}

Expand Down
Loading

0 comments on commit fb10abe

Please sign in to comment.