Skip to content

Commit

Permalink
Refactor metrics into common and service metrics
Browse files Browse the repository at this point in the history
Summary:
This revision defines common metrics that are used in more than one
service, as well as service specific metrics.
It also adds wrappers on the service clients to emit metrics.

Test Plan: Build, UT

Reviewers: sivakk, maxim, samar

Reviewed By: samar

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D722803
  • Loading branch information
Tamer Eldeeb committed Feb 3, 2017
1 parent 4b6c2a7 commit 3e477ad
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 43 deletions.
18 changes: 16 additions & 2 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,23 @@ func NewTChannelClientFactory(ch *tchannel.Channel,
}

func (cf *tchannelClientFactory) NewHistoryClient() (history.Client, error) {
return history.NewClient(cf.ch, cf.monitor, cf.numberOfHistoryShards)
client, err := history.NewClient(cf.ch, cf.monitor, cf.numberOfHistoryShards)
if err != nil {
return nil, err
}
if cf.metricsClient != nil {
client = history.NewMetricClient(client, cf.metricsClient)
}
return client, nil
}

func (cf *tchannelClientFactory) NewMatchingClient() (matching.Client, error) {
return matching.NewClient(cf.ch, cf.monitor)
client, err := matching.NewClient(cf.ch, cf.monitor)
if err != nil {
return nil, err
}
if cf.metricsClient != nil {
client = matching.NewMetricClient(client, cf.metricsClient)
}
return client, nil
}
5 changes: 2 additions & 3 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (

m "code.uber.internal/devexp/minions/.gen/go/minions"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const frontendServiceName = "cadence-frontend"

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
Expand All @@ -28,7 +27,7 @@ func NewClient(ch *tchannel.Channel, hostPort string) (Client, error) {
HostPort: hostPort,
}
}
tClient := thrift.NewClient(ch, frontendServiceName, opts)
tClient := thrift.NewClient(ch, common.FrontendServiceName, opts)

client := &clientImpl{
connection: ch,
Expand Down
6 changes: 2 additions & 4 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/uber/tchannel-go/thrift"
)

const historyServiceName = "cadence-history"

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
Expand All @@ -30,7 +28,7 @@ type clientImpl struct {

// NewClient creates a new history service TChannel client
func NewClient(ch *tchannel.Channel, monitor membership.Monitor, numberOfShards int) (Client, error) {
sResolver, err := monitor.GetResolver(historyServiceName)
sResolver, err := monitor.GetResolver(common.HistoryServiceName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -172,7 +170,7 @@ func (c *clientImpl) getThriftClient(hostPort string) h.TChanHistoryService {
// before we acquired the lock
client, ok = c.thriftCache[hostPort]
if !ok {
tClient := thrift.NewClient(c.connection, historyServiceName, &thrift.ClientOptions{
tClient := thrift.NewClient(c.connection, common.HistoryServiceName, &thrift.ClientOptions{
HostPort: hostPort,
})

Expand Down
136 changes: 136 additions & 0 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package history

import (
h "code.uber.internal/devexp/minions/.gen/go/history"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/metrics"
)

var _ Client = (*metricClient)(nil)

type metricClient struct {
client Client
metricsClient metrics.Client
}

// NewMetricClient creates a new instance of Client that emits metrics
func NewMetricClient(client Client, metricsClient metrics.Client) Client {
return &metricClient{
client: client,
metricsClient: metricsClient,
}
}

func (c *metricClient) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientStartWorkflowExecutionScope, metrics.WorkflowLatency)
resp, err := c.client.StartWorkflowExecution(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.WorkflowFailures)
}

return resp, err
}

func (c *metricClient) GetWorkflowExecutionHistory(
request *workflow.GetWorkflowExecutionHistoryRequest) (*workflow.GetWorkflowExecutionHistoryResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientGetWorkflowExecutionHistoryScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientGetWorkflowExecutionHistoryScope, metrics.WorkflowLatency)
resp, err := c.client.GetWorkflowExecutionHistory(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientGetWorkflowExecutionHistoryScope, metrics.WorkflowFailures)
}

return resp, err
}

func (c *metricClient) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.WorkflowLatency)
resp, err := c.client.RecordDecisionTaskStarted(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.WorkflowFailures)
}

return resp, err
}

func (c *metricClient) RecordActivityTaskStarted(request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.WorkflowLatency)
resp, err := c.client.RecordActivityTaskStarted(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.WorkflowFailures)
}

return resp, err
}

func (c *metricClient) RespondDecisionTaskCompleted(request *workflow.RespondDecisionTaskCompletedRequest) error {
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.WorkflowLatency)
err := c.client.RespondDecisionTaskCompleted(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.WorkflowFailures)
}

return err
}

func (c *metricClient) RespondActivityTaskCompleted(request *workflow.RespondActivityTaskCompletedRequest) error {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.WorkflowLatency)
err := c.client.RespondActivityTaskCompleted(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.WorkflowFailures)
}

return err
}

func (c *metricClient) RespondActivityTaskFailed(request *workflow.RespondActivityTaskFailedRequest) error {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.WorkflowLatency)
err := c.client.RespondActivityTaskFailed(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.WorkflowFailures)
}

return err
}

func (c *metricClient) RecordActivityTaskHeartbeat(
request *workflow.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.WorkflowLatency)
resp, err := c.client.RecordActivityTaskHeartbeat(request)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.WorkflowFailures)
}

return resp, err
}
7 changes: 3 additions & 4 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (

m "code.uber.internal/devexp/minions/.gen/go/matching"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common"
"code.uber.internal/devexp/minions/common/membership"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const matchingServiceName = "cadence-matching"

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
Expand All @@ -23,7 +22,7 @@ type clientImpl struct {

// NewClient creates a new history service TChannel client
func NewClient(ch *tchannel.Channel, monitor membership.Monitor) (Client, error) {
sResolver, err := monitor.GetResolver(matchingServiceName)
sResolver, err := monitor.GetResolver(common.MatchingServiceName)
if err != nil {
return nil, err
}
Expand All @@ -41,7 +40,7 @@ func (c *clientImpl) getHostForRequest(key string) (m.TChanMatchingService, erro
return nil, err
}
// TODO: build client cache
tClient := thrift.NewClient(c.connection, matchingServiceName, &thrift.ClientOptions{
tClient := thrift.NewClient(c.connection, common.MatchingServiceName, &thrift.ClientOptions{
HostPort: host.GetAddress(),
})
return m.NewTChanMatchingServiceClient(tClient), nil
Expand Down
78 changes: 78 additions & 0 deletions client/matching/metricClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package matching

import (
m "code.uber.internal/devexp/minions/.gen/go/matching"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/metrics"
)

var _ Client = (*metricClient)(nil)

type metricClient struct {
client Client
metricsClient metrics.Client
}

// NewMetricClient creates a new instance of Client that emits metrics
func NewMetricClient(client Client, metricsClient metrics.Client) Client {
return &metricClient{
client: client,
metricsClient: metricsClient,
}
}

func (c *metricClient) AddActivityTask(addRequest *m.AddActivityTaskRequest) error {
c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.MatchingClientAddActivityTaskScope, metrics.WorkflowLatency)
err := c.client.AddActivityTask(addRequest)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.WorkflowFailures)
}

return err
}

func (c *metricClient) AddDecisionTask(addRequest *m.AddDecisionTaskRequest) error {
c.metricsClient.IncCounter(metrics.MatchingClientAddDecisionTaskScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.MatchingClientAddDecisionTaskScope, metrics.WorkflowLatency)
err := c.client.AddDecisionTask(addRequest)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientAddDecisionTaskScope, metrics.WorkflowFailures)
}

return err
}

func (c *metricClient) PollForActivityTask(pollRequest *workflow.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error) {
c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.MatchingClientPollForActivityTaskScope, metrics.WorkflowLatency)
resp, err := c.client.PollForActivityTask(pollRequest)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.WorkflowFailures)
}

return resp, err
}

func (c *metricClient) PollForDecisionTask(pollRequest *workflow.PollForDecisionTaskRequest) (*workflow.PollForDecisionTaskResponse, error) {
c.metricsClient.IncCounter(metrics.MatchingClientPollForDecisionTaskScope, metrics.WorkflowRequests)

sw := c.metricsClient.StartTimer(metrics.MatchingClientPollForDecisionTaskScope, metrics.WorkflowLatency)
resp, err := c.client.PollForDecisionTask(pollRequest)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientPollForDecisionTaskScope, metrics.WorkflowFailures)
}

return resp, err
}
9 changes: 9 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ const (
// EmptyEventID is the id of the empty event
EmptyEventID int64 = -23
)

const (
// FrontendServiceName is the name of the frontend service
FrontendServiceName = "cadence-frontend"
// HistoryServiceName is the name of the history service
HistoryServiceName = "cadence-history"
// MatchingServiceName is the name of the matching service
MatchingServiceName = "cadence-matching"
)
Loading

0 comments on commit 3e477ad

Please sign in to comment.