Add logs for heartbeat timeout debugging (cadence-workflow#1306)
* Add logs for heartbeat timeout debugging

* more logs
samarabbas authored and longquanzheng committed Dec 5, 2018
1 parent 3d37cb9 commit 7238eec
Showing 3 changed files with 52 additions and 7 deletions.
4 changes: 4 additions & 0 deletions common/logging/tags.go
@@ -59,6 +59,7 @@ const (
 	TagVersion = "version"
 	TagCurrentVersion = "current-version"
 	TagIncomingVersion = "incoming-version"
+	TagScheduleID = "schedule-id"
 	TagFirstEventID = "first-event-id"
 	TagNextEventID = "next-event-id"
 	TagResetNextEventID = "reset-next-event-id"
@@ -73,6 +74,9 @@ const (
 	TagAttemptEnd = "attempt-end"
 	TagSize = "size"
 	TagSignalCount = "signal-count"
+	TagTimerTaskStatus = "timer-task-status"
+	TagScheduleAttempt = "schedule-attempt"
+	TagCursorTimestamp = "cursor-timestamp"
 
 	// workflow logging tag values
 	// TagWorkflowComponent Values
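Note: each of these constants is a structured-log field key consumed through bark. A minimal, hypothetical sketch of how such a tag flows into a log entry follows; the logger construction and the lowercase sirupsen import path are assumptions (older bark builds import the capitalized Sirupsen/logrus path), not something this diff shows:

package main

import (
	"github.com/sirupsen/logrus"
	"github.com/uber-common/bark"
)

// Hypothetical local copy of one tag constant added in this commit.
const TagScheduleID = "schedule-id"

func main() {
	logger := bark.NewLoggerFromLogrus(logrus.New())
	// A field keyed by a tag constant can be indexed and filtered downstream,
	// unlike a value interpolated into a printf-style message.
	logger.WithFields(bark.Fields{
		TagScheduleID: 42,
	}).Info("activity timer processed")
}

This is the motivation for the changes below, which replace Debugf/Warnf format strings with WithFields plus a constant message.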
8 changes: 5 additions & 3 deletions service/history/shardContext.go
@@ -845,15 +845,17 @@ func (s *shardContextImpl) allocateTimerIDsLocked(timerTasks []persistence.Task,
 	if task.GetVersion() != common.EmptyVersion {
 		cluster = clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
 	}
-	if ts.Before(s.timerMaxReadLevelMap[cluster]) {
+	readCursorTS := s.timerMaxReadLevelMap[cluster]
+	if ts.Before(readCursorTS) {
 		// This can happen if the shard moves and the new host has a time skew, or there is a db write delay.
 		// We generate a new timer ID using timerMaxReadLevel.
 		s.logger.WithFields(bark.Fields{
 			logging.TagWorkflowEventID: logging.ShardAllocateTimerBeforeRead,
 			logging.TagDomainID: domainID,
 			logging.TagWorkflowExecutionID: workflowID,
-		}).Warnf("%v: New timer generated is less than read level. timestamp: %v, timerMaxReadLevel: %v",
-			time.Now(), ts, s.timerMaxReadLevelMap[cluster])
+			logging.TagTimestamp: ts,
+			logging.TagCursorTimestamp: readCursorTS,
+		}).Warn("New timer generated is less than read level")
 		task.SetVisibilityTimestamp(s.timerMaxReadLevelMap[cluster].Add(time.Millisecond))
 	}
 
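The rewritten branch also names the read cursor (readCursorTS) so the warning can carry it as a field. The invariant it protects, as a standalone sketch with simplified types and a hypothetical function name:

package shard

import "time"

// allocateTimerTimestamp distills the branch above: a timer must never be
// created behind the per-cluster read cursor, because the timer queue
// processor only reads forward and would otherwise skip it.
func allocateTimerTimestamp(ts, readCursorTS time.Time) time.Time {
	if ts.Before(readCursorTS) {
		// Clock skew after a shard move, or a delayed db write, can yield a
		// timestamp in the processor's past; push it just past the cursor.
		return readCursorTS.Add(time.Millisecond)
	}
	return ts
}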
47 changes: 43 additions & 4 deletions service/history/timerQueueActiveProcessor.go
@@ -357,6 +357,16 @@ Update_History_Loop:
 
 			if td.Attempt < ai.Attempt {
 				// retry could update ai.Attempt, and we should ignore further timeouts for the previous attempt
+				t.logger.WithFields(bark.Fields{
+					logging.TagDomainID: msBuilder.GetExecutionInfo().DomainID,
+					logging.TagWorkflowExecutionID: msBuilder.GetExecutionInfo().WorkflowID,
+					logging.TagWorkflowRunID: msBuilder.GetExecutionInfo().RunID,
+					logging.TagScheduleID: ai.ScheduleID,
+					logging.TagAttempt: ai.Attempt,
+					logging.TagVersion: ai.Version,
+					logging.TagTimerTaskStatus: ai.TimerTaskStatus,
+					logging.TagTimeoutType: timeoutType,
+				}).Info("Retry attempt mismatch, skip activity timeout processing")
 				continue
 			}
 
@@ -368,8 +378,16 @@
 				timerTasks = append(timerTasks, retryTask)
 				updateState = true
 
-				t.logger.Debugf("Ignore ActivityTimeout (%v) as retry is needed. New attempt: %v, retry backoff duration: %v.",
-					timeoutType, ai.Attempt, retryTask.(*persistence.ActivityRetryTimerTask).VisibilityTimestamp.Sub(time.Now()))
+				t.logger.WithFields(bark.Fields{
+					logging.TagDomainID: msBuilder.GetExecutionInfo().DomainID,
+					logging.TagWorkflowExecutionID: msBuilder.GetExecutionInfo().WorkflowID,
+					logging.TagWorkflowRunID: msBuilder.GetExecutionInfo().RunID,
+					logging.TagScheduleID: ai.ScheduleID,
+					logging.TagAttempt: ai.Attempt,
+					logging.TagVersion: ai.Version,
+					logging.TagTimerTaskStatus: ai.TimerTaskStatus,
+					logging.TagTimeoutType: timeoutType,
+				}).Info("Ignore activity timeout due to retry")
 
 				continue
 			}
@@ -430,8 +448,19 @@
 			msBuilder.UpdateActivity(ai)
 			updateState = true
 
-			t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v",
-				time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
+			// Emit a log if the timer is due to fire within one second
+			if t.now().Add(time.Second).After(td.TimerSequenceID.VisibilityTimestamp) {
+				t.logger.WithFields(bark.Fields{
+					logging.TagDomainID: msBuilder.GetExecutionInfo().DomainID,
+					logging.TagWorkflowExecutionID: msBuilder.GetExecutionInfo().WorkflowID,
+					logging.TagWorkflowRunID: msBuilder.GetExecutionInfo().RunID,
+					logging.TagScheduleID: ai.ScheduleID,
+					logging.TagAttempt: ai.Attempt,
+					logging.TagVersion: ai.Version,
+					logging.TagTimerTaskStatus: ai.TimerTaskStatus,
+					logging.TagTimeoutType: td.TimeoutType,
+				}).Info("Next timer is created to fire within one second")
+			}
 		}
 
 		// Done!
@@ -587,6 +616,16 @@ func (t *timerQueueActiveProcessorImpl) processActivityRetryTimer(task *persiste
 	scheduledID := task.EventID
 	ai, running := msBuilder.GetActivityInfo(scheduledID)
 	if !running || task.ScheduleAttempt < int64(ai.Attempt) {
+		t.logger.WithFields(bark.Fields{
+			logging.TagDomainID: msBuilder.GetExecutionInfo().DomainID,
+			logging.TagWorkflowExecutionID: msBuilder.GetExecutionInfo().WorkflowID,
+			logging.TagWorkflowRunID: msBuilder.GetExecutionInfo().RunID,
+			logging.TagScheduleID: ai.ScheduleID,
+			logging.TagAttempt: ai.Attempt,
+			logging.TagScheduleAttempt: task.ScheduleAttempt,
+			logging.TagVersion: ai.Version,
+			logging.TagTimerTaskStatus: ai.TimerTaskStatus,
+		}).Info("Duplicate activity retry timer task")
 		return nil
 	}
 	ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, ai.Version, task.Version, task)
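The guard in this last hunk treats a retry timer referencing a stale attempt as a harmless duplicate. One caveat worth noting: when running is false, GetActivityInfo has no activity record to return, so the new log fields dereference ai at some risk. A defensive sketch of the same check, with a hypothetical helper name and simplified stand-in types:

package main

import "fmt"

// ActivityInfo is a simplified stand-in for the mutable-state record.
type ActivityInfo struct {
	ScheduleID int64
	Attempt    int32
}

// isStaleRetryTimer mirrors the check in processActivityRetryTimer: a timer
// task scheduled for an earlier attempt, or for an activity that is no
// longer running, is a duplicate and is dropped without error.
func isStaleRetryTimer(ai *ActivityInfo, running bool, scheduleAttempt int64) bool {
	if !running || ai == nil {
		// The nil guard avoids a panic if log fields are built from ai
		// after the activity record is already gone.
		return true
	}
	return scheduleAttempt < int64(ai.Attempt)
}

func main() {
	// A timer for attempt 2 arrives after the activity moved on to attempt 3.
	fmt.Println(isStaleRetryTimer(&ActivityInfo{ScheduleID: 7, Attempt: 3}, true, 2)) // true
}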
