Skip to content

Commit

Permalink
Unify blob size limit breach logging and metering (cadence-workflow#6250)
Browse files: browse the repository at this point in the history

* Unify blob size limit breach logging and metering
  • Loading branch information
3vilhamster authored Aug 26, 2024
1 parent 5575487 commit 630013c
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 15 deletions.
1 change: 1 addition & 0 deletions common/archiver/gcloud/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4=
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e h1:qyrTQ++p1afMkO4DPEeLGq/3oTsdlvdH4vqZUBWzUKM=
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,8 @@ const (
HistoryCount
EventBlobSize

EventBlobSizeExceedLimit

DecisionResultCount

ArchivalConfigFailures
Expand Down Expand Up @@ -2746,6 +2748,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
DomainCacheCallbacksCount: {metricName: "domain_cache_callbacks_count", metricType: Counter},
HistorySize: {metricName: "history_size", metricType: Timer},
HistoryCount: {metricName: "history_count", metricType: Timer},
EventBlobSizeExceedLimit: {metricName: "blob_size_exceed_limit", metricType: Counter},
EventBlobSize: {metricName: "event_blob_size", metricType: Timer},
DecisionResultCount: {metricName: "decision_result_count", metricType: Timer},
ArchivalConfigFailures: {metricName: "archivalconfig_failures", metricType: Counter},
Expand Down
42 changes: 29 additions & 13 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ func CheckEventBlobSizeLimit(
warnLimit int,
errorLimit int,
domainID string,
domainName string,
workflowID string,
runID string,
scope metrics.Scope,
Expand All @@ -606,21 +607,36 @@ func CheckEventBlobSizeLimit(

scope.RecordTimer(metrics.EventBlobSize, time.Duration(actualSize))

if actualSize > warnLimit {
if logger != nil {
logger.Warn("Blob size exceeds limit.",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowSize(int64(actualSize)),
blobSizeViolationOperationTag)
}
if errorLimit < warnLimit {
logger.Warn("Error limit is less than warn limit.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))

if actualSize > errorLimit {
return ErrBlobSizeExceedsLimit
}
warnLimit = errorLimit
}
return nil

if actualSize <= warnLimit {
return nil
}

tags := []tag.Tag{
tag.WorkflowDomainName(domainName),
tag.WorkflowDomainID(domainID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowSize(int64(actualSize)),
blobSizeViolationOperationTag,
}

if actualSize <= errorLimit {
logger.Warn("Blob size close to the limit.", tags...)

return nil
}

scope.Tagged(metrics.DomainTag(domainName)).IncCounter(metrics.EventBlobSizeExceedLimit)

logger.Error("Blob size exceeds limit.", tags...)

return ErrBlobSizeExceedsLimit
}

// ValidateLongPollContextTimeout check if the context timeout for a long poll handler is too short or below a normal value.
Expand Down
90 changes: 90 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/yarpc/yarpcerrors"
"golang.org/x/exp/maps"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -1638,3 +1640,91 @@ func TestNewPerTaskListScope(t *testing.T) {
assert.NotNil(t, NewPerTaskListScope("test-domain", "test-tasklist", types.TaskListKindNormal, metrics.NewNoopMetricsClient(), 0))
assert.NotNil(t, NewPerTaskListScope("test-domain", "test-tasklist", types.TaskListKindSticky, metrics.NewNoopMetricsClient(), 0))
}

func TestCheckEventBlobSizeLimit(t *testing.T) {
for name, c := range map[string]struct {
blobSize int
warnSize int
errSize int
wantErr error
prepareLogger func(*log.MockLogger)
assertMetrics func(tally.Snapshot)
}{
"blob size is less than limit": {
blobSize: 10,
warnSize: 20,
errSize: 30,
wantErr: nil,
},
"blob size is greater than warn limit": {
blobSize: 21,
warnSize: 20,
errSize: 30,
wantErr: nil,
prepareLogger: func(logger *log.MockLogger) {
logger.On("Warn", "Blob size close to the limit.", mock.Anything).Once()
},
},
"blob size is greater than error limit": {
blobSize: 31,
warnSize: 20,
errSize: 30,
wantErr: ErrBlobSizeExceedsLimit,
prepareLogger: func(logger *log.MockLogger) {
logger.On("Error", "Blob size exceeds limit.", mock.Anything).Once()
},
assertMetrics: func(snapshot tally.Snapshot) {
counters := snapshot.Counters()
assert.Len(t, counters, 1)
values := maps.Values(counters)
assert.Equal(t, "test.blob_size_exceed_limit", values[0].Name())
assert.Equal(t, int64(1), values[0].Value())
},
},
"error limit is less then warn limit": {
blobSize: 21,
warnSize: 30,
errSize: 20,
wantErr: ErrBlobSizeExceedsLimit,
prepareLogger: func(logger *log.MockLogger) {
logger.On("Warn", "Error limit is less than warn limit.", mock.Anything).Once()
logger.On("Error", "Blob size exceeds limit.", mock.Anything).Once()
},
},
} {
t.Run(name, func(t *testing.T) {
testScope := tally.NewTestScope("test", nil)
metricsClient := metrics.NewClient(testScope, metrics.History)
logger := &log.MockLogger{}
defer logger.AssertExpectations(t)

if c.prepareLogger != nil {
c.prepareLogger(logger)
}

const (
domainID = "testDomainID"
domainName = "testDomainName"
workflowID = "testWorkflowID"
runID = "testRunID"
)

got := CheckEventBlobSizeLimit(
c.blobSize,
c.warnSize,
c.errSize,
domainID,
domainName,
workflowID,
runID,
metricsClient.Scope(1),
logger,
tag.OperationName("testOperation"),
)
require.Equal(t, c.wantErr, got)
if c.assertMetrics != nil {
c.assertMetrics(testScope.Snapshot())
}
})
}
}
15 changes: 15 additions & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -876,6 +877,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -973,6 +975,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1078,6 +1081,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1174,6 +1178,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1268,6 +1273,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1355,6 +1361,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1460,6 +1467,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1647,6 +1655,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
domainName,
taskToken.WorkflowID,
taskToken.RunID,
scope,
Expand Down Expand Up @@ -1712,6 +1721,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
sizeLimitWarn,
sizeLimitError,
queryTaskToken.DomainID,
domainName,
"",
"",
scope,
Expand Down Expand Up @@ -1953,6 +1963,7 @@ func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Con
sizeLimitWarn,
sizeLimitError,
domainID,
domainName,
startRequest.GetWorkflowID(),
"",
scope,
Expand Down Expand Up @@ -2303,6 +2314,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(
sizeLimitWarn,
sizeLimitError,
domainID,
domainName,
signalRequest.GetWorkflowExecution().GetWorkflowID(),
signalRequest.GetWorkflowExecution().GetRunID(),
scope,
Expand Down Expand Up @@ -2536,6 +2548,7 @@ func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx c
sizeLimitWarn,
sizeLimitError,
domainID,
domainName,
signalWithStartRequest.GetWorkflowID(),
"",
scope,
Expand All @@ -2550,6 +2563,7 @@ func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx c
sizeLimitWarn,
sizeLimitError,
domainID,
domainName,
signalWithStartRequest.GetWorkflowID(),
"",
scope,
Expand Down Expand Up @@ -3359,6 +3373,7 @@ func (wh *WorkflowHandler) QueryWorkflow(
sizeLimitWarn,
sizeLimitError,
domainID,
domainName,
queryRequest.GetExecution().GetWorkflowID(),
queryRequest.GetExecution().GetRunID(),
scope,
Expand Down
5 changes: 5 additions & 0 deletions service/history/decision/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type (
}

workflowSizeChecker struct {
domainName string

blobSizeLimitWarn int
blobSizeLimitError int

Expand Down Expand Up @@ -90,6 +92,7 @@ func newAttrValidator(
}

func newWorkflowSizeChecker(
domainName string,
blobSizeLimitWarn int,
blobSizeLimitError int,
historySizeLimitWarn int,
Expand All @@ -103,6 +106,7 @@ func newWorkflowSizeChecker(
logger log.Logger,
) *workflowSizeChecker {
return &workflowSizeChecker{
domainName: domainName,
blobSizeLimitWarn: blobSizeLimitWarn,
blobSizeLimitError: blobSizeLimitError,
historySizeLimitWarn: historySizeLimitWarn,
Expand All @@ -129,6 +133,7 @@ func (c *workflowSizeChecker) failWorkflowIfBlobSizeExceedsLimit(
c.blobSizeLimitWarn,
c.blobSizeLimitError,
executionInfo.DomainID,
c.domainName,
executionInfo.WorkflowID,
executionInfo.RunID,
c.metricsScope.Tagged(decisionTypeTag),
Expand Down
2 changes: 1 addition & 1 deletion service/history/decision/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func TestWorkflowSizeChecker_failWorkflowIfBlobSizeExceedsLimit(t *testing.T) {
assertLogsAndMetrics: func(t *testing.T, logs *observer.ObservedLogs, scope tally.TestScope) {
logEntries := logs.All()
require.Len(t, logEntries, 1)
assert.Equal(t, "Blob size exceeds limit.", logEntries[0].Message)
assert.Equal(t, "Blob size close to the limit.", logEntries[0].Message)
},
},
"fail": {
Expand Down
4 changes: 3 additions & 1 deletion service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ Update_History_Loop:
failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
} else {
workflowSizeChecker := newWorkflowSizeChecker(
domainName,
handler.config.BlobSizeLimitWarn(domainName),
handler.config.BlobSizeLimitError(domainName),
handler.config.HistorySizeLimitWarn(domainName),
Expand Down Expand Up @@ -779,10 +780,11 @@ func (handler *handlerImpl) handleBufferedQueries(
sizeLimitWarn,
sizeLimitError,
domainID,
domain,
workflowID,
runID,
scope,
handler.throttledLogger,
handler.logger,
tag.BlobSizeViolationOperation("ConsistentQuery"),
); err != nil {
handler.logger.Info("failing query because query result size is too large",
Expand Down
1 change: 1 addition & 0 deletions service/history/decision/task_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,7 @@ func newTaskHandlerForTest(t *testing.T) *taskHandlerImpl {
mockMutableState := execution.NewMockMutableState(ctrl)
mockDomainCache := cache.NewMockDomainCache(ctrl)
workflowSizeChecker := newWorkflowSizeChecker(
"testDomain",
testConfig.BlobSizeLimitWarn(constants.TestDomainName),
testConfig.BlobSizeLimitError(constants.TestDomainName),
testConfig.HistorySizeLimitWarn(constants.TestDomainName),
Expand Down

0 comments on commit 630013c

Please sign in to comment.