Skip to content

Commit

Permalink
Add Activity E2E latency metrics (cadence-workflow#3108)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Mar 13, 2020
1 parent 9bd2d87 commit 10fc818
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 20 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,7 @@ const (
TransferTaskThrottledCounter
TimerTaskThrottledCounter

ActivityE2ELatency
AckLevelUpdateCounter
AckLevelUpdateFailedCounter
DecisionTypeScheduleActivityCounter
Expand Down Expand Up @@ -1970,6 +1971,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TransferTaskThrottledCounter: {metricName: "transfer_task_throttled_counter", metricType: Counter},
TimerTaskThrottledCounter: {metricName: "timer_task_throttled_counter", metricType: Counter},
ActivityE2ELatency: {metricName: "activity_end_to_end_latency", metricType: Timer},
AckLevelUpdateCounter: {metricName: "ack_level_update", metricType: Counter},
AckLevelUpdateFailedCounter: {metricName: "ack_level_update_failed", metricType: Counter},
DecisionTypeScheduleActivityCounter: {metricName: "schedule_activity_decision", metricType: Counter},
Expand Down
46 changes: 46 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
domain = "domain"
targetCluster = "target_cluster"
taskList = "tasklist"
workflowType = "workflowType"
activityType = "activityType"

domainAllValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -60,6 +62,14 @@ type (
taskListTag struct {
value string
}

workflowTypeTag struct {
value string
}

activityTypeTag struct {
value string
}
)

// DomainTag returns a new domain tag. For timers, this also ensures that we
Expand Down Expand Up @@ -147,3 +157,39 @@ func (d taskListTag) Key() string {
func (d taskListTag) Value() string {
return d.value
}

// WorkflowTypeTag returns a new workflow type tag.
func WorkflowTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return workflowTypeTag{value}
}

// Key returns the key of the workflow type tag
func (d workflowTypeTag) Key() string {
return workflowType
}

// Value returns the value of the workflow type tag
func (d workflowTypeTag) Value() string {
return d.value
}

// ActivityTypeTag returns a new activity type tag.
func ActivityTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return activityTypeTag{value}
}

// Key returns the key of the activity type tag
func (d activityTypeTag) Key() string {
return activityType
}

// Value returns the value of the activity type tag
func (d activityTypeTag) Value() string {
return d.value
}
2 changes: 2 additions & 0 deletions common/taskTokenSerializerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ type (
TaskToken struct {
DomainID string `json:"domainId"`
WorkflowID string `json:"workflowId"`
WorkflowType string `json:"workflowType"`
RunID string `json:"runId"`
ScheduleID int64 `json:"scheduleId"`
ScheduleAttempt int64 `json:"scheduleAttempt"`
ActivityID string `json:"activityId"`
ActivityType string `json:"activityType"`
}

// QueryTaskToken identifies a query task
Expand Down
55 changes: 51 additions & 4 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
return err
}
domainID := domainEntry.GetInfo().ID
domainName := domainEntry.GetInfo().Name

request := req.CompleteRequest
token, err0 := e.tokenSerializer.Deserialize(request.TaskToken)
Expand All @@ -1461,7 +1462,9 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
var activityStartedTime time.Time
var taskList string
err = e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
func(context workflowExecutionContext, mutableState mutableState) error {
if !mutableState.IsWorkflowExecutionRunning() {
return ErrWorkflowCompleted
Expand Down Expand Up @@ -1492,8 +1495,21 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
// Unable to add ActivityTaskCompleted event to history
return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."}
}
activityStartedTime = ai.StartedTime
taskList = ai.TaskList
return nil
})
if err == nil && !activityStartedTime.IsZero() {
scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskCompletedScope).
Tagged(
metrics.DomainTag(domainName),
metrics.WorkflowTypeTag(token.WorkflowType),
metrics.ActivityTypeTag(token.ActivityType),
metrics.TaskListTag(taskList),
)
scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return err
}

// RespondActivityTaskFailed completes an activity task failure.
Expand All @@ -1507,6 +1523,7 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
return err
}
domainID := domainEntry.GetInfo().ID
domainName := domainEntry.GetInfo().Name

request := req.FailedRequest
token, err0 := e.tokenSerializer.Deserialize(request.TaskToken)
Expand All @@ -1519,7 +1536,9 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecutionWithAction(ctx, domainID, workflowExecution,
var activityStartedTime time.Time
var taskList string
err = e.updateWorkflowExecutionWithAction(ctx, domainID, workflowExecution,
func(context workflowExecutionContext, mutableState mutableState) (*updateWorkflowAction, error) {
if !mutableState.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
Expand Down Expand Up @@ -1560,8 +1579,21 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
postActions.createDecision = true
}

activityStartedTime = ai.StartedTime
taskList = ai.TaskList
return postActions, nil
})
if err == nil && !activityStartedTime.IsZero() {
scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskFailedScope).
Tagged(
metrics.DomainTag(domainName),
metrics.WorkflowTypeTag(token.WorkflowType),
metrics.ActivityTypeTag(token.ActivityType),
metrics.TaskListTag(taskList),
)
scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return err
}

// RespondActivityTaskCanceled completes an activity task failure.
Expand All @@ -1575,6 +1607,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
return err
}
domainID := domainEntry.GetInfo().ID
domainName := domainEntry.GetInfo().Name

request := req.CancelRequest
token, err0 := e.tokenSerializer.Deserialize(request.TaskToken)
Expand All @@ -1587,7 +1620,9 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
RunId: common.StringPtr(token.RunID),
}

return e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
var activityStartedTime time.Time
var taskList string
err = e.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
func(context workflowExecutionContext, mutableState mutableState) error {
if !mutableState.IsWorkflowExecutionRunning() {
return ErrWorkflowCompleted
Expand Down Expand Up @@ -1624,9 +1659,21 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."}
}

activityStartedTime = ai.StartedTime
taskList = ai.TaskList
return nil
})

if err == nil && !activityStartedTime.IsZero() {
scope := e.metricsClient.Scope(metrics.HistoryClientRespondActivityTaskCanceledScope).
Tagged(
metrics.DomainTag(domainName),
metrics.WorkflowTypeTag(token.WorkflowType),
metrics.ActivityTypeTag(token.ActivityType),
metrics.TaskListTag(taskList),
)
scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime))
}
return err
}

// RecordActivityTaskHeartbeat records an hearbeat for a task.
Expand Down
1 change: 1 addition & 0 deletions service/history/queueTaskProcess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
Expand Down
3 changes: 3 additions & 0 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,12 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(
token := &common.TaskToken{
DomainID: task.event.DomainID,
WorkflowID: task.event.WorkflowID,
WorkflowType: historyResponse.WorkflowType.GetName(),
RunID: task.event.RunID,
ScheduleID: task.event.ScheduleID,
ScheduleAttempt: historyResponse.GetAttempt(),
ActivityID: attributes.GetActivityId(),
ActivityType: attributes.GetActivityType().GetName(),
}

response.TaskToken, _ = e.tokenSerializer.Serialize(token)
Expand Down
40 changes: 24 additions & 16 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,12 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {
s.Equal(int32(50), *result.StartToCloseTimeoutSeconds)
s.Equal(int32(10), *result.HeartbeatTimeoutSeconds)
token := &common.TaskToken{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
ActivityID: activityID,
ActivityType: activityTypeName,
}

taskToken, _ := s.matchingEngine.tokenSerializer.Serialize(token)
Expand Down Expand Up @@ -726,10 +728,12 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
s.EqualValues(activityInput, result.Input)
s.EqualValues(workflowExecution, *result.WorkflowExecution)
token := &common.TaskToken{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
ActivityID: activityID,
ActivityType: activityTypeName,
}

taskToken, _ := s.matchingEngine.tokenSerializer.Serialize(token)
Expand Down Expand Up @@ -908,10 +912,12 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
s.EqualValues(activityHeader, result.Header)
s.EqualValues(workflowExecution, *result.WorkflowExecution)
token := &common.TaskToken{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
ActivityID: activityID,
ActivityType: activityTypeName,
}
resultToken, err := s.matchingEngine.tokenSerializer.Deserialize(result.TaskToken)
s.NoError(err)
Expand Down Expand Up @@ -1205,10 +1211,12 @@ func (s *matchingEngineSuite) TestMultipleEnginesActivitiesRangeStealing() {
s.EqualValues(activityInput, result.Input)
s.EqualValues(workflowExecution, *result.WorkflowExecution)
token := &common.TaskToken{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: scheduleID,
ActivityID: activityID,
ActivityType: activityTypeName,
}
resultToken, err := engine.tokenSerializer.Deserialize(result.TaskToken)
if err != nil {
Expand Down

0 comments on commit 10fc818

Please sign in to comment.