Skip to content

Commit

Permalink
SizeLimit: enforce history size/count limit (cadence-workflow#1325)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Dec 11, 2018
1 parent 7d85142 commit b01495b
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 12 deletions.
2 changes: 2 additions & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ const (
TagTimerTaskStatus = "timer-task-status"
TagScheduleAttempt = "schedule-attempt"
TagCursorTimestamp = "cursor-timestamp"
TagHistorySize = "history-size"
TagEventCount = "event-count"

// workflow logging tag values
// TagWorkflowComponent Values
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ const (
DomainCacheCallbacksLatency

HistorySize
HistoryCount
EventBlobSize

NumCommonMetrics // Needs to be last on this list for iota numbering
Expand Down Expand Up @@ -1052,6 +1053,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
DomainCachePrepareCallbacksLatency: {metricName: "domain-cache.prepare-callbacks.latency", metricType: Timer},
DomainCacheCallbacksLatency: {metricName: "domain-cache.callbacks.latency", metricType: Timer},
HistorySize: {metricName: "history-size", metricType: Timer},
HistoryCount: {metricName: "history-count", metricType: Timer},
EventBlobSize: {metricName: "event-blob-size", metricType: Timer},
},
Frontend: {},
Expand Down
16 changes: 14 additions & 2 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ var keys = map[Key]string{
EnableVisibilityToKafka: "system.enableVisibilityToKafka",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
BlobSizeLimitWarn: "limit.blobSize.warn",
BlobSizeLimitError: "limit.blobSize.error",
BlobSizeLimitWarn: "limit.blobSize.warn",
HistorySizeLimitError: "limit.historySize.error",
HistorySizeLimitWarn: "limit.historySize.warn",
HistoryCountLimitError: "limit.historyCount.error",
HistoryCountLimitWarn: "limit.historyCount.warn",

// frontend settings
FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS",
Expand Down Expand Up @@ -180,6 +184,14 @@ const (
BlobSizeLimitError
// BlobSizeLimitWarn is the per event blob size limit for warning
BlobSizeLimitWarn
// HistorySizeLimitError is the per workflow execution history size limit
HistorySizeLimitError
// HistorySizeLimitWarn is the per workflow execution history size limit for warning
HistorySizeLimitWarn
// HistoryCountLimitError is the per workflow execution history event count limit
HistoryCountLimitError
// HistoryCountLimitWarn is the per workflow execution history event count limit for warning
HistoryCountLimitWarn

// key for frontend

Expand Down
2 changes: 2 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
// FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
// TerminateReasonSizeExceedsLimit is the reason used to terminate a workflow when its history size or event count exceeds the limit
TerminateReasonSizeExceedsLimit = "HISTORY_SIZE_EXCEEDS_LIMIT"
)

var (
Expand Down
16 changes: 12 additions & 4 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ type Config struct {
EnableArchival dynamicconfig.BoolPropertyFnWithDomainFilter
NumSysWorkflows dynamicconfig.IntPropertyFn

BlobSizeLimitError dynamicconfig.IntPropertyFnWithDomainFilter
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithDomainFilter
BlobSizeLimitError dynamicconfig.IntPropertyFnWithDomainFilter
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithDomainFilter
HistorySizeLimitError dynamicconfig.IntPropertyFnWithDomainFilter
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithDomainFilter
HistoryCountLimitError dynamicconfig.IntPropertyFnWithDomainFilter
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithDomainFilter
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -196,8 +200,12 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
EnableArchival: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableArchival, false),
NumSysWorkflows: dc.GetIntProperty(dynamicconfig.NumSystemWorkflows, 1000),

BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitWarn, 256*1024),
BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
// The warn threshold must be looked up under its own key (limit.blobSize.warn);
// reading the error key here would ignore warn overrides and reuse any
// configured error limit as the warn limit.
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitWarn, 256*1024),
HistorySizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistorySizeLimitError, 200*1024*1024),
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistorySizeLimitWarn, 50*1024*1024),
HistoryCountLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryCountLimitError, 200*1024),
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.HistoryCountLimitWarn, 50*1024),
}
}

Expand Down
76 changes: 70 additions & 6 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,12 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
}
}

historySize := 0
newHistorySize := 0
// always standby history first
if hasNewStandbyHistoryEvents {
firstEvent := standbyHistoryBuilder.GetFirstEvent()
// Note: standby events has no transient decision events
historySize, err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID)
newHistorySize, err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID)
if err != nil {
return err
}
Expand All @@ -358,7 +358,7 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
firstEvent := activeHistoryBuilder.GetFirstEvent()
// Transient decision events need to be written as a separate batch
if activeHistoryBuilder.HasTransientEvents() {
historySize, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID)
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID)
if err != nil {
return err
}
Expand All @@ -371,8 +371,72 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
}

executionInfo.SetLastFirstEventID(firstEvent.GetEventId())
historySize += size
}
newHistorySize += size

// enforce history size/count limit (only on active side)
config := c.shard.GetConfig()
sizeLimitWarn := config.HistorySizeLimitWarn(executionInfo.DomainID)
countLimitWarn := config.HistoryCountLimitWarn(executionInfo.DomainID)
historyCount := int(c.msBuilder.GetNextEventID()) - 1
historySize := int(c.msBuilder.GetHistorySize()) + newHistorySize
c.metricsClient.RecordTimer(metrics.PersistenceUpdateWorkflowExecutionScope, metrics.HistorySize, time.Duration(historySize))
c.metricsClient.RecordTimer(metrics.PersistenceUpdateWorkflowExecutionScope, metrics.HistoryCount, time.Duration(historyCount))
if historySize > sizeLimitWarn || historyCount > countLimitWarn {
// emit warning
c.logger.WithFields(bark.Fields{
logging.TagDomainID: executionInfo.DomainID,
logging.TagWorkflowExecutionID: executionInfo.WorkflowID,
logging.TagWorkflowRunID: executionInfo.RunID,
logging.TagHistorySize: historySize,
logging.TagEventCount: historyCount,
}).Warn("history size exceeds limit.")

sizeLimitError := config.HistorySizeLimitError(executionInfo.DomainID)
countLimitError := config.HistoryCountLimitError(executionInfo.DomainID)
if (historySize > sizeLimitError || historyCount > countLimitError) && c.msBuilder.IsWorkflowExecutionRunning() {
// hard terminate workflow if it is still running
c.clear() // discard pending changes
_, err1 := c.loadWorkflowExecution() // reload mutable state
if err1 != nil {
return err1
}

c.msBuilder.AddWorkflowExecutionTerminatedEvent(&workflow.TerminateWorkflowExecutionRequest{
Reason: common.StringPtr(common.TerminateReasonSizeExceedsLimit),
Identity: common.StringPtr("cadence-history-server"),
})

updates, err = c.msBuilder.CloseUpdateSession()
if err != nil {
return err
}

executionInfo = c.msBuilder.GetExecutionInfo()
activeHistoryBuilder = updates.newEventsBuilder
newStateBuilder = nil // since we terminate current workflow execution, so ignore new executions

if crossDCEnabled {
c.msBuilder.UpdateReplicationStateLastEventID(
c.clusterMetadata.GetCurrentClusterName(),
c.msBuilder.GetCurrentVersion(),
executionInfo.NextEventID-1,
)
}

firstEvent = activeHistoryBuilder.GetFirstEvent()
terminateTransactionID, err1 := c.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, terminateTransactionID)
if err != nil {
return err
}
executionInfo.SetLastFirstEventID(firstEvent.GetEventId())
} // end of hard terminate workflow
} // end of enforce history size/count limit

} // end of update history events for active builder

continueAsNew := updates.continueAsNew
finishExecution := false
Expand Down Expand Up @@ -412,7 +476,7 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
setTaskInfo(c.msBuilder.GetCurrentVersion(), now, transferTasks, timerTasks)

// Update history size on mutableState before calling UpdateWorkflowExecution
c.msBuilder.IncrementHistorySize(historySize)
c.msBuilder.IncrementHistorySize(newHistorySize)

var resp *persistence.UpdateWorkflowExecutionResponse
var err1 error
Expand Down

0 comments on commit b01495b

Please sign in to comment.