Skip to content

Commit

Permalink
Drop stuck close execution transfer task (cadence-workflow#3240) (cad…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 4, 2020
1 parent d096317 commit dee9745
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 0 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,8 @@ const (
TransferTaskThrottledCounter
TimerTaskThrottledCounter

TransferTaskMissingEventCounter

ProcessingQueuePendingTaskSplitCounter
ProcessingQueueStuckTaskSplitCounter
ProcessingQueueSelectedDomainSplitCounter
Expand Down Expand Up @@ -2159,6 +2161,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskRedispatchQueuePendingTasksTimer: {metricName: "task_redispatch_queue_pending_tasks", metricType: Timer},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
TimerTaskThrottledCounter: {metricName: "timer_task_throttled_counter", metricType: Counter},
TransferTaskMissingEventCounter: {metricName: "transfer_task_missing_event_counter", metricType: Counter},
ProcessingQueuePendingTaskSplitCounter: {metricName: "processing_queue_pending_task_split_counter", metricType: Counter},
ProcessingQueueStuckTaskSplitCounter: {metricName: "processing_queue_stuck_task_split_counter", metricType: Counter},
ProcessingQueueSelectedDomainSplitCounter: {metricName: "processing_queue_selected_domain_split_counter", metricType: Counter},
Expand Down
9 changes: 9 additions & 0 deletions common/service/dynamicconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ func (s *configSuite) TestGetBoolProperty() {
s.Equal(false, value())
}

// TestGetBoolPropertyFilteredByDomainID checks the bool property function
// returned by GetBoolPropertyFilteredByDomainID: it is expected to yield the
// default passed at creation (true) first, and the value stored through the
// client (false) after SetValue is called for the same key.
func (s *configSuite) TestGetBoolPropertyFilteredByDomainID() {
	const domainID = "testDomainID"
	boolFn := s.cln.GetBoolPropertyFilteredByDomainID(testGetBoolPropertyFilteredByDomainIDKey, true)

	// Before SetValue is called, the test expects the default supplied above.
	s.Equal(true, boolFn(domainID))

	// After the client stores false for the key, the same function must reflect it.
	s.client.SetValue(testGetBoolPropertyFilteredByDomainIDKey, false)
	s.Equal(false, boolFn(domainID))
}

func (s *configSuite) TestGetBoolPropertyFilteredByTaskListInfo() {
key := testGetBoolPropertyFilteredByTaskListInfoKey
domain := "testDomain"
Expand Down
6 changes: 6 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var keys = map[Key]string{
testGetDurationPropertyFilteredByDomainKey: "testGetDurationPropertyFilteredByDomainKey",
testGetIntPropertyFilteredByTaskListInfoKey: "testGetIntPropertyFilteredByTaskListInfoKey",
testGetDurationPropertyFilteredByTaskListInfoKey: "testGetDurationPropertyFilteredByTaskListInfoKey",
testGetBoolPropertyFilteredByDomainIDKey: "testGetBoolPropertyFilteredByDomainIDKey",
testGetBoolPropertyFilteredByTaskListInfoKey: "testGetBoolPropertyFilteredByTaskListInfoKey",

// system settings
Expand Down Expand Up @@ -273,6 +274,7 @@ var keys = map[Key]string{
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",
NotifyFailoverMarkerInterval: "history.NotifyFailoverMarkerInterval",
NotifyFailoverMarkerTimerJitterCoefficient: "history.NotifyFailoverMarkerTimerJitterCoefficient",
EnableDropStuckTaskByDomainID: "history.DropStuckTaskByDomain",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
Expand Down Expand Up @@ -335,6 +337,7 @@ const (
testGetDurationPropertyFilteredByDomainKey
testGetIntPropertyFilteredByTaskListInfoKey
testGetDurationPropertyFilteredByTaskListInfoKey
testGetBoolPropertyFilteredByDomainIDKey
testGetBoolPropertyFilteredByTaskListInfoKey

// EnableGlobalDomain is key for enable global domain
Expand Down Expand Up @@ -737,6 +740,9 @@ const (
// DecisionHeartbeatTimeout for decision heartbeat
DecisionHeartbeatTimeout

// EnableDropStuckTaskByDomainID controls whether stuck timer/transfer tasks should be dropped for a domain, looked up by domain ID
EnableDropStuckTaskByDomainID

// key for worker

// WorkerPersistenceMaxQPS is the max qps worker host can query DB
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Config struct {
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

// QueueProcessor settings
QueueProcessorEnableDomainTaggedMetrics dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -324,6 +325,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval, 5*time.Second),
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval, 30*time.Second),
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorSplitQueueIntervalJitterCoefficient, 0.15),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),

QueueProcessorEnableDomainTaggedMetrics: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableDomainTaggedMetrics, false),
QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
Expand Down
10 changes: 10 additions & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
ctask "github.com/uber/cadence/common/task"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
)

Expand Down Expand Up @@ -291,6 +292,15 @@ func (t *taskBase) HandleErr(
return nil
}

if transferTask, ok := t.Info.(*persistence.TransferTaskInfo); ok &&
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
t.scope.IncCounter(metrics.TransferTaskMissingEventCounter)
t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == ErrTaskRedispatch {
t.scope.IncCounter(metrics.TaskStandbyRetryCounter)
Expand Down
9 changes: 9 additions & 0 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ func (t *taskProcessor) handleTaskError(
return nil
}

if transferTask, ok := taskInfo.task.(*persistence.TransferTaskInfo); ok &&
transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
err == execution.ErrMissingWorkflowStartEvent &&
t.config.EnableDropStuckTaskByDomainID(taskInfo.task.GetDomainID()) { // use domainID here to avoid accessing domainCache
scope.IncCounter(metrics.TransferTaskMissingEventCounter)
taskInfo.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}

// this is a transient error
if err == task.ErrTaskRedispatch {
scope.IncCounter(metrics.TaskStandbyRetryCounter)
Expand Down

0 comments on commit dee9745

Please sign in to comment.