Skip to content

Commit

Permalink
Refactor workflow history size / count related code logic (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#2171)

* Move history size / count limit check into decision handler
* Change the behavior when the history size / count limit is exceeded from force-terminating the workflow to failing the workflow
* Move workflow stats metrics related code to dedicated file
  • Loading branch information
wxing1292 authored Jul 8, 2019
1 parent 392feba commit 4bc4cd8
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 265 deletions.
6 changes: 3 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ const (
FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT"
// FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT"
//FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
// FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
// FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
// TerminateReasonSizeExceedsLimit is reason to terminate workflow when history size or count exceed limit
TerminateReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT"
// FailureReasonSizeExceedsLimit is reason to fail workflow when history size or count exceed limit
FailureReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT"
// FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
FailureReasonTransactionSizeExceedsLimit = "TRANSACTION_SIZE_EXCEEDS_LIMIT"
)
Expand Down
10 changes: 4 additions & 6 deletions host/sizelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
// processing this decision pushes the history over the limit, which fails the workflow
_, err := poller.PollAndProcessDecisionTask(false, false)
s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err))
s.NotNil(err)
s.Equal(&workflow.EntityNotExistsError{Message: "Workflow execution already completed."}, err)
s.Nil(err)

// verify last event is a workflow execution failed event
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Expand All @@ -168,10 +167,9 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
s.Nil(err)
history := historyResponse.History
lastEvent := history.Events[len(history.Events)-1]
s.Equal(workflow.EventTypeWorkflowExecutionTerminated, lastEvent.GetEventType())
terminateEventAttributes := lastEvent.WorkflowExecutionTerminatedEventAttributes
s.Equal(common.TerminateReasonSizeExceedsLimit, terminateEventAttributes.GetReason())
s.Equal("cadence-history-server", terminateEventAttributes.GetIdentity())
s.Equal(workflow.EventTypeWorkflowExecutionFailed, lastEvent.GetEventType())
failedEventAttributes := lastEvent.WorkflowExecutionFailedEventAttributes
s.Equal(common.FailureReasonSizeExceedsLimit, failedEventAttributes.GetReason())

// verify visibility is correctly processed from open to close
isCloseCorrect := false
Expand Down
104 changes: 80 additions & 24 deletions service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/elasticsearch/validator"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
)
Expand All @@ -41,13 +42,20 @@ type (
searchAttributesValidator *validator.SearchAttributesValidator
}

decisionBlobSizeChecker struct {
sizeLimitWarn int
sizeLimitError int
completedID int64
mutableState mutableState
metricsClient metrics.Client
logger log.Logger
workflowSizeChecker struct {
blobSizeLimitWarn int
blobSizeLimitError int

historySizeLimitWarn int
historySizeLimitError int

historyCountLimitWarn int
historyCountLimitError int

completedID int64
mutableState mutableState
metricsClient metrics.Client
logger log.Logger
}
)

Expand All @@ -59,42 +67,52 @@ func newDecisionAttrValidator(
return &decisionAttrValidator{
domainCache: domainCache,
maxIDLengthLimit: config.MaxIDLengthLimit(),
searchAttributesValidator: validator.NewSearchAttributesValidator(logger,
searchAttributesValidator: validator.NewSearchAttributesValidator(
logger,
config.ValidSearchAttributes,
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit),
config.SearchAttributesTotalSizeLimit,
),
}
}

func newDecisionBlobSizeChecker(
sizeLimitWarn int,
sizeLimitError int,
func newWorkflowSizeChecker(
blobSizeLimitWarn int,
blobSizeLimitError int,
historySizeLimitWarn int,
historySizeLimitError int,
historyCountLimitWarn int,
historyCountLimitError int,
completedID int64,
mutableState mutableState,
metricsClient metrics.Client,
logger log.Logger,
) *decisionBlobSizeChecker {
return &decisionBlobSizeChecker{
sizeLimitWarn: sizeLimitWarn,
sizeLimitError: sizeLimitError,
completedID: completedID,
mutableState: mutableState,
metricsClient: metricsClient,
logger: logger,
) *workflowSizeChecker {
return &workflowSizeChecker{
blobSizeLimitWarn: blobSizeLimitWarn,
blobSizeLimitError: blobSizeLimitError,
historySizeLimitWarn: historySizeLimitWarn,
historySizeLimitError: historySizeLimitError,
historyCountLimitWarn: historyCountLimitWarn,
historyCountLimitError: historyCountLimitError,
completedID: completedID,
mutableState: mutableState,
metricsClient: metricsClient,
logger: logger,
}
}

func (c *decisionBlobSizeChecker) failWorkflowIfBlobSizeExceedsLimit(
func (c *workflowSizeChecker) failWorkflowIfBlobSizeExceedsLimit(
blob []byte,
message string,
) (bool, error) {

executionInfo := c.mutableState.GetExecutionInfo()
err := common.CheckEventBlobSizeLimit(
len(blob),
c.sizeLimitWarn,
c.sizeLimitError,
c.blobSizeLimitWarn,
c.blobSizeLimitError,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
Expand All @@ -111,12 +129,50 @@ func (c *decisionBlobSizeChecker) failWorkflowIfBlobSizeExceedsLimit(
}

if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, attributes); err != nil {
return false, &workflow.InternalServiceError{Message: "Unable to add fail workflow event."}
return false, err
}

return true, nil
}

// failWorkflowSizeExceedsLimit checks the workflow's overall history size and
// history event count against the configured warn and error limits.
//
// When either value is above its error limit, a warning is logged and a fail
// workflow event with reason common.FailureReasonSizeExceedsLimit is added to
// the mutable state; the method then returns (true, nil), or (false, err) if
// adding the event failed. When either value is only above its warn limit, a
// warning is logged and (false, nil) is returned. Otherwise nothing is logged
// and (false, nil) is returned.
func (c *workflowSizeChecker) failWorkflowSizeExceedsLimit() (bool, error) {
	// The event count is derived from the next event ID: the next ID minus one.
	historyCount := int(c.mutableState.GetNextEventID()) - 1
	historySize := int(c.mutableState.GetHistorySize())

	// logExceeded emits one warning carrying the workflow identity and the
	// measured size / count, so both thresholds share the same set of tags
	// while using distinct messages.
	logExceeded := func(msg string) {
		executionInfo := c.mutableState.GetExecutionInfo()
		c.logger.Warn(msg,
			tag.WorkflowDomainID(executionInfo.DomainID),
			tag.WorkflowID(executionInfo.WorkflowID),
			tag.WorkflowRunID(executionInfo.RunID),
			tag.WorkflowHistorySize(historySize),
			tag.WorkflowEventCount(historyCount))
	}

	switch {
	case historySize > c.historySizeLimitError || historyCount > c.historyCountLimitError:
		// Distinct message so the error threshold can be told apart from the
		// warn threshold in the logs.
		logExceeded("history size exceeds error limit.")

		attributes := &workflow.FailWorkflowExecutionDecisionAttributes{
			Reason:  common.StringPtr(common.FailureReasonSizeExceedsLimit),
			Details: []byte("Workflow history size / count exceeds limit."),
		}
		if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, attributes); err != nil {
			return false, err
		}
		return true, nil

	case historySize > c.historySizeLimitWarn || historyCount > c.historyCountLimitWarn:
		// Above the warn threshold only: report it, but let the workflow continue.
		logExceeded("history size exceeds warn limit.")
	}

	return false, nil
}

func (v *decisionAttrValidator) validateActivityScheduleAttributes(
domainID string,
targetDomainID string,
Expand Down
20 changes: 14 additions & 6 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ func newDecisionHandler(historyEngine *historyEngineImpl) *decisionHandlerImpl {
metricsClient: historyEngine.metricsClient,
logger: historyEngine.logger,
throttledLogger: historyEngine.throttledLogger,
decisionAttrValidator: newDecisionAttrValidator(historyEngine.shard.GetDomainCache(),
historyEngine.config, historyEngine.logger),
decisionAttrValidator: newDecisionAttrValidator(
historyEngine.shard.GetDomainCache(),
historyEngine.config,
historyEngine.logger,
),
}
}

Expand Down Expand Up @@ -374,9 +377,14 @@ Update_History_Loop:
failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
} else {

decisionBlobSizeChecker := newDecisionBlobSizeChecker(
handler.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name),
handler.config.BlobSizeLimitError(domainEntry.GetInfo().Name),
domainName := domainEntry.GetInfo().Name
workflowSizeChecker := newWorkflowSizeChecker(
handler.config.BlobSizeLimitWarn(domainName),
handler.config.BlobSizeLimitError(domainName),
handler.config.HistorySizeLimitWarn(domainName),
handler.config.HistorySizeLimitError(domainName),
handler.config.HistoryCountLimitWarn(domainName),
handler.config.HistoryCountLimitError(domainName),
completedEvent.GetEventId(),
msBuilder,
handler.metricsClient,
Expand All @@ -390,7 +398,7 @@ Update_History_Loop:
domainEntry,
msBuilder,
handler.decisionAttrValidator,
decisionBlobSizeChecker,
workflowSizeChecker,
handler.logger,
timerBuilderProvider,
handler.domainCache,
Expand Down
11 changes: 8 additions & 3 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type (

// validation
attrValidator *decisionAttrValidator
sizeLimitChecker *decisionBlobSizeChecker
sizeLimitChecker *workflowSizeChecker

logger log.Logger
timerBuilderProvider timerBuilderProvider
Expand All @@ -76,7 +76,7 @@ func newDecisionTaskHandler(
domainEntry *cache.DomainCacheEntry,
mutableState mutableState,
attrValidator *decisionAttrValidator,
sizeLimitChecker *decisionBlobSizeChecker,
sizeLimitChecker *workflowSizeChecker,
logger log.Logger,
timerBuilderProvider timerBuilderProvider,
domainCache cache.DomainCache,
Expand Down Expand Up @@ -118,7 +118,12 @@ func (handler *decisionTaskHandlerImpl) handleDecisions(
decisions []*workflow.Decision,
) error {

var err error
// overall workflow size / count check
failWorkflow, err := handler.sizeLimitChecker.failWorkflowSizeExceedsLimit()
if err != nil || failWorkflow {
return err
}

for _, decision := range decisions {

err = handler.handleDecision(decision)
Expand Down
28 changes: 21 additions & 7 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -467,7 +469,9 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedConflictOnUpdate() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse2, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -691,7 +695,9 @@ func (s *engine2Suite) TestRecordDecisionTaskSuccess() {
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -799,7 +805,9 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse1, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -851,7 +859,9 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse1, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -981,7 +991,9 @@ func (s *engine2Suite) TestRespondDecisionTaskCompletedRecordMarkerDecision() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -1421,7 +1433,9 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down
8 changes: 6 additions & 2 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -340,7 +342,9 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(gceResponse, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{
MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down
14 changes: 14 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ func (s *engineSuite) TestGetMutableStateSync() {
gweResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
// right now the next event ID is 4
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gweResponse, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID, Name: "testDomain"},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: persistence.DomainTableVersionV1,
},
nil,
).Once()

// test get the next event ID instantly
response, err := s.mockHistoryEngine.GetMutableState(ctx, &history.GetMutableStateRequest{
Expand Down
Loading

0 comments on commit 4bc4cd8

Please sign in to comment.