Task processing should only report metrics when task is processed (ca…
wxing1292 authored Mar 15, 2019
1 parent 58b8530 commit 1f6000c
Showing 2 changed files with 36 additions and 28 deletions.
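The gist of the change: the queue and timer processors previously emitted TaskRequests, TaskProcessingLatency, TaskAttemptTimer, TaskLatency, and TaskQueueLatency for every task, including tasks the filter skipped, and now emit them only when the task was actually processed, while the ack itself stays unconditional. Below is a minimal standalone sketch of the resulting flow; the metricsClient stub, the free functions, and the string metric names are illustrative stand-ins for the real service/history types and metrics scope identifiers, not the actual API.

package main

import (
	"fmt"
	"time"
)

// metricsClient is a hypothetical stand-in for the service's metrics
// client; string metric names replace the real metrics.* identifiers.
type metricsClient struct{}

func (metricsClient) IncCounter(scope int, name string) {
	fmt.Printf("scope=%d counter=%s\n", scope, name)
}

func (metricsClient) RecordTimer(scope int, name string, d time.Duration) {
	fmt.Printf("scope=%d timer=%s value=%v\n", scope, name, d)
}

// processTaskOnce mirrors the first half of the change: TaskRequests and
// TaskProcessingLatency are emitted only when the task filter decided the
// task should actually run.
func processTaskOnce(m metricsClient, scope int, shouldProcessTask bool) {
	startTime := time.Now()
	// ... the real processor.process(task, shouldProcessTask) runs here ...
	if shouldProcessTask {
		m.IncCounter(scope, "TaskRequests")
		m.RecordTimer(scope, "TaskProcessingLatency", time.Since(startTime))
	}
}

// ackTaskOnce mirrors the second half: the ack is unconditional, but the
// attempt and latency timers are reported only for processed tasks.
func ackTaskOnce(m metricsClient, scope int, reportMetrics bool, startTime, visibilityTime time.Time, attempt int) {
	// completeQueueTask / completeTimerTask would run here regardless.
	if reportMetrics {
		// the attempt count is recorded through a timer, as in the diff
		m.RecordTimer(scope, "TaskAttemptTimer", time.Duration(attempt))
		m.RecordTimer(scope, "TaskLatency", time.Since(startTime))
		m.RecordTimer(scope, "TaskQueueLatency", time.Since(visibilityTime))
	}
}

func main() {
	m := metricsClient{}
	start := time.Now()
	processTaskOnce(m, 0, true)  // processed task: metrics emitted
	processTaskOnce(m, 0, false) // filtered-out task: no metrics
	ackTaskOnce(m, 0, true, start, start.Add(-time.Second), 1)
}

Acking unconditionally keeps the ack manager's progress tracking intact; gating only the metrics presumably stops filtered-out tasks (which complete almost instantly) from inflating request counts and skewing the latency timers.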
32 changes: 18 additions & 14 deletions service/history/queueProcessor.go
@@ -78,7 +78,8 @@ type (
 var (
 	errUnexpectedQueueTask = errors.New("unexpected queue task")
 
-	loadQueueTaskThrottleRetryDelay = 5 * time.Second
+	loadDomainEntryForQueueTaskRetryDelay = 100 * time.Millisecond
+	loadQueueTaskThrottleRetryDelay       = 5 * time.Second
 )
 
 func newQueueProcessorBase(clusterName string, shard ShardContext, options *QueueProcessorOptions, processor processor, queueAckMgr queueAckMgr, logger bark.Logger) *queueProcessorBase {
@@ -301,7 +302,7 @@ FilterLoop:
 				break FilterLoop
 			}
 			incAttempt()
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(loadDomainEntryForQueueTaskRetryDelay)
 		}
 	}
 
@@ -317,7 +318,6 @@ FilterLoop:
 			return true
 		}
 	}
-	defer func() { p.metricsClient.RecordTimer(scope, metrics.TaskLatency, time.Since(startTime)) }()
 
 	for {
 		select {
@@ -327,8 +327,7 @@
 		default:
 			err = backoff.Retry(op, p.retryPolicy, retryCondition)
 			if err == nil {
-				p.metricsClient.RecordTimer(scope, metrics.TaskAttemptTimer, time.Duration(attempt))
-				p.ackTaskOnce(task, scope)
+				p.ackTaskOnce(task, scope, shouldProcessTask, startTime, attempt)
 				return
 			}
 			incAttempt()
@@ -344,9 +343,10 @@ func (p *queueProcessorBase) processTaskOnce(notificationChan <-chan struct{}, t
 
 	startTime := time.Now()
 	scope, err := p.processor.process(task, shouldProcessTask)
-	p.metricsClient.IncCounter(scope, metrics.TaskRequests)
-	p.metricsClient.RecordTimer(scope, metrics.TaskProcessingLatency, time.Since(startTime))
-
+	if shouldProcessTask {
+		p.metricsClient.IncCounter(scope, metrics.TaskRequests)
+		p.metricsClient.RecordTimer(scope, metrics.TaskProcessingLatency, time.Since(startTime))
+	}
 	return scope, err
 }
 
@@ -400,13 +400,17 @@ func (p *queueProcessorBase) handleTaskError(scope int, startTime time.Time,
 	return err
 }
 
-func (p *queueProcessorBase) ackTaskOnce(task queueTaskInfo, scope int) {
+func (p *queueProcessorBase) ackTaskOnce(task queueTaskInfo, scope int, reportMetrics bool, startTime time.Time, attempt int) {
 	p.ackMgr.completeQueueTask(task.GetTaskID())
-	p.metricsClient.RecordTimer(
-		scope,
-		metrics.TaskQueueLatency,
-		time.Since(task.GetVisibilityTimestamp()),
-	)
+	if reportMetrics {
+		p.metricsClient.RecordTimer(scope, metrics.TaskAttemptTimer, time.Duration(attempt))
+		p.metricsClient.RecordTimer(scope, metrics.TaskLatency, time.Since(startTime))
+		p.metricsClient.RecordTimer(
+			scope,
+			metrics.TaskQueueLatency,
+			time.Since(task.GetVisibilityTimestamp()),
+		)
+	}
 }
 
 func (p *queueProcessorBase) initializeLoggerForTask(task queueTaskInfo) bark.Logger {
32 changes: 18 additions & 14 deletions service/history/timerQueueProcessorBase.go
@@ -43,13 +43,13 @@ import (
 )
 
 var (
 	errTimerTaskNotFound          = errors.New("Timer task not found")
 	errFailedToAddTimeoutEvent    = errors.New("Failed to add timeout event")
 	errFailedToAddTimerFiredEvent = errors.New("Failed to add timer fired event")
 	emptyTime                     = time.Time{}
 	maxTimestamp                  = time.Unix(0, math.MaxInt64)
 
-	loadTimerTaskThrottleRetryDelay = 5 * time.Second
+	loadDomainEntryForTimerTaskRetryDelay = 100 * time.Millisecond
+	loadTimerTaskThrottleRetryDelay       = 5 * time.Second
 )
 
 type (
@@ -428,7 +428,7 @@ FilterLoop:
 				break FilterLoop
 			}
 			incAttempt()
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(loadDomainEntryForTimerTaskRetryDelay)
 		}
 	}
 
@@ -444,7 +444,6 @@ FilterLoop:
 			return true
 		}
 	}
-	defer func() { t.metricsClient.RecordTimer(scope, metrics.TaskLatency, time.Since(startTime)) }()
 
 	for {
 		select {
@@ -454,8 +453,7 @@
 		default:
 			err = backoff.Retry(op, t.retryPolicy, retryCondition)
 			if err == nil {
-				t.metricsClient.RecordTimer(scope, metrics.TaskAttemptTimer, time.Duration(attempt))
-				t.ackTaskOnce(task, scope)
+				t.ackTaskOnce(task, scope, shouldProcessTask, startTime, attempt)
 				return
 			}
 			incAttempt()
@@ -471,8 +469,10 @@ func (t *timerQueueProcessorBase) processTaskOnce(notificationChan <-chan struct
 
 	startTime := time.Now()
 	scope, err := t.timerProcessor.process(task, shouldProcessTask)
-	t.metricsClient.IncCounter(scope, metrics.TaskRequests)
-	t.metricsClient.RecordTimer(scope, metrics.TaskProcessingLatency, time.Since(startTime))
+	if shouldProcessTask {
+		t.metricsClient.IncCounter(scope, metrics.TaskRequests)
+		t.metricsClient.RecordTimer(scope, metrics.TaskProcessingLatency, time.Since(startTime))
+	}
 
 	return scope, err
 }
@@ -527,13 +527,17 @@ func (t *timerQueueProcessorBase) handleTaskError(scope int, startTime time.Time
 	return err
 }
 
-func (t *timerQueueProcessorBase) ackTaskOnce(task *persistence.TimerTaskInfo, scope int) {
+func (t *timerQueueProcessorBase) ackTaskOnce(task *persistence.TimerTaskInfo, scope int, reportMetrics bool, startTime time.Time, attempt int) {
 	t.timerQueueAckMgr.completeTimerTask(task)
-	t.metricsClient.RecordTimer(
-		scope,
-		metrics.TaskQueueLatency,
-		time.Since(task.GetVisibilityTimestamp()),
-	)
+	if reportMetrics {
+		t.metricsClient.RecordTimer(scope, metrics.TaskAttemptTimer, time.Duration(attempt))
+		t.metricsClient.RecordTimer(scope, metrics.TaskLatency, time.Since(startTime))
+		t.metricsClient.RecordTimer(
+			scope,
+			metrics.TaskQueueLatency,
+			time.Since(task.GetVisibilityTimestamp()),
+		)
+	}
 	atomic.AddUint64(&t.timerFiredCount, 1)
 }
 
