diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 169c6796821..54c43663a0e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -455,6 +455,8 @@ const ( StickyTTL = "history.stickyTTL" // WorkflowTaskHeartbeatTimeout for workflow task heartbeat WorkflowTaskHeartbeatTimeout = "history.workflowTaskHeartbeatTimeout" + // WorkflowTaskCriticalAttempts is the number of attempts for a workflow task that's regarded as critical + WorkflowTaskCriticalAttempts = "history.workflowTaskCriticalAttempt" // DefaultWorkflowTaskTimeout for a workflow task DefaultWorkflowTaskTimeout = "history.defaultWorkflowTaskTimeout" // SkipReapplicationByNamespaceID is whether skipping a event re-application for a namespace diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 6530140456e..552bdc50bf4 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1923,6 +1923,7 @@ const ( EmptyCompletionCommandsCounter MultipleCompletionCommandsCounter FailedWorkflowTasksCounter + WorkflowTaskAttempt StaleMutableStateCounter AutoResetPointsLimitExceededCounter AutoResetPointCorruptionCounter @@ -2384,6 +2385,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ EmptyCompletionCommandsCounter: NewCounterDef("empty_completion_commands"), MultipleCompletionCommandsCounter: NewCounterDef("multiple_completion_commands"), FailedWorkflowTasksCounter: NewCounterDef("failed_workflow_tasks"), + WorkflowTaskAttempt: NewDimensionlessHistogramDef("worrkflow_task_attempt"), StaleMutableStateCounter: NewCounterDef("stale_mutable_state"), AutoResetPointsLimitExceededCounter: NewCounterDef("auto_reset_points_exceed_limit"), AutoResetPointCorruptionCounter: NewCounterDef("auto_reset_point_corruption"), diff --git a/service/history/configs/config.go b/service/history/configs/config.go index d2cc74f6156..37006f50854 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -190,6 +190,7 @@ type Config struct { // WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks // So that workflow task will be scheduled to another worker(by clear stickyness) WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn // The following is used by the new RPC replication stack ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn @@ -377,6 +378,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365), WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30), + WorkflowTaskCriticalAttempts: dc.GetIntProperty(dynamicconfig.WorkflowTaskCriticalAttempts, 10), ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 4), ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second), diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 3639af1474e..69bcc9712f3 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -41,6 +41,7 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/primitives/timestamp" ) @@ -58,7 +59,15 @@ func newWorkflowTaskStateMachine( } } -func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent(version int64, scheduleID int64, taskQueue *taskqueuepb.TaskQueue, startToCloseTimeoutSeconds int32, attempt int32, scheduleTimestamp *time.Time, originalScheduledTimestamp *time.Time) (*WorkflowTaskInfo, error) { +func (m *workflowTaskStateMachine) ReplicateWorkflowTaskScheduledEvent( + version int64, + scheduleID int64, + taskQueue *taskqueuepb.TaskQueue, + startToCloseTimeoutSeconds int32, + attempt int32, + scheduleTimestamp *time.Time, + originalScheduledTimestamp *time.Time, +) (*WorkflowTaskInfo, error) { // set workflow state to running, since workflow task is scheduled // NOTE: for zombie workflow, should not change the state @@ -394,6 +403,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( } workflowTask, err := m.ReplicateWorkflowTaskStartedEvent(workflowTask, m.ms.GetCurrentVersion(), scheduleID, startedID, requestID, startTime) + + m.emitWorkflowTaskAttemptStats(workflowTask.Attempt) + // TODO merge active & passive task generation if err := m.ms.taskGenerator.GenerateStartWorkflowTaskTasks( startTime, // start time is now @@ -739,3 +751,21 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent( m.ms.executionInfo.LastWorkflowTaskStartId = event.GetWorkflowTaskCompletedEventAttributes().GetStartedEventId() return m.ms.addBinaryCheckSumIfNotExists(event, maxResetPoints) } + +func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats( + attempt int32, +) { + namespaceName := m.ms.GetNamespaceEntry().Name().String() + m.ms.metricsClient.Scope( + metrics.WorkflowContextScope, + metrics.NamespaceTag(namespaceName), + ).RecordDistribution(metrics.WorkflowTaskAttempt, int(attempt)) + if attempt >= int32(m.ms.shard.GetConfig().WorkflowTaskCriticalAttempts()) { + m.ms.shard.GetThrottledLogger().Warn("Critical attempts processing workflow task", + tag.WorkflowNamespace(namespaceName), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.Attempt(attempt), + ) + } +}