Skip to content

Commit

Permalink
[Wf-Diagnostics] Unmarshal metadata for timeout issues and rootcause (c…
Browse files Browse the repository at this point in the history
…adence-workflow#6294)

* [Wf-Diagnostics] Unmarshal metadata for timeout issues and rootcause

* update the result struct

* Update workflow_test.go
  • Loading branch information
sankari165 authored Sep 19, 2024
1 parent dc51527 commit 811ffe7
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 49 deletions.
2 changes: 1 addition & 1 deletion service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test__rootCauseTimeouts(t *testing.T) {
},
}
taskListBacklog := int64(10)
taskListBacklogInBytes, err := json.Marshal(taskListBacklog)
taskListBacklogInBytes, err := json.Marshal(invariants.PollersMetadata{TaskListBacklog: taskListBacklog})
require.NoError(t, err)
expectedRootCause := []invariants.InvariantRootCauseResult{
{
Expand Down
13 changes: 8 additions & 5 deletions service/worker/diagnostics/invariants/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *timeout) Check(context.Context) ([]InvariantCheckResult, error) {
result = append(result, InvariantCheckResult{
InvariantType: TimeoutTypeDecision.String(),
Reason: reason,
Metadata: metadata,
Metadata: marshalData(metadata),
})
}
if event.ChildWorkflowExecutionTimedOutEventAttributes != nil {
Expand Down Expand Up @@ -165,15 +165,16 @@ func (t *timeout) checkTasklist(ctx context.Context, issue InvariantCheckResult)
}

tasklistBacklog := resp.GetTaskListStatus().GetBacklogCountHint()
polllersMetadataInBytes := marshalData(PollersMetadata{TaskListBacklog: tasklistBacklog})
if len(resp.GetPollers()) == 0 {
return InvariantRootCauseResult{
RootCause: RootCauseTypeMissingPollers,
Metadata: taskListBacklogInBytes(tasklistBacklog),
Metadata: polllersMetadataInBytes,
}, nil
}
return InvariantRootCauseResult{
RootCause: RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes(tasklistBacklog),
Metadata: polllersMetadataInBytes,
}, nil

}
Expand All @@ -185,11 +186,13 @@ func checkHeartbeatStatus(issue InvariantCheckResult) ([]InvariantRootCauseResul
return nil, err
}

heartbeatingMetadataInBytes := marshalData(HeartbeatingMetadata{TimeElapsed: metadata.TimeElapsed})

if metadata.HeartBeatTimeout == 0 && activityStarted(metadata) {
return []InvariantRootCauseResult{
{
RootCause: RootCauseTypeHeartBeatingNotEnabled,
Metadata: []byte(metadata.TimeElapsed.String()),
Metadata: heartbeatingMetadataInBytes,
},
}, nil
}
Expand All @@ -198,7 +201,7 @@ func checkHeartbeatStatus(issue InvariantCheckResult) ([]InvariantRootCauseResul
return []InvariantRootCauseResult{
{
RootCause: RootCauseTypeHeartBeatingEnabledMissingHeartbeat,
Metadata: []byte(metadata.TimeElapsed.String()),
Metadata: heartbeatingMetadataInBytes,
},
}, nil
}
Expand Down
40 changes: 26 additions & 14 deletions service/worker/diagnostics/invariants/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ const (
)

func Test__Check(t *testing.T) {
taskTimeoutSecondInBytes, err := json.Marshal(taskTimeoutSecond)
decisionTimeoutMetadata := DecisionTimeoutMetadata{ConfiguredTimeout: 50 * time.Second}
decisionTimeoutMetadataInBytes, err := json.Marshal(decisionTimeoutMetadata)
require.NoError(t, err)
testCases := []struct {
name string
Expand Down Expand Up @@ -104,12 +105,12 @@ func Test__Check(t *testing.T) {
{
InvariantType: TimeoutTypeDecision.String(),
Reason: "START_TO_CLOSE",
Metadata: taskTimeoutSecondInBytes,
Metadata: decisionTimeoutMetadataInBytes,
},
{
InvariantType: TimeoutTypeDecision.String(),
Reason: "workflow reset",
Metadata: []byte("new run ID"),
Reason: "workflow reset - New run ID: new run ID",
Metadata: decisionTimeoutMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -265,11 +266,18 @@ func decisionTimeoutHistory() *types.GetWorkflowExecutionHistoryResponse {
TimeoutType: types.TimeoutTypeStartToClose.Ptr(),
},
},
{
ID: 23,
DecisionTaskScheduledEventAttributes: &types.DecisionTaskScheduledEventAttributes{
StartToCloseTimeoutSeconds: common.Int32Ptr(taskTimeoutSecond),
},
},
{
DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{
Cause: types.DecisionTaskTimedOutCauseReset.Ptr(),
Reason: "workflow reset",
NewRunID: "new run ID",
ScheduledEventID: 23,
Cause: types.DecisionTaskTimedOutCauseReset.Ptr(),
Reason: "workflow reset",
NewRunID: "new run ID",
},
},
},
Expand Down Expand Up @@ -369,6 +377,10 @@ func childWfTimeoutDataInBytes(t *testing.T) []byte {

func Test__RootCause(t *testing.T) {
actStartToCloseTimeoutData := activityStartToCloseTimeoutData()
pollersMetadataInBytes, err := json.Marshal(PollersMetadata{TaskListBacklog: testTaskListBacklog})
require.NoError(t, err)
heartBeatingMetadataInBytes, err := json.Marshal(HeartbeatingMetadata{TimeElapsed: actStartToCloseTimeoutData.TimeElapsed})
require.NoError(t, err)
testCases := []struct {
name string
input []InvariantCheckResult
Expand Down Expand Up @@ -396,7 +408,7 @@ func Test__RootCause(t *testing.T) {
expectedResult: []InvariantRootCauseResult{
{
RootCause: RootCauseTypeMissingPollers,
Metadata: taskListBacklogInBytes(testTaskListBacklog),
Metadata: pollersMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -425,7 +437,7 @@ func Test__RootCause(t *testing.T) {
expectedResult: []InvariantRootCauseResult{
{
RootCause: RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes(testTaskListBacklog),
Metadata: pollersMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -454,11 +466,11 @@ func Test__RootCause(t *testing.T) {
expectedResult: []InvariantRootCauseResult{
{
RootCause: RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes(testTaskListBacklog),
Metadata: pollersMetadataInBytes,
},
{
RootCause: RootCauseTypeHeartBeatingNotEnabled,
Metadata: []byte(actStartToCloseTimeoutData.TimeElapsed.String()),
Metadata: heartBeatingMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -487,7 +499,7 @@ func Test__RootCause(t *testing.T) {
expectedResult: []InvariantRootCauseResult{
{
RootCause: RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes(testTaskListBacklog),
Metadata: pollersMetadataInBytes,
},
},
err: nil,
Expand Down Expand Up @@ -516,11 +528,11 @@ func Test__RootCause(t *testing.T) {
expectedResult: []InvariantRootCauseResult{
{
RootCause: RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes(testTaskListBacklog),
Metadata: pollersMetadataInBytes,
},
{
RootCause: RootCauseTypeHeartBeatingEnabledMissingHeartbeat,
Metadata: []byte(actStartToCloseTimeoutData.TimeElapsed.String()),
Metadata: heartBeatingMetadataInBytes,
},
},
err: nil,
Expand Down
22 changes: 7 additions & 15 deletions service/worker/diagnostics/invariants/timeout_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ import (
"github.com/uber/cadence/common/types"
)

func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, []byte) {
func reasonForDecisionTaskTimeouts(event *types.HistoryEvent, allEvents []*types.HistoryEvent) (string, DecisionTimeoutMetadata) {
eventScheduledID := event.GetDecisionTaskTimedOutEventAttributes().GetScheduledEventID()
attr := event.GetDecisionTaskTimedOutEventAttributes()
cause := attr.GetCause()
var reason string
switch cause {
case types.DecisionTaskTimedOutCauseTimeout:
return attr.TimeoutType.String(), timeoutLimitInBytes(getDecisionTaskConfiguredTimeout(eventScheduledID, allEvents))
reason = attr.TimeoutType.String()
case types.DecisionTaskTimedOutCauseReset:
newRunID := attr.GetNewRunID()
return attr.Reason, []byte(newRunID)
default:
return "valid cause not available for decision task timeout", nil
reason = fmt.Sprintf("%s - New run ID: %s", attr.Reason, newRunID)
}
return reason, DecisionTimeoutMetadata{
ConfiguredTimeout: time.Duration(getDecisionTaskConfiguredTimeout(eventScheduledID, allEvents)) * time.Second,
}
}

Expand Down Expand Up @@ -123,16 +125,6 @@ func getChildWorkflowExecutionConfiguredTimeout(e *types.HistoryEvent, events []
return 0
}

func timeoutLimitInBytes(val int32) []byte {
valInBytes, _ := json.Marshal(val)
return valInBytes
}

func taskListBacklogInBytes(val int64) []byte {
valInBytes, _ := json.Marshal(val)
return valInBytes
}

func getExecutionTime(startID, timeoutID int64, events []*types.HistoryEvent) time.Duration {
sort.SliceStable(events, func(i, j int) bool {
return events[i].ID < events[j].ID
Expand Down
12 changes: 12 additions & 0 deletions service/worker/diagnostics/invariants/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,15 @@ type ActivityTimeoutMetadata struct {
HeartBeatTimeout time.Duration
Tasklist *types.TaskList
}

type DecisionTimeoutMetadata struct {
ConfiguredTimeout time.Duration
}

type PollersMetadata struct {
TaskListBacklog int64
}

type HeartbeatingMetadata struct {
TimeElapsed time.Duration
}
Loading

0 comments on commit 811ffe7

Please sign in to comment.