Skip to content

Commit

Permalink
Emit metric on history sizes by frontend (cadence-workflow#1211)
Browse files Browse the repository at this point in the history
Emit a history size metric from the frontend API which loads history
from persistence. Also log a warning with domainID, WorkflowID,
and RunID when the returned history is larger than 500KB.
  • Loading branch information
samarabbas authored Oct 30, 2018
1 parent f6d4148 commit ec36698
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ var (
frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
)

// getHistoryWarnSizeLimit is the history response size threshold (500KB,
// expressed as 500 * 1024). A warning is logged when a loaded history
// response is larger than this value.
const getHistoryWarnSizeLimit = 500 * 1024

// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager,
Expand Down Expand Up @@ -788,7 +792,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
return nil, nil
}

resp, err := wh.createPollForDecisionTaskResponse(ctx, domainID, matchingResp)
resp, err := wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp)
if err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -1270,7 +1274,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
}
matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)

newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, taskToken.DomainID, matchingResp)
newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp)
if err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -1564,7 +1568,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
history.Events = []*gen.HistoryEvent{}
if isCloseEventOnly {
if !isWorkflowRunning {
history, _, err = wh.getHistory(domainID, *execution, lastFirstEventID, nextEventID,
history, _, err = wh.getHistory(scope, domainID, *execution, lastFirstEventID, nextEventID,
getRequest.GetMaximumPageSize(), nil, token.TransientDecision)
if err != nil {
return nil, wh.error(err, scope)
Expand All @@ -1588,7 +1592,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
}
} else {
history, token.PersistenceToken, err =
wh.getHistory(domainID, *execution, token.FirstEventID, token.NextEventID,
wh.getHistory(scope, domainID, *execution, token.FirstEventID, token.NextEventID,
getRequest.GetMaximumPageSize(), token.PersistenceToken, token.TransientDecision)
if err != nil {
return nil, wh.error(err, scope)
Expand Down Expand Up @@ -2237,7 +2241,7 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De
return response, nil
}

func (wh *WorkflowHandler) getHistory(domainID string, execution gen.WorkflowExecution,
func (wh *WorkflowHandler) getHistory(scope int, domainID string, execution gen.WorkflowExecution,
firstEventID, nextEventID int64, pageSize int32, nextPageToken []byte,
transientDecision *gen.TransientDecisionInfo) (*gen.History, []byte, error) {

Expand All @@ -2256,6 +2260,19 @@ func (wh *WorkflowHandler) getHistory(domainID string, execution gen.WorkflowExe
return nil, nil, err
}

if response != nil {
wh.metricsClient.RecordTimer(scope, metrics.HistorySize, time.Duration(response.Size))

if response.Size > getHistoryWarnSizeLimit {
wh.GetLogger().WithFields(bark.Fields{
logging.TagWorkflowExecutionID: execution.GetWorkflowId(),
logging.TagWorkflowRunID: execution.GetRunId(),
logging.TagDomainID: domainID,
logging.TagSize: response.Size,
}).Warn("GetHistory size threshold breached")
}
}

historyEvents = append(historyEvents, response.History.Events...)
nextPageToken = response.NextPageToken
if len(nextPageToken) == 0 && transientDecision != nil {
Expand Down Expand Up @@ -2419,7 +2436,7 @@ func createDomainResponse(info *persistence.DomainInfo, config *persistence.Doma
return infoResult, configResult, replicationConfigResult
}

func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context, domainID string,
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context, scope int, domainID string,
matchingResp *m.PollForDecisionTaskResponse) (*gen.PollForDecisionTaskResponse, error) {

if matchingResp.WorkflowExecution == nil {
Expand Down Expand Up @@ -2457,6 +2474,7 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context
return nil, dErr
}
history, persistenceToken, err = wh.getHistory(
scope,
domainID,
*matchingResp.WorkflowExecution,
firstEventID,
Expand Down

0 comments on commit ec36698

Please sign in to comment.