Skip to content

Commit

Permalink
Add class
Browse files Browse the repository at this point in the history
Signed-off-by: Joni Collinge <[email protected]>
  • Loading branch information
jjcollinge committed Nov 16, 2022
1 parent 5ca0683 commit fbf8c88
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 59 deletions.
8 changes: 8 additions & 0 deletions docs/development/dapr-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ Dapr uses prometheus process and go collectors by default.
* dapr_runtime_component_init_total: The number of initialized components
* dapr_runtime_component_init_fail_total: The number of component initialization failures

#### Service Invocation

* dapr_runtime_service_invocation_req_sent_total: The number of remote service invocation requests sent
* dapr_runtime_service_invocation_req_recv_total: The number of remote service invocation requests received
* dapr_runtime_service_invocation_res_sent_total: The number of remote service invocation responses sent
* dapr_runtime_service_invocation_res_recv_total: The number of remote service invocation responses received
* dapr_runtime_service_invocation_res_recv_latency_ms: The remote service invocation round trip latency

#### Security

* dapr_runtime_mtls_init_total: The number of successful mTLS authenticator initialization.
Expand Down
49 changes: 28 additions & 21 deletions pkg/diagnostics/service_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package diagnostics

import (
"context"
"strconv"
"time"

"go.opencensus.io/stats"
Expand All @@ -11,6 +12,12 @@ import (
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
)

const (
SuccessClass = "success"
FailureClass = "failure"
UnknownClass = "unknown"
)

// Tag keys.
var (
componentKey = tag.MustNewKey("component")
Expand All @@ -27,7 +34,7 @@ var (
sourceAppIDKey = tag.MustNewKey("src_app_id")
methodKey = tag.MustNewKey("method")
statusKey = tag.MustNewKey("status")
sourceProtocolKey = tag.MustNewKey("src_protocol")
classKey = tag.MustNewKey("class")
)

// serviceMetrics holds dapr runtime metric monitoring methods.
Expand Down Expand Up @@ -208,11 +215,11 @@ func (s *serviceMetrics) Init(appID string) error {
diagUtils.NewMeasureView(s.appPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()),
diagUtils.NewMeasureView(s.globalPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()),

diagUtils.NewMeasureView(s.serviceInvocationRequestSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, sourceProtocolKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationRequestReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, sourceProtocolKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedLatency, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(s.serviceInvocationRequestSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationRequestReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, statusKey, classKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey, classKey}, view.Count()),
diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedLatency, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey, classKey}, defaultLatencyDistribution),
)
}

Expand Down Expand Up @@ -407,69 +414,69 @@ func (s *serviceMetrics) RequestBlockedByGlobalAction(appID, trustDomain, namesp
}

// ServiceInvocationRequestSent records the number of service invocation requests sent.
func (s *serviceMetrics) ServiceInvocationRequestSent(appID, destinationAppID, method, protocol string) {
func (s *serviceMetrics) ServiceInvocationRequestSent(appID, destinationAppID, method string) {
if s.enabled {
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, appID,
destinationAppIDKey, destinationAppID,
methodKey, method,
sourceProtocolKey, protocol),
methodKey, method),
s.serviceInvocationRequestSentTotal.M(1))
}
}

// ServiceInvocationRequestReceived records the number of service invocation requests received.
func (s *serviceMetrics) ServiceInvocationRequestReceived(appID, sourceAppID, method, protocol string) {
func (s *serviceMetrics) ServiceInvocationRequestReceived(appID, sourceAppID, method string) {
if s.enabled {
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, appID,
sourceAppIDKey, sourceAppID,
methodKey, method,
sourceProtocolKey, protocol),
methodKey, method),
s.serviceInvocationRequestReceivedTotal.M(1))
}
}

// ServiceInvocationResponseSent records the number of service invocation responses sent.
func (s *serviceMetrics) ServiceInvocationResponseSent(appID, destinationAppID, method, protocol, status string) {
func (s *serviceMetrics) ServiceInvocationResponseSent(appID, destinationAppID, method, class string, status int32) {
if s.enabled {
statusCode := strconv.Itoa(int(status))
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, appID,
destinationAppIDKey, destinationAppID,
methodKey, method,
sourceProtocolKey, protocol,
statusKey, status),
statusKey, statusCode,
classKey, class),
s.serviceInvocationResponseSentTotal.M(1))
}
}

// ServiceInvocationResponseReceived records the number of service invocation responses received.
func (s *serviceMetrics) ServiceInvocationResponseReceived(appID, sourceAppID, method, protocol, status string, start time.Time) {
func (s *serviceMetrics) ServiceInvocationResponseReceived(appID, sourceAppID, method, class string, status int32, start time.Time) {
if s.enabled {
elapsed := float64(time.Since(start) / time.Millisecond)
statusCode := strconv.Itoa(int(status))
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, appID,
sourceAppIDKey, sourceAppID,
methodKey, method,
sourceProtocolKey, protocol,
statusKey, status),
statusKey, statusCode,
classKey, class),
s.serviceInvocationResponseReceivedTotal.M(1))
elapsed := float64(time.Since(start) / time.Millisecond)
stats.RecordWithTags(
s.ctx,
diagUtils.WithTags(
appIDKey, appID,
sourceAppIDKey, sourceAppID,
methodKey, method,
sourceProtocolKey, protocol,
statusKey, status),
statusKey, statusCode,
classKey, class),
s.serviceInvocationResponseReceivedLatency.M(elapsed))
}
}
44 changes: 24 additions & 20 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/dapr/dapr/pkg/messages"
"github.com/dapr/dapr/pkg/messaging"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
v1 "github.com/dapr/dapr/pkg/messaging/v1"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand Down Expand Up @@ -354,32 +353,37 @@ func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequ
}
}

// get the source app id from the metadata
var sourceAppID string
sourceIDHeader, ok := req.Metadata()[invokev1.SourceIDHeader]
if ok {
if len(sourceIDHeader.Values) > 0 {
sourceAppID = sourceIDHeader.Values[0]
}
}

var protocol string
if v1.IsGRPCProtocol(req.Metadata()) {
protocol = config.GRPCProtocol
if ok && len(sourceIDHeader.Values) > 0 {
sourceAppID = sourceIDHeader.Values[0]
} else {
protocol = config.HTTPProtocol
sourceAppID = "unknown"
}

diag.DefaultMonitoring.ServiceInvocationRequestReceived(a.id, sourceAppID, req.Message().Method, protocol)
diag.DefaultMonitoring.ServiceInvocationRequestReceived(a.id, sourceAppID, req.Message().Method)

resp, err := a.appChannel.InvokeMethod(ctx, req)

var statusStr string
if resp.Status() != nil {
statusStr = strconv.FormatInt(int64(resp.Status().Code), 10)
}
diag.DefaultMonitoring.ServiceInvocationResponseSent(a.id, sourceAppID, req.Message().Method, protocol, statusStr)
var resp *invokev1.InvokeMethodResponse
defer func() {
var code int32
if err != nil {
code = int32(codes.Internal)
} else {
code = int32(resp.Status().Code)
}
var class string
if invokev1.IsSuccessCode(a.appProtocol, code) {
class = diag.SuccessClass
} else {
class = diag.FailureClass
}
resp.WithHeaders(metadata.New(map[string]string{
invokev1.StatusCodeClass: class,
}))
diag.DefaultMonitoring.ServiceInvocationResponseSent(a.id, sourceAppID, req.Message().Method, class, code)
}()

resp, err = a.appChannel.InvokeMethod(ctx, req)
if err != nil {
err = status.Errorf(codes.Internal, messages.ErrChannelInvoke, err)
return nil, err
Expand Down
34 changes: 16 additions & 18 deletions pkg/messaging/direct_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package messaging
import (
"context"
"os"
"strconv"
"strings"
"time"

Expand All @@ -31,7 +30,6 @@ import (
"github.com/dapr/kit/logger"

"github.com/dapr/dapr/pkg/channel"
"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
"github.com/dapr/dapr/pkg/modes"
Expand All @@ -40,7 +38,6 @@ import (
"github.com/dapr/dapr/utils"

invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
v1 "github.com/dapr/dapr/pkg/messaging/v1"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
)

Expand Down Expand Up @@ -255,27 +252,28 @@ func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, ap
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))

var protocol string
if v1.IsGRPCProtocol(req.Metadata()) {
protocol = config.GRPCProtocol
} else {
protocol = config.HTTPProtocol
}

start := time.Now()
diag.DefaultMonitoring.ServiceInvocationRequestSent(d.appID, appID, req.Message().Method, protocol)
diag.DefaultMonitoring.ServiceInvocationRequestSent(d.appID, appID, req.Message().Method)

var resp *internalv1pb.InternalInvokeResponse
defer func() {
if resp != nil {
var class string
classHeader, ok := resp.Headers[invokev1.StatusCodeClass]
if ok && len(classHeader.Values) > 0 {
class = classHeader.Values[0]
} else {
class = diag.UnknownClass
}
diag.DefaultMonitoring.ServiceInvocationResponseReceived(d.appID, appID, req.Message().Method, class, resp.Status.Code, start)
}
}()

resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
resp, err = clientV1.CallLocal(ctx, req.Proto(), opts...)
if err != nil {
return nil, err
}

var code string
if resp.Status != nil {
code = strconv.FormatInt(int64(resp.Status.Code), 10)
}
diag.DefaultMonitoring.ServiceInvocationResponseReceived(appID, d.appID, req.Message().Method, protocol, code, start)

return invokev1.InternalInvokeResponse(resp)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/messaging/direct_messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ func TestDestinationHeaders(t *testing.T) {
req.WithMetadata(map[string][]string{})

dm := newDirectMessaging()

dm.addAppIDHeadersToMetadata(appID, req)

md := req.Metadata()[invokev1.DestinationIDHeader]
assert.Equal(t, appID, md.Values[0])

md = req.Metadata()[invokev1.SourceIDHeader]
assert.Equal(t, dm.appID, md.Values[0])
})
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/messaging/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
Expand Down Expand Up @@ -59,6 +60,9 @@ const (
// SourceIDHeader is the header carrying the value of the invoking app id.
SourceIDHeader = "source-app-id"

// StatusCodeClass is the header determining the status codes classification (e.g. success, failure).
StatusCodeClass = "status-code-class"

// ErrorInfo metadata value is limited to 64 chars
// https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto#L126
maxMetadataValueLen = 63
Expand Down Expand Up @@ -457,3 +461,19 @@ func WithCustomGRPCMetadata(ctx context.Context, md map[string]string) context.C

return ctx
}

func IsSuccessCode(protocol string, status int32) bool {
if protocol == config.GRPCProtocol {
if status == 0 {
return true
} else {
return false
}
} else {
if status >= 200 && status < 300 {
return true
} else {
return false
}
}
}

0 comments on commit fbf8c88

Please sign in to comment.