Skip to content

Commit

Permalink
Add more metrics for matching (cadence-workflow#6208)
Browse files Browse the repository at this point in the history
* Split matcher task_attempt metrics

* Add forward latency metrics
  • Loading branch information
Shaddoll authored Aug 5, 2024
1 parent a56d386 commit a711217
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 10 deletions.
8 changes: 6 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2547,12 +2547,14 @@ const (
SyncMatchLocalPollLatencyPerTaskList
SyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollCounterPerTaskList
AsyncMatchLocalPollAttemptPerTaskList
AsyncMatchLocalPollLatencyPerTaskList
AsyncMatchForwardPollCounterPerTaskList
AsyncMatchForwardPollAttemptPerTaskList
AsyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList
AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList
AsyncMatchAttemptPerTaskList
PollLocalMatchLatencyPerTaskList
PollForwardMatchLatencyPerTaskList
PollLocalMatchAfterForwardFailedLatencyPerTaskList
Expand Down Expand Up @@ -3214,12 +3216,14 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
SyncMatchLocalPollLatencyPerTaskList: {metricName: "syncmatch_local_poll_latency_per_tl", metricRollupName: "syncmatch_local_poll_latency"},
SyncMatchForwardPollLatencyPerTaskList: {metricName: "syncmatch_forward_poll_latency_per_tl", metricRollupName: "syncmatch_forward_poll_latency"},
AsyncMatchLocalPollCounterPerTaskList: {metricName: "asyncmatch_local_poll_per_tl", metricRollupName: "asyncmatch_local_poll"},
AsyncMatchLocalPollAttemptPerTaskList: {metricName: "asyncmatch_local_poll_attempt_per_tl", metricRollupName: "asyncmatch_local_poll_attempt", metricType: Timer},
AsyncMatchLocalPollLatencyPerTaskList: {metricName: "asyncmatch_local_poll_latency_per_tl", metricRollupName: "asyncmatch_local_poll_latency"},
AsyncMatchForwardPollCounterPerTaskList: {metricName: "asyncmatch_forward_poll_per_tl", metricRollupName: "asyncmatch_forward_poll"},
AsyncMatchForwardPollAttemptPerTaskList: {metricName: "asyncmatch_forward_poll_attempt_per_tl", metricRollupName: "asyncmatch_forward_poll_attempt", metricType: Timer},
AsyncMatchForwardPollLatencyPerTaskList: {metricName: "asyncmatch_forward_poll_latency_per_tl", metricRollupName: "asyncmatch_forward_poll_latency"},
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed"},
AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_attempt_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_attempt", metricType: Timer},
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_latency_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_latency"},
AsyncMatchAttemptPerTaskList: {metricName: "asyncmatch_attempt_per_tl", metricRollupName: "asyncmatch_attempt"},
PollLocalMatchLatencyPerTaskList: {metricName: "poll_local_match_latency_per_tl", metricRollupName: "poll_local_match_latency", metricType: Timer},
PollForwardMatchLatencyPerTaskList: {metricName: "poll_forward_match_latency_per_tl", metricRollupName: "poll_forward_match_latency", metricType: Timer},
PollLocalMatchAfterForwardFailedLatencyPerTaskList: {metricName: "poll_local_match_after_forward_failed_latency_per_tl", metricRollupName: "poll_local_match_after_forward_failed_latency", metricType: Timer},
Expand Down
10 changes: 10 additions & 0 deletions service/matching/tasklist/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync/atomic"

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
Expand All @@ -36,6 +37,7 @@ type (
// Forwarder is the type that contains state pertaining to
// the api call forwarder component
Forwarder struct {
scope metrics.Scope
cfg *config.ForwarderConfig
taskListID *Identifier
taskListKind types.TaskListKind
Expand Down Expand Up @@ -96,6 +98,7 @@ func newForwarder(
kind types.TaskListKind,
client matching.Client,
isolationGroups []string,
scope metrics.Scope,
) *Forwarder {
rpsFunc := func() float64 { return float64(cfg.ForwarderMaxRatePerSecond()) }
fwdr := &Forwarder{
Expand All @@ -107,6 +110,7 @@ func newForwarder(
outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls()),
limiter: quotas.NewDynamicRateLimiter(rpsFunc),
isolationGroups: isolationGroups,
scope: scope,
}
fwdr.addReqToken.Store(newForwarderReqToken(int(fwdr.outstandingTasksLimit), nil))
fwdr.pollReqToken.Store(newForwarderReqToken(int(fwdr.outstandingPollsLimit), isolationGroups))
Expand All @@ -130,6 +134,8 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) erro

var err error

sw := fwdr.scope.StartTimer(metrics.ForwardTaskLatencyPerTaskList)
defer sw.Stop()
switch fwdr.taskListID.GetType() {
case persistence.TaskListTypeDecision:
err = fwdr.client.AddDecisionTask(ctx, &types.AddDecisionTaskRequest{
Expand Down Expand Up @@ -182,6 +188,8 @@ func (fwdr *Forwarder) ForwardQueryTask(
return nil, ErrNoParent
}

sw := fwdr.scope.StartTimer(metrics.ForwardQueryLatencyPerTaskList)
defer sw.Stop()
resp, err := fwdr.client.QueryWorkflow(ctx, &types.MatchingQueryWorkflowRequest{
DomainUUID: task.Query.Request.DomainUUID,
TaskList: &types.TaskList{
Expand All @@ -206,6 +214,8 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error) {
return nil, ErrNoParent
}

sw := fwdr.scope.StartTimer(metrics.ForwardPollLatencyPerTaskList)
defer sw.Stop()
pollerID := PollerIDFromContext(ctx)
identity := IdentityFromContext(ctx)
isolationGroup := IsolationGroupFromContext(ctx)
Expand Down
3 changes: 2 additions & 1 deletion service/matching/tasklist/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (t *ForwarderTestSuite) SetupTest() {
t.NoError(err)
t.taskList = id
t.isolationGroups = []string{"abc", "xyz"}
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client, t.isolationGroups)
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client, t.isolationGroups, metrics.NoopScope(metrics.Matching))
}

func (t *ForwarderTestSuite) TearDownTest() {
Expand Down
6 changes: 3 additions & 3 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ forLoop:
e.EventName = "Dispatched to Local Poller"
event.Log(e)
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return nil
case token := <-tm.fwdrAddReqTokenC():
Expand Down Expand Up @@ -312,7 +312,7 @@ forLoop:
event.Log(e)
cancel()
tm.scope.IncCounter(metrics.AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList, time.Since(startT))
return nil
case <-childCtx.Done():
Expand All @@ -329,7 +329,7 @@ forLoop:
e.EventName = "Task Forwarded"
event.Log(e)
tm.scope.IncCounter(metrics.AsyncMatchForwardPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchForwardPollAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchForwardPollLatencyPerTaskList, time.Since(startT))

// at this point, we forwarded the task to a parent partition which
Expand Down
5 changes: 2 additions & 3 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *MatcherTestSuite) SetupTest() {
}
t.cfg = tlCfg
t.isolationGroups = []string{"dca1", "dca2"}
t.fwdr = newForwarder(&t.cfg.ForwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"})
t.fwdr = newForwarder(&t.cfg.ForwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"}, metrics.NoopScope(metrics.Matching))
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger(), t.taskList, types.TaskListKindNormal)

rootTaskList := NewTestTaskListID(t.T(), t.taskList.GetDomainID(), t.taskList.Parent(20), persistence.TaskListTypeDecision)
Expand Down Expand Up @@ -493,12 +493,11 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
func (t *MatcherTestSuite) TestMustOfferRemoteRateLimit() {
scope := mocks.Scope{}
scope.On("IncCounter", metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist)
scope.On("RecordTimer", mock.Anything, mock.Anything)
t.matcher.scope = &scope
completionFunc := func(*persistence.TaskInfo, error) {}
for i := 0; i < 5; i++ {
scope.On("IncCounter", metrics.AsyncMatchForwardPollCounterPerTaskList)
scope.On("RecordTimer", metrics.AsyncMatchAttemptPerTaskList, mock.Anything)
scope.On("RecordTimer", metrics.AsyncMatchForwardPollLatencyPerTaskList, mock.Anything)
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil)
task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil, "")
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewManager(
}
var fwdr *Forwarder
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.ForwarderConfig, taskList, *taskListKind, matchingClient, isolationGroups)
fwdr = newForwarder(&taskListConfig.ForwarderConfig, taskList, *taskListKind, matchingClient, isolationGroups, scope)
}
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger, taskList, *taskListKind)
tlMgr.taskWriter = newTaskWriter(tlMgr)
Expand Down

0 comments on commit a711217

Please sign in to comment.