Skip to content

Commit

Permalink
Try detecting timer and activity resurrection (cadence-workflow#4375)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 24, 2021
1 parent 7110f05 commit 170deed
Show file tree
Hide file tree
Showing 10 changed files with 532 additions and 14 deletions.
8 changes: 8 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,13 @@ const (
// Default value: 3*time.Minute
// Allowed filters: DomainID
StandbyTaskReReplicationContextTimeout
// ResurrectionCheckMinDelay is the minimal timer processing delay before scanning history to see
// if there's a resurrected timer/activity
// KeyName: history.resurrectionCheckMinDelay
// Value type: Duration
// Default value: 24*time.Hour
// Allowed filters: N/A
ResurrectionCheckMinDelay
// QueueProcessorEnableSplit is indicates whether processing queue split policy should be enabled
// KeyName: history.queueProcessorEnableSplit
// Value type: Bool
Expand Down Expand Up @@ -2091,6 +2098,7 @@ var Keys = map[Key]string{
StandbyTaskRedispatchInterval: "history.standbyTaskRedispatchInterval",
TaskRedispatchIntervalJitterCoefficient: "history.taskRedispatchIntervalJitterCoefficient",
StandbyTaskReReplicationContextTimeout: "history.standbyTaskReReplicationContextTimeout",
ResurrectionCheckMinDelay: "history.resurrectionCheckMinDelay",
QueueProcessorEnableSplit: "history.queueProcessorEnableSplit",
QueueProcessorSplitMaxLevel: "history.queueProcessorSplitMaxLevel",
QueueProcessorEnableRandomSplitByDomainID: "history.queueProcessorEnableRandomSplitByDomainID",
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,8 @@ const (
DecisionAttemptTimer
StaleMutableStateCounter
DataInconsistentCounter
TimerResurrectionCounter
ActivityResurrectionCounter
AutoResetPointsLimitExceededCounter
AutoResetPointCorruptionCounter
ConcurrencyUpdateFailureCounter
Expand Down Expand Up @@ -2405,6 +2407,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
DecisionAttemptTimer: {metricName: "decision_attempt", metricType: Timer},
StaleMutableStateCounter: {metricName: "stale_mutable_state", metricType: Counter},
DataInconsistentCounter: {metricName: "data_inconsistent", metricType: Counter},
TimerResurrectionCounter: {metricName: "timer_resurrection", metricType: Counter},
ActivityResurrectionCounter: {metricName: "activity_resurrection", metricType: Counter},
AutoResetPointsLimitExceededCounter: {metricName: "auto_reset_points_exceed_limit", metricType: Counter},
AutoResetPointCorruptionCounter: {metricName: "auto_reset_point_corruption", metricType: Counter},
ConcurrencyUpdateFailureCounter: {metricName: "concurrency_update_failure", metricType: Counter},
Expand Down
2 changes: 2 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type Config struct {
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFn

// QueueProcessor settings
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -378,6 +379,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 0.15),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout, 3*time.Minute),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),
ResurrectionCheckMinDelay: dc.GetDurationProperty(dynamicconfig.ResurrectionCheckMinDelay, 24*time.Hour),

QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
Expand Down
2 changes: 2 additions & 0 deletions service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type (
CreateNewHistoryEventWithTimestamp(eventType types.EventType, timestamp int64) *types.HistoryEvent
CreateTransientDecisionEvents(di *DecisionInfo, identity string) (*types.HistoryEvent, *types.HistoryEvent)
DeleteDecision()
DeleteUserTimer(timerID string) error
DeleteActivity(scheduleEventID int64) error
DeleteSignalRequested(requestID string)
FailDecision(bool)
FlushBufferedEvents() error
Expand Down
28 changes: 28 additions & 0 deletions service/history/execution/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions service/history/execution/timer_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type (

// TimerSequence manages user / activity timer
TimerSequence interface {
IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) bool
IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) (time.Duration, bool)

CreateNextUserTimer() (bool, error)
CreateNextActivityTimer() (bool, error)
Expand Down Expand Up @@ -113,11 +113,19 @@ func NewTimerSequence(
func (t *timerSequenceImpl) IsExpired(
referenceTime time.Time,
TimerSequenceID TimerSequenceID,
) bool {
) (time.Duration, bool) {

// Cassandra timestamp resolution is in millisecond
// here we do the check in terms of second resolution.
return TimerSequenceID.Timestamp.Unix() <= referenceTime.Unix()

timerFireTimeInSecond := TimerSequenceID.Timestamp.Unix()
referenceTimeInSecond := referenceTime.Unix()

if timerFireTimeInSecond <= referenceTimeInSecond {
return time.Duration(referenceTimeInSecond-timerFireTimeInSecond) * time.Second, true
}

return 0, false
}

func (t *timerSequenceImpl) CreateNextUserTimer() (bool, error) {
Expand Down
7 changes: 4 additions & 3 deletions service/history/execution/timer_sequence_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 170deed

Please sign in to comment.