Skip to content

Commit

Permalink
Set context timeout for various history service api calls (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#1239)

Include timeout on various history service API calls from matching
engine.

Emit size metric from replicator before sending message to kafka.

Standardized logging and metric from history service handler.
  • Loading branch information
samarabbas authored Nov 16, 2018
1 parent eecdbba commit ba14055
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 271 deletions.
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ type (

// MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
const MaxTaskTimeout = 31622400

const (
// GetHistoryWarnSizeLimit is the threshold for emitting warn log
GetHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB
)
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ const (
HistorySyncShardStatusScope
// HistorySyncActivityScope tracks HistoryActivity API calls received by service
HistorySyncActivityScope
// HistoryDescribeMutableStateScope tracks HistoryActivity API calls received by service
HistoryDescribeMutableStateScope
// HistoryShardControllerScope is the scope used by shard controller
HistoryShardControllerScope
// TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
Expand Down Expand Up @@ -763,6 +765,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryReplicateEventsScope: {operation: "ReplicateEvents"},
HistorySyncShardStatusScope: {operation: "SyncShardStatus"},
HistorySyncActivityScope: {operation: "SyncActivity"},
HistoryDescribeMutableStateScope: {operation: "DescribeMutableState"},
HistoryShardControllerScope: {operation: "ShardController"},
TransferQueueProcessorScope: {operation: "TransferQueueProcessor"},
TransferActiveQueueProcessorScope: {operation: "TransferActiveQueueProcessor"},
Expand Down
6 changes: 1 addition & 5 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ var (
frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
)

const (
getHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB
)

// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager, visibilityMgr persistence.VisibilityManager,
Expand Down Expand Up @@ -2309,7 +2305,7 @@ func (wh *WorkflowHandler) getHistory(scope int, domainID string, execution gen.
if len(historyEvents) > 0 {
wh.metricsClient.RecordTimer(scope, metrics.HistorySize, time.Duration(size))

if size > getHistoryWarnSizeLimit {
if size > common.GetHistoryWarnSizeLimit {
wh.GetLogger().WithFields(bark.Fields{
logging.TagWorkflowExecutionID: execution.GetWorkflowId(),
logging.TagWorkflowRunID: execution.GetRunId(),
Expand Down
Loading

0 comments on commit ba14055

Please sign in to comment.