Skip to content

Commit

Permalink
Fix leak task for timer and transfer queue processor (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored and wxing1292 committed Aug 24, 2018
1 parent 3270af1 commit 44f5d46
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,8 @@ const (
ActiveTimerTaskQueueLatency
StandbyTransferTaskQueueLatency
StandbyTimerTaskQueueLatency
CompleteTaskFailedCounter

NumHistoryMetrics
)

Expand Down Expand Up @@ -902,6 +904,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ActiveTimerTaskQueueLatency: {metricName: "active.timertask.queue.latency", metricType: Timer},
StandbyTransferTaskQueueLatency: {metricName: "standby.transfertask.queue.latency", metricType: Timer},
StandbyTimerTaskQueueLatency: {metricName: "standby.timertask.queue.latency", metricType: Timer},
CompleteTaskFailedCounter: {metricName: "complete-task-fail-count", metricType: Counter},
},
Matching: {
PollSuccessCounter: {metricName: "poll.success"},
Expand Down
5 changes: 4 additions & 1 deletion service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (t *timerQueueProcessorImpl) completeTimersLoop() {

func (t *timerQueueProcessorImpl) completeTimers() error {
lowerAckLevel := t.ackLevel

upperAckLevel := t.activeTimerProcessor.timerQueueAckMgr.getAckLevel()

if t.isGlobalDomainEnabled {
for _, standbyTimerProcessor := range t.standbyTimerProcessors {
ackLevel := standbyTimerProcessor.timerQueueAckMgr.getAckLevel()
Expand Down Expand Up @@ -253,8 +253,11 @@ LoadCompleteLoop:
VisibilityTimestamp: timer.VisibilityTimestamp,
TaskID: timer.TaskID})
if err != nil {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.CompleteTaskFailedCounter)
t.logger.Warnf("Timer queue ack manager unable to complete timer task: %v; %v", timer, err)
return err
}
t.ackLevel = timerSequenceID
}

if len(response.NextPageToken) == 0 {
Expand Down
6 changes: 4 additions & 2 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,12 @@ LoadCompleteLoop:
if upperAckLevel < task.GetTaskID() {
break LoadCompleteLoop
}
lowerAckLevel = task.GetTaskID()
if err := executionMgr.CompleteTransferTask(&persistence.CompleteTransferTaskRequest{TaskID: task.GetTaskID()}); err != nil {
t.logger.Warnf("Timer queue ack manager unable to complete timer task: %v; %v", task, err)
t.metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.CompleteTaskFailedCounter)
t.logger.Warnf("Transfer queue ack manager unable to complete transfer task: %v; %v", task, err)
return err
}
t.ackLevel = task.GetTaskID()
}

if len(response.NextPageToken) == 0 {
Expand Down

0 comments on commit 44f5d46

Please sign in to comment.