diff --git a/common/metrics/scope.go b/common/metrics/scope.go index 9cddaa77086..9cb36270808 100644 --- a/common/metrics/scope.go +++ b/common/metrics/scope.go @@ -35,6 +35,11 @@ func newMetricsScope(scope tally.Scope, defs map[int]metricDefinition) Scope { return &metricsScope{scope, defs} } +// NoopScope returns a noop scope of metrics +func NoopScope(serviceIdx ServiceIdx) Scope { + return &metricsScope{tally.NoopScope, getMetricDefs(serviceIdx)} +} + func (m *metricsScope) IncCounter(id int) { name := string(m.defs[id].metricName) m.scope.Counter(name).Inc(1) diff --git a/common/util.go b/common/util.go index d43fcc13def..7b3871ab0d5 100644 --- a/common/util.go +++ b/common/util.go @@ -381,14 +381,8 @@ func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.S // CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit, // and return ErrBlobSizeExceedsLimit if it exceeds errorLimit. -func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, metricsClient metrics.Client, scope int, logger bark.Logger) error { - if metricsClient != nil { - metricsClient.RecordTimer( - scope, - metrics.EventBlobSize, - time.Duration(actualSize), - ) - } +func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, scope metrics.Scope, logger bark.Logger) error { + scope.RecordTimer(metrics.EventBlobSize, time.Duration(actualSize)) if actualSize > warnLimit { if logger != nil { diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 4971794144f..cc40bc3216b 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -30,6 +30,7 @@ import ( "github.com/pborman/uuid" "github.com/uber-common/bark" + "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/cadence/workflowserviceserver" "github.com/uber/cadence/.gen/go/health" "github.com/uber/cadence/.gen/go/health/metaserver" @@ -203,7 +204,7 @@ func (wh *WorkflowHandler) Health(ctx context.Context) (*health.HealthStatus, er return hs, nil } -func (wh *WorkflowHandler) checkPermission(securityToken *string, scope int) error { +func (wh *WorkflowHandler) checkPermission(securityToken *string, scope metrics.Scope) error { if wh.config.EnableAdminProtection() { if securityToken == nil { return wh.error(errNoPermission, scope) @@ -222,7 +223,7 @@ func (wh *WorkflowHandler) checkPermission(securityToken *string, scope int) err // domain. func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *gen.RegisterDomainRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRegisterDomainScope + scope := wh.metricsClient.Scope(metrics.FrontendRegisterDomainScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -351,7 +352,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest * func (wh *WorkflowHandler) ListDomains(ctx context.Context, listRequest *gen.ListDomainsRequest) (response *gen.ListDomainsResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendListDomainsScope + scope := wh.metricsClient.Scope(metrics.FrontendListDomainsScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -395,7 +396,7 @@ func (wh *WorkflowHandler) ListDomains(ctx context.Context, func (wh *WorkflowHandler) DescribeDomain(ctx context.Context, describeRequest *gen.DescribeDomainRequest) (response *gen.DescribeDomainResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendDescribeDomainScope + scope := wh.metricsClient.Scope(metrics.FrontendDescribeDomainScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -432,7 +433,7 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context, updateRequest *gen.UpdateDomainRequest) (resp *gen.UpdateDomainResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendUpdateDomainScope + scope := wh.metricsClient.Scope(metrics.FrontendUpdateDomainScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -686,7 +687,7 @@ func (wh *WorkflowHandler) mergeDomainData(old map[string]string, new map[string func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *gen.DeprecateDomainRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendDeprecateDomainScope + scope := wh.metricsClient.Scope(metrics.FrontendDeprecateDomainScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -762,7 +763,7 @@ func (wh *WorkflowHandler) PollForActivityTask( callTime := time.Now() - scope := metrics.FrontendPollForActivityTaskScope + scope := wh.metricsClient.Scope(metrics.FrontendPollForActivityTaskScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -799,6 +800,9 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(pollRequest.GetDomain())) + pollerID := uuid.New() op := func() error { var err error @@ -839,7 +843,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( callTime := time.Now() - scope := metrics.FrontendPollForDecisionTaskScope + scope := wh.metricsClient.Scope(metrics.FrontendPollForDecisionTaskScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -877,6 +881,9 @@ func (wh *WorkflowHandler) PollForDecisionTask( return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainName)) + wh.Service.GetLogger().Debugf("Poll for decision. DomainName: %v, DomainID: %v", domainName, domainID) pollerID := uuid.New() @@ -952,7 +959,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( heartbeatRequest *gen.RecordActivityTaskHeartbeatRequest) (resp *gen.RecordActivityTaskHeartbeatResponse, err error) { defer logging.CapturePanic(wh.GetLogger(), &err) - scope := metrics.FrontendRecordActivityTaskHeartbeatScope + scope := wh.metricsClient.Scope(metrics.FrontendRecordActivityTaskHeartbeatScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -980,12 +987,22 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(heartbeatRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(heartbeatRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: heartbeatRequest.TaskToken, @@ -1020,7 +1037,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( heartbeatRequest *gen.RecordActivityTaskHeartbeatByIDRequest) (resp *gen.RecordActivityTaskHeartbeatResponse, err error) { defer logging.CapturePanic(wh.GetLogger(), &err) - scope := metrics.FrontendRecordActivityTaskHeartbeatByIDScope + scope := wh.metricsClient.Scope(metrics.FrontendRecordActivityTaskHeartbeatByIDScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1067,12 +1084,22 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(heartbeatRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(heartbeatRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: token, @@ -1113,7 +1140,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( completeRequest *gen.RespondActivityTaskCompletedRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskCompletedScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskCompletedScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1143,12 +1170,22 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return wh.error(errIdentityTooLong, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(completeRequest.Result), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(completeRequest.Result), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // result exceeds blob size limit, we would record it as failure failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: completeRequest.TaskToken, @@ -1182,7 +1219,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( completeRequest *gen.RespondActivityTaskCompletedByIDRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskCompletedByIDScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskCompletedByIDScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1232,12 +1269,22 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(completeRequest.Result), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(completeRequest.Result), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // result exceeds blob size limit, we would record it as failure failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: token, @@ -1277,7 +1324,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( failedRequest *gen.RespondActivityTaskFailedRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskFailedScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskFailedScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1303,6 +1350,10 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( if err != nil { return wh.error(err, scope) } + + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + if len(failedRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() { return wh.error(errIdentityTooLong, scope) } @@ -1310,9 +1361,16 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(failedRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(failedRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // details exceeds blob size limit, we would truncate the details and put a specific error reason failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit) failedRequest.Details = failedRequest.Details[0:sizeLimitError] @@ -1334,7 +1392,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( failedRequest *gen.RespondActivityTaskFailedByIDRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskFailedByIDScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskFailedByIDScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1383,12 +1441,22 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(failedRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(failedRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // details exceeds blob size limit, we would truncate the details and put a specific error reason failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit) failedRequest.Details = failedRequest.Details[0:sizeLimitError] @@ -1417,7 +1485,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( cancelRequest *gen.RespondActivityTaskCanceledRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskCanceledScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskCanceledScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1444,6 +1512,9 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + if len(cancelRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() { return wh.error(errIdentityTooLong, scope) } @@ -1451,9 +1522,16 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(cancelRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(cancelRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // details exceeds blob size limit, we would record it as failure failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: cancelRequest.TaskToken, @@ -1487,7 +1565,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( cancelRequest *gen.RespondActivityTaskCanceledByIDRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondActivityTaskCanceledScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondActivityTaskCanceledScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1536,12 +1614,22 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(cancelRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(cancelRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // details exceeds blob size limit, we would record it as failure failRequest := &gen.RespondActivityTaskFailedRequest{ TaskToken: token, @@ -1581,7 +1669,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( completeRequest *gen.RespondDecisionTaskCompletedRequest) (resp *gen.RespondDecisionTaskCompletedResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondDecisionTaskCompletedScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondDecisionTaskCompletedScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1603,6 +1691,14 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( return nil, wh.error(errDomainNotSet, scope) } + domainEntry, err := wh.domainCache.GetDomainByID(taskToken.DomainID) + if err != nil { + return nil, wh.error(err, scope) + } + + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + histResp, err := wh.history.RespondDecisionTaskCompleted(ctx, &h.RespondDecisionTaskCompletedRequest{ DomainUUID: common.StringPtr(taskToken.DomainID), CompleteRequest: completeRequest}, @@ -1648,7 +1744,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( failedRequest *gen.RespondDecisionTaskFailedRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondDecisionTaskFailedScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondDecisionTaskFailedScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1675,6 +1771,9 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + if len(failedRequest.GetIdentity()) > wh.config.MaxIDLengthLimit() { return wh.error(errIdentityTooLong, scope) } @@ -1682,9 +1781,16 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name) - if err := common.CheckEventBlobSizeLimit(len(failedRequest.Details), sizeLimitWarn, sizeLimitError, - taskToken.DomainID, taskToken.WorkflowID, taskToken.RunID, - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(failedRequest.Details), + sizeLimitWarn, + sizeLimitError, + taskToken.DomainID, + taskToken.WorkflowID, + taskToken.RunID, + scope, + wh.GetThrottledLogger(), + ); err != nil { // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code failedRequest.Details = failedRequest.Details[0:sizeLimitError] } @@ -1705,7 +1811,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( completeRequest *gen.RespondQueryTaskCompletedRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRespondQueryTaskCompletedScope + scope := wh.metricsClient.Scope(metrics.FrontendRespondQueryTaskCompletedScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1727,6 +1833,14 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return wh.error(errInvalidTaskToken, scope) } + domainEntry, err := wh.domainCache.GetDomainByID(queryTaskToken.DomainID) + if err != nil { + return wh.error(err, scope) + } + + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name)) + matchingRequest := &m.RespondQueryTaskCompletedRequest{ DomainUUID: common.StringPtr(queryTaskToken.DomainID), TaskList: &gen.TaskList{Name: common.StringPtr(queryTaskToken.TaskList)}, @@ -1747,7 +1861,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( startRequest *gen.StartWorkflowExecutionRequest) (resp *gen.StartWorkflowExecutionResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendStartWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendStartWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1848,10 +1962,21 @@ func (wh *WorkflowHandler) StartWorkflowExecution( return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domainName)) + sizeLimitError := wh.config.BlobSizeLimitError(startRequest.GetDomain()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(startRequest.GetDomain()) - if err := common.CheckEventBlobSizeLimit(len(startRequest.Input), sizeLimitWarn, sizeLimitError, domainID, - startRequest.GetWorkflowId(), "", wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(startRequest.Input), + sizeLimitWarn, + sizeLimitError, + domainID, + startRequest.GetWorkflowId(), + "", + scope, + wh.GetThrottledLogger(), + ); err != nil { return nil, wh.error(err, scope) } @@ -1871,7 +1996,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( getRequest *gen.GetWorkflowExecutionHistoryRequest) (resp *gen.GetWorkflowExecutionHistoryResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendGetWorkflowExecutionHistoryScope + scope := wh.metricsClient.Scope(metrics.FrontendGetWorkflowExecutionHistoryScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -1887,7 +2012,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( return nil, wh.error(errDomainNotSet, scope) } - domainScope := wh.metricsClient.Scope(scope, metrics.DomainTag(getRequest.GetDomain())) if err := wh.validateExecutionAndEmitMetrics(getRequest.Execution, scope); err != nil { return nil, err } @@ -1901,6 +2025,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( return nil, wh.error(err, scope) } + scope = scope.Tagged(metrics.DomainTag(getRequest.GetDomain())) + // force limit page size if exceed if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize { wh.GetLogger().WithFields(bark.Fields{ @@ -2000,7 +2126,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( if !isWorkflowRunning { history, _, err = wh.getHistory( scope, - domainScope, domainID, *execution, lastFirstEventID, @@ -2034,7 +2159,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( } else { history, token.PersistenceToken, err = wh.getHistory( scope, - domainScope, domainID, *execution, token.FirstEventID, @@ -2075,7 +2199,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, signalRequest *gen.SignalWorkflowExecutionRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendSignalWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendSignalWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2116,11 +2240,21 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(signalRequest.GetDomain())) + sizeLimitError := wh.config.BlobSizeLimitError(signalRequest.GetDomain()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(signalRequest.GetDomain()) - if err := common.CheckEventBlobSizeLimit(len(signalRequest.Input), sizeLimitWarn, sizeLimitError, domainID, - signalRequest.GetWorkflowExecution().GetWorkflowId(), signalRequest.GetWorkflowExecution().GetWorkflowId(), - wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(signalRequest.Input), + sizeLimitWarn, + sizeLimitError, + domainID, + signalRequest.GetWorkflowExecution().GetWorkflowId(), + signalRequest.GetWorkflowExecution().GetWorkflowId(), + scope, + wh.GetThrottledLogger(), + ); err != nil { return wh.error(err, scope) } @@ -2144,7 +2278,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, signalWithStartRequest *gen.SignalWithStartWorkflowExecutionRequest) (resp *gen.StartWorkflowExecutionResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendSignalWithStartWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendSignalWithStartWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2245,14 +2379,33 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(signalWithStartRequest.GetDomain())) + sizeLimitError := wh.config.BlobSizeLimitError(signalWithStartRequest.GetDomain()) sizeLimitWarn := wh.config.BlobSizeLimitWarn(signalWithStartRequest.GetDomain()) - if err := common.CheckEventBlobSizeLimit(len(signalWithStartRequest.SignalInput), sizeLimitWarn, sizeLimitError, domainID, - signalWithStartRequest.GetWorkflowId(), "", wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(signalWithStartRequest.SignalInput), + sizeLimitWarn, + sizeLimitError, + domainID, + signalWithStartRequest.GetWorkflowId(), + "", + scope, + wh.GetThrottledLogger(), + ); err != nil { return nil, wh.error(err, scope) } - if err := common.CheckEventBlobSizeLimit(len(signalWithStartRequest.Input), sizeLimitWarn, sizeLimitError, domainID, - signalWithStartRequest.GetWorkflowId(), "", wh.metricsClient, scope, wh.GetThrottledLogger()); err != nil { + if err := common.CheckEventBlobSizeLimit( + len(signalWithStartRequest.Input), + sizeLimitWarn, + sizeLimitError, + domainID, + signalWithStartRequest.GetWorkflowId(), + "", + scope, + wh.GetThrottledLogger(), + ); err != nil { return nil, wh.error(err, scope) } @@ -2279,7 +2432,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, terminateRequest *gen.TerminateWorkflowExecutionRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendTerminateWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendTerminateWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2304,6 +2457,9 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context, return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(terminateRequest.GetDomain())) + err = wh.history.TerminateWorkflowExecution(ctx, &h.TerminateWorkflowExecutionRequest{ DomainUUID: common.StringPtr(domainID), TerminateRequest: terminateRequest, @@ -2321,7 +2477,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, resetRequest *gen.ResetWorkflowExecutionRequest) (resp *gen.ResetWorkflowExecutionResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendResetWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendResetWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2346,6 +2502,9 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(resetRequest.GetDomain())) + resp, err = wh.history.ResetWorkflowExecution(ctx, &h.ResetWorkflowExecutionRequest{ DomainUUID: common.StringPtr(domainID), ResetRequest: resetRequest, @@ -2363,7 +2522,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( cancelRequest *gen.RequestCancelWorkflowExecutionRequest) (retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendRequestCancelWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendRequestCancelWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2388,6 +2547,9 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( return wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(cancelRequest.GetDomain())) + err = wh.history.RequestCancelWorkflowExecution(ctx, &h.RequestCancelWorkflowExecutionRequest{ DomainUUID: common.StringPtr(domainID), CancelRequest: cancelRequest, @@ -2404,7 +2566,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, listRequest *gen.ListOpenWorkflowExecutionsRequest) (resp *gen.ListOpenWorkflowExecutionsResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendListOpenWorkflowExecutionsScope + scope := wh.metricsClient.Scope(metrics.FrontendListOpenWorkflowExecutionsScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2447,6 +2609,9 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context, return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domain)) + baseReq := persistence.ListWorkflowExecutionsRequest{ DomainUUID: domainID, Domain: domain, @@ -2497,7 +2662,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, listRequest *gen.ListClosedWorkflowExecutionsRequest) (resp *gen.ListClosedWorkflowExecutionsResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendListClosedWorkflowExecutionsScope + scope := wh.metricsClient.Scope(metrics.FrontendListClosedWorkflowExecutionsScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2548,6 +2713,9 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(domain)) + baseReq := persistence.ListWorkflowExecutionsRequest{ DomainUUID: domainID, Domain: domain, @@ -2607,7 +2775,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, func (wh *WorkflowHandler) ResetStickyTaskList(ctx context.Context, resetRequest *gen.ResetStickyTaskListRequest) (resp *gen.ResetStickyTaskListResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendResetStickyTaskListScope + scope := wh.metricsClient.Scope(metrics.FrontendResetStickyTaskListScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2628,6 +2796,9 @@ func (wh *WorkflowHandler) ResetStickyTaskList(ctx context.Context, resetRequest return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(resetRequest.GetDomain())) + _, err = wh.history.ResetStickyTaskList(ctx, &h.ResetStickyTaskListRequest{ DomainUUID: common.StringPtr(domainID), Execution: resetRequest.Execution, @@ -2643,7 +2814,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, queryRequest *gen.QueryWorkflowRequest) (resp *gen.QueryWorkflowResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendQueryWorkflowScope + scope := wh.metricsClient.Scope(metrics.FrontendQueryWorkflowScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2672,6 +2843,9 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(queryRequest.GetDomain())) + matchingRequest := &m.QueryWorkflowRequest{ DomainUUID: common.StringPtr(domainID), QueryRequest: queryRequest, @@ -2743,7 +2917,7 @@ func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, request *gen.DescribeWorkflowExecutionRequest) (resp *gen.DescribeWorkflowExecutionResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendDescribeWorkflowExecutionScope + scope := wh.metricsClient.Scope(metrics.FrontendDescribeWorkflowExecutionScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2763,6 +2937,9 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(request.GetDomain())) + if err := wh.validateExecutionAndEmitMetrics(request.Execution, scope); err != nil { return nil, err } @@ -2785,7 +2962,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.DescribeTaskListRequest) (resp *gen.DescribeTaskListResponse, retError error) { defer logging.CapturePanic(wh.GetLogger(), &retError) - scope := metrics.FrontendDescribeTaskListScope + scope := wh.metricsClient.Scope(metrics.FrontendDescribeTaskListScope) sw := wh.startRequestProfile(scope) defer sw.Stop() @@ -2805,6 +2982,9 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De return nil, wh.error(err, scope) } + // add domain tag to scope, so further metrics will have the domain tag + scope = scope.Tagged(metrics.DomainTag(request.GetDomain())) + if err := wh.validateTaskList(request.TaskList, scope); err != nil { return nil, err } @@ -2832,8 +3012,7 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De } func (wh *WorkflowHandler) getHistory( - scope int, - domainScope metrics.Scope, + scope metrics.Scope, domainID string, execution gen.WorkflowExecution, firstEventID, nextEventID int64, @@ -2879,8 +3058,8 @@ func (wh *WorkflowHandler) getHistory( if len(historyEvents) > 0 { // N.B. - Dual emit is required here so that we can see aggregate timer stats across all // domains along with the individual domains stats - wh.metricsClient.RecordTimer(scope, metrics.HistorySize, time.Duration(size)) - domainScope.RecordTimer(metrics.HistorySize, time.Duration(size)) + scope.Tagged(metrics.DomainAllTag()).RecordTimer(metrics.HistorySize, time.Duration(size)) + scope.RecordTimer(metrics.HistorySize, time.Duration(size)) if size > common.GetHistoryWarnSizeLimit { wh.GetThrottledLogger().WithFields(bark.Fields{ @@ -2916,68 +3095,68 @@ func (wh *WorkflowHandler) getLoggerForTask(taskToken []byte) bark.Logger { } // startRequestProfile initiates recording of request metrics -func (wh *WorkflowHandler) startRequestProfile(scope int) metrics.Stopwatch { +func (wh *WorkflowHandler) startRequestProfile(scope metrics.Scope) tally.Stopwatch { wh.startWG.Wait() - sw := wh.metricsClient.StartTimer(scope, metrics.CadenceLatency) - wh.metricsClient.IncCounter(scope, metrics.CadenceRequests) + sw := scope.StartTimer(metrics.CadenceLatency) + scope.IncCounter(metrics.CadenceRequests) return sw } -func (wh *WorkflowHandler) error(err error, scope int) error { +func (wh *WorkflowHandler) error(err error, scope metrics.Scope) error { switch err := err.(type) { case *gen.InternalServiceError: logging.LogInternalServiceError(wh.Service.GetLogger(), err) - wh.metricsClient.IncCounter(scope, metrics.CadenceFailures) + scope.IncCounter(metrics.CadenceFailures) // NOTE: For internal error, we won't return thrift error from cadence-frontend. // Because in uber internal metrics, thrift errors are counted as user errors return fmt.Errorf("Cadence internal error, msg: %v", err.Message) case *gen.BadRequestError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrBadRequestCounter) + scope.IncCounter(metrics.CadenceErrBadRequestCounter) return err case *gen.DomainNotActiveError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrBadRequestCounter) + scope.IncCounter(metrics.CadenceErrBadRequestCounter) return err case *gen.ServiceBusyError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrServiceBusyCounter) + scope.IncCounter(metrics.CadenceErrServiceBusyCounter) return err case *gen.EntityNotExistsError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrEntityNotExistsCounter) + scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter) return err case *gen.WorkflowExecutionAlreadyStartedError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrExecutionAlreadyStartedCounter) + scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter) return err case *gen.DomainAlreadyExistsError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrDomainAlreadyExistsCounter) + scope.IncCounter(metrics.CadenceErrDomainAlreadyExistsCounter) return err case *gen.CancellationAlreadyRequestedError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrCancellationAlreadyRequestedCounter) + scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter) return err case *gen.QueryFailedError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrQueryFailedCounter) + scope.IncCounter(metrics.CadenceErrQueryFailedCounter) return err case *gen.LimitExceededError: - wh.metricsClient.IncCounter(scope, metrics.CadenceErrLimitExceededCounter) + scope.IncCounter(metrics.CadenceErrLimitExceededCounter) return err case *yarpcerrors.Status: if err.Code() == yarpcerrors.CodeDeadlineExceeded { - wh.metricsClient.IncCounter(scope, metrics.CadenceErrContextTimeoutCounter) + scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return err } } logging.LogUncategorizedError(wh.Service.GetLogger(), err) - wh.metricsClient.IncCounter(scope, metrics.CadenceFailures) + scope.IncCounter(metrics.CadenceFailures) return fmt.Errorf("Cadence internal uncategorized error, msg: %v", err.Error()) } -func (wh *WorkflowHandler) validateTaskListType(t *gen.TaskListType, scope int) error { +func (wh *WorkflowHandler) validateTaskListType(t *gen.TaskListType, scope metrics.Scope) error { if t == nil { return wh.error(errTaskListTypeNotSet, scope) } return nil } -func (wh *WorkflowHandler) validateTaskList(t *gen.TaskList, scope int) error { +func (wh *WorkflowHandler) validateTaskList(t *gen.TaskList, scope metrics.Scope) error { if t == nil || t.Name == nil || t.GetName() == "" { return wh.error(errTaskListNotSet, scope) } @@ -2987,7 +3166,7 @@ func (wh *WorkflowHandler) validateTaskList(t *gen.TaskList, scope int) error { return nil } -func (wh *WorkflowHandler) validateExecutionAndEmitMetrics(w *gen.WorkflowExecution, scope int) error { +func (wh *WorkflowHandler) validateExecutionAndEmitMetrics(w *gen.WorkflowExecution, scope metrics.Scope) error { err := validateExecution(w) if err != nil { return wh.error(err, scope) @@ -3066,8 +3245,14 @@ func (wh *WorkflowHandler) createDomainResponse(info *persistence.DomainInfo, co return infoResult, configResult, replicationConfigResult } -func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context, scope int, domainID string, - matchingResp *m.PollForDecisionTaskResponse, eventStoreVersion int32, branchToken []byte) (*gen.PollForDecisionTaskResponse, error) { +func (wh *WorkflowHandler) createPollForDecisionTaskResponse( + ctx context.Context, + scope metrics.Scope, + domainID string, + matchingResp *m.PollForDecisionTaskResponse, + eventStoreVersion int32, + branchToken []byte, +) (*gen.PollForDecisionTaskResponse, error) { if matchingResp.WorkflowExecution == nil { // this will happen if there is no decision task to be send to worker / caller @@ -3103,10 +3288,9 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context if dErr != nil { return nil, dErr } - domainScope := wh.metricsClient.Scope(scope, metrics.DomainTag(domain.GetInfo().Name)) + scope = scope.Tagged(metrics.DomainTag(domain.GetInfo().Name)) history, persistenceToken, err = wh.getHistory( scope, - domainScope, domainID, *matchingResp.WorkflowExecution, firstEventID, @@ -3225,7 +3409,7 @@ func (wh *WorkflowHandler) getArchivedHistory( ctx context.Context, request *gen.GetWorkflowExecutionHistoryRequest, domainID string, - scope int, + scope metrics.Scope, ) (*gen.GetWorkflowExecutionHistoryResponse, error) { entry, err := wh.domainCache.GetDomainByID(domainID) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 32d3235ba5a..4bfc45d61e2 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -939,7 +939,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_DomainCacheEntryEr wh := s.getWorkflowHandlerWithParams(s.mockService, config, mMetadataManager, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.Nil(resp) s.Error(err) } @@ -954,7 +954,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_ArchivalBucketEmpt wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.Nil(resp) s.Error(err) } @@ -969,7 +969,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidPageToken() wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest([]byte{3, 4, 5, 1}), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest([]byte{3, 4, 5, 1}), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.Nil(resp) s.Error(err) s.Equal(errInvalidNextArchivalPageToken, err) @@ -987,7 +987,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidBlobKey() { wh.startWG.Done() getHistoryRequest := getHistoryRequest(nil) getHistoryRequest.Execution.WorkflowId = common.StringPtr("") - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest, "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest, "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.Nil(resp) s.Error(err) } @@ -1004,7 +1004,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_FailedToDownload() wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.Nil(resp) s.Error(err) } @@ -1033,7 +1033,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() { wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.NoError(err) s.NotNil(resp) s.NotNil(resp.History) @@ -1070,7 +1070,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetLastPage() { wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() - resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", 0) + resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend)) s.NoError(err) s.NotNil(resp) s.NotNil(resp.History) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 5468a22a247..cab536432d2 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -1129,9 +1129,16 @@ type decisionBlobSizeChecker struct { } func (c *decisionBlobSizeChecker) failWorkflowIfBlobSizeExceedsLimit(blob []byte, message string) (bool, error) { - err := common.CheckEventBlobSizeLimit(len(blob), c.sizeLimitWarn, c.sizeLimitError, - c.domainID, c.workflowID, c.runID, c.metricsClient, metrics.HistoryRespondDecisionTaskCompletedScope, c.logger) - + err := common.CheckEventBlobSizeLimit( + len(blob), + c.sizeLimitWarn, + c.sizeLimitError, + c.domainID, + c.workflowID, + c.runID, + c.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope), + c.logger, + ) if err == nil { return false, nil }