diff --git a/docs/development/dapr-metrics.md b/docs/development/dapr-metrics.md index 92fd63b4fe6..839de317a71 100644 --- a/docs/development/dapr-metrics.md +++ b/docs/development/dapr-metrics.md @@ -65,6 +65,14 @@ Dapr uses prometheus process and go collectors by default. * dapr_runtime_component_init_total: The number of initialized components * dapr_runtime_component_init_fail_total: The number of component initialization failures +#### Service Invocation + +* dapr_runtime_service_invocation_req_sent_total: The number of remote service invocation requests sent +* dapr_runtime_service_invocation_req_recv_total: The number of remote service invocation requests received +* dapr_runtime_service_invocation_res_sent_total: The number of remote service invocation responses sent +* dapr_runtime_service_invocation_res_recv_total: The number of remote service invocation responses received +* dapr_runtime_service_invocation_res_recv_latency_ms: The remote service invocation round trip latency + #### Security * dapr_runtime_mtls_init_total: The number of successful mTLS authenticator initialization. diff --git a/pkg/diagnostics/service_monitoring.go b/pkg/diagnostics/service_monitoring.go index ea5be3d5bad..d505e523e72 100644 --- a/pkg/diagnostics/service_monitoring.go +++ b/pkg/diagnostics/service_monitoring.go @@ -2,6 +2,7 @@ package diagnostics import ( "context" + "strconv" "time" "go.opencensus.io/stats" @@ -11,6 +12,12 @@ import ( diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils" ) +const ( + SuccessClass = "success" + FailureClass = "failure" + UnknownClass = "unknown" +) + // Tag keys. var ( componentKey = tag.MustNewKey("component") @@ -27,7 +34,7 @@ var ( sourceAppIDKey = tag.MustNewKey("src_app_id") methodKey = tag.MustNewKey("method") statusKey = tag.MustNewKey("status") - sourceProtocolKey = tag.MustNewKey("src_protocol") + classKey = tag.MustNewKey("class") ) // serviceMetrics holds dapr runtime metric monitoring methods. @@ -208,11 +215,11 @@ func (s *serviceMetrics) Init(appID string) error { diagUtils.NewMeasureView(s.appPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()), diagUtils.NewMeasureView(s.globalPolicyActionBlocked, []tag.Key{appIDKey, trustDomainKey, namespaceKey, operationKey, httpMethodKey, policyActionKey}, view.Count()), - diagUtils.NewMeasureView(s.serviceInvocationRequestSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, sourceProtocolKey}, view.Count()), - diagUtils.NewMeasureView(s.serviceInvocationRequestReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey}, view.Count()), - diagUtils.NewMeasureView(s.serviceInvocationResponseSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, sourceProtocolKey, statusKey}, view.Count()), - diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey, statusKey}, view.Count()), - diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedLatency, []tag.Key{appIDKey, sourceAppIDKey, methodKey, sourceProtocolKey, statusKey}, defaultLatencyDistribution), + diagUtils.NewMeasureView(s.serviceInvocationRequestSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey}, view.Count()), + diagUtils.NewMeasureView(s.serviceInvocationRequestReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey}, view.Count()), + diagUtils.NewMeasureView(s.serviceInvocationResponseSentTotal, []tag.Key{appIDKey, destinationAppIDKey, methodKey, statusKey, classKey}, view.Count()), + diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedTotal, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey, classKey}, view.Count()), + diagUtils.NewMeasureView(s.serviceInvocationResponseReceivedLatency, []tag.Key{appIDKey, sourceAppIDKey, methodKey, statusKey, classKey}, defaultLatencyDistribution), ) } @@ -407,69 +414,69 @@ func (s *serviceMetrics) RequestBlockedByGlobalAction(appID, trustDomain, namesp } // ServiceInvocationRequestSent records the number of service invocation requests sent. -func (s *serviceMetrics) ServiceInvocationRequestSent(appID, destinationAppID, method, protocol string) { +func (s *serviceMetrics) ServiceInvocationRequestSent(appID, destinationAppID, method string) { if s.enabled { stats.RecordWithTags( s.ctx, diagUtils.WithTags( appIDKey, appID, destinationAppIDKey, destinationAppID, - methodKey, method, - sourceProtocolKey, protocol), + methodKey, method), s.serviceInvocationRequestSentTotal.M(1)) } } // ServiceInvocationRequestReceived records the number of service invocation requests received. -func (s *serviceMetrics) ServiceInvocationRequestReceived(appID, sourceAppID, method, protocol string) { +func (s *serviceMetrics) ServiceInvocationRequestReceived(appID, sourceAppID, method string) { if s.enabled { stats.RecordWithTags( s.ctx, diagUtils.WithTags( appIDKey, appID, sourceAppIDKey, sourceAppID, - methodKey, method, - sourceProtocolKey, protocol), + methodKey, method), s.serviceInvocationRequestReceivedTotal.M(1)) } } // ServiceInvocationResponseSent records the number of service invocation responses sent. -func (s *serviceMetrics) ServiceInvocationResponseSent(appID, destinationAppID, method, protocol, status string) { +func (s *serviceMetrics) ServiceInvocationResponseSent(appID, destinationAppID, method, class string, status int32) { if s.enabled { + statusCode := strconv.Itoa(int(status)) stats.RecordWithTags( s.ctx, diagUtils.WithTags( appIDKey, appID, destinationAppIDKey, destinationAppID, methodKey, method, - sourceProtocolKey, protocol, - statusKey, status), + statusKey, statusCode, + classKey, class), s.serviceInvocationResponseSentTotal.M(1)) } } // ServiceInvocationResponseReceived records the number of service invocation responses received. -func (s *serviceMetrics) ServiceInvocationResponseReceived(appID, sourceAppID, method, protocol, status string, start time.Time) { +func (s *serviceMetrics) ServiceInvocationResponseReceived(appID, sourceAppID, method, class string, status int32, start time.Time) { if s.enabled { - elapsed := float64(time.Since(start) / time.Millisecond) + statusCode := strconv.Itoa(int(status)) stats.RecordWithTags( s.ctx, diagUtils.WithTags( appIDKey, appID, sourceAppIDKey, sourceAppID, methodKey, method, - sourceProtocolKey, protocol, - statusKey, status), + statusKey, statusCode, + classKey, class), s.serviceInvocationResponseReceivedTotal.M(1)) + elapsed := float64(time.Since(start) / time.Millisecond) stats.RecordWithTags( s.ctx, diagUtils.WithTags( appIDKey, appID, sourceAppIDKey, sourceAppID, methodKey, method, - sourceProtocolKey, protocol, - statusKey, status), + statusKey, statusCode, + classKey, class), s.serviceInvocationResponseReceivedLatency.M(elapsed)) } } diff --git a/pkg/grpc/api.go b/pkg/grpc/api.go index 587249b1943..a1f81760ae2 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/api.go @@ -53,7 +53,6 @@ import ( "github.com/dapr/dapr/pkg/messages" "github.com/dapr/dapr/pkg/messaging" invokev1 "github.com/dapr/dapr/pkg/messaging/v1" - v1 "github.com/dapr/dapr/pkg/messaging/v1" commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1" internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" @@ -354,32 +353,37 @@ func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequ } } - // get the source app id from the metadata var sourceAppID string sourceIDHeader, ok := req.Metadata()[invokev1.SourceIDHeader] - if ok { - if len(sourceIDHeader.Values) > 0 { - sourceAppID = sourceIDHeader.Values[0] - } - } - - var protocol string - if v1.IsGRPCProtocol(req.Metadata()) { - protocol = config.GRPCProtocol + if ok && len(sourceIDHeader.Values) > 0 { + sourceAppID = sourceIDHeader.Values[0] } else { - protocol = config.HTTPProtocol + sourceAppID = "unknown" } - diag.DefaultMonitoring.ServiceInvocationRequestReceived(a.id, sourceAppID, req.Message().Method, protocol) + diag.DefaultMonitoring.ServiceInvocationRequestReceived(a.id, sourceAppID, req.Message().Method) - resp, err := a.appChannel.InvokeMethod(ctx, req) - - var statusStr string - if resp.Status() != nil { - statusStr = strconv.FormatInt(int64(resp.Status().Code), 10) - } - diag.DefaultMonitoring.ServiceInvocationResponseSent(a.id, sourceAppID, req.Message().Method, protocol, statusStr) + var resp *invokev1.InvokeMethodResponse + defer func() { + var code int32 + if err != nil { + code = int32(codes.Internal) + } else { + code = int32(resp.Status().Code) + } + var class string + if invokev1.IsSuccessCode(a.appProtocol, code) { + class = diag.SuccessClass + } else { + class = diag.FailureClass + } + resp.WithHeaders(metadata.New(map[string]string{ + invokev1.StatusCodeClass: class, + })) + diag.DefaultMonitoring.ServiceInvocationResponseSent(a.id, sourceAppID, req.Message().Method, class, code) + }() + resp, err = a.appChannel.InvokeMethod(ctx, req) if err != nil { err = status.Errorf(codes.Internal, messages.ErrChannelInvoke, err) return nil, err diff --git a/pkg/messaging/direct_messaging.go b/pkg/messaging/direct_messaging.go index 90ce67a904e..e90e9cb8125 100644 --- a/pkg/messaging/direct_messaging.go +++ b/pkg/messaging/direct_messaging.go @@ -16,7 +16,6 @@ package messaging import ( "context" "os" - "strconv" "strings" "time" @@ -31,7 +30,6 @@ import ( "github.com/dapr/kit/logger" "github.com/dapr/dapr/pkg/channel" - "github.com/dapr/dapr/pkg/config" diag "github.com/dapr/dapr/pkg/diagnostics" diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils" "github.com/dapr/dapr/pkg/modes" @@ -40,7 +38,6 @@ import ( "github.com/dapr/dapr/utils" invokev1 "github.com/dapr/dapr/pkg/messaging/v1" - v1 "github.com/dapr/dapr/pkg/messaging/v1" internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1" ) @@ -255,27 +252,28 @@ func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, ap var opts []grpc.CallOption opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024)) - var protocol string - if v1.IsGRPCProtocol(req.Metadata()) { - protocol = config.GRPCProtocol - } else { - protocol = config.HTTPProtocol - } - start := time.Now() - diag.DefaultMonitoring.ServiceInvocationRequestSent(d.appID, appID, req.Message().Method, protocol) + diag.DefaultMonitoring.ServiceInvocationRequestSent(d.appID, appID, req.Message().Method) + + var resp *internalv1pb.InternalInvokeResponse + defer func() { + if resp != nil { + var class string + classHeader, ok := resp.Headers[invokev1.StatusCodeClass] + if ok && len(classHeader.Values) > 0 { + class = classHeader.Values[0] + } else { + class = diag.UnknownClass + } + diag.DefaultMonitoring.ServiceInvocationResponseReceived(d.appID, appID, req.Message().Method, class, resp.Status.Code, start) + } + }() - resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...) + resp, err = clientV1.CallLocal(ctx, req.Proto(), opts...) if err != nil { return nil, err } - var code string - if resp.Status != nil { - code = strconv.FormatInt(int64(resp.Status.Code), 10) - } - diag.DefaultMonitoring.ServiceInvocationResponseReceived(appID, d.appID, req.Message().Method, protocol, code, start) - return invokev1.InternalInvokeResponse(resp) } diff --git a/pkg/messaging/direct_messaging_test.go b/pkg/messaging/direct_messaging_test.go index 5cd6751162f..255f818c0e9 100644 --- a/pkg/messaging/direct_messaging_test.go +++ b/pkg/messaging/direct_messaging_test.go @@ -33,9 +33,14 @@ func TestDestinationHeaders(t *testing.T) { req.WithMetadata(map[string][]string{}) dm := newDirectMessaging() + dm.addAppIDHeadersToMetadata(appID, req) + md := req.Metadata()[invokev1.DestinationIDHeader] assert.Equal(t, appID, md.Values[0]) + + md = req.Metadata()[invokev1.SourceIDHeader] + assert.Equal(t, dm.appID, md.Values[0]) }) } diff --git a/pkg/messaging/v1/util.go b/pkg/messaging/v1/util.go index 9302f4d8cad..503bbfeac99 100644 --- a/pkg/messaging/v1/util.go +++ b/pkg/messaging/v1/util.go @@ -29,6 +29,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" + "github.com/dapr/dapr/pkg/config" diag "github.com/dapr/dapr/pkg/diagnostics" diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils" internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1" @@ -59,6 +60,9 @@ const ( // SourceIDHeader is the header carrying the value of the invoking app id. SourceIDHeader = "source-app-id" + // StatusCodeClass is the header determining the status codes classification (e.g. success, failure). + StatusCodeClass = "status-code-class" + // ErrorInfo metadata value is limited to 64 chars // https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto#L126 maxMetadataValueLen = 63 @@ -457,3 +461,19 @@ func WithCustomGRPCMetadata(ctx context.Context, md map[string]string) context.C return ctx } + +func IsSuccessCode(protocol string, status int32) bool { + if protocol == config.GRPCProtocol { + if status == 0 { + return true + } else { + return false + } + } else { + if status >= 200 && status < 300 { + return true + } else { + return false + } + } +}