diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index ded78762877..9432310aedc 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -46,6 +46,14 @@ const ( TimerTaskStatusCreated ) +// Activity Timer task status +const ( + TimerTaskStatusCreatedStartToClose = 1 << iota + TimerTaskStatusCreatedScheduleToStart + TimerTaskStatusCreatedScheduleToClose + TimerTaskStatusCreatedHeartbeat +) + type ( timerDetails struct { SequenceID SequenceID @@ -242,9 +250,10 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder *mutableStateBuil // Update the task ID tracking if it has created timer task or not. td := tb.activityTimers[0] ai := tb.pendingActivityTimers[td.ActivityID] - ai.TimerTaskStatus = TimerTaskStatusCreated - msBuilder.UpdateActivity(ai) at := timerTask.(*persistence.ActivityTimeoutTask) + ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(w.TimeoutType(at.TimeoutType)) + msBuilder.UpdateActivity(ai) + tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) } @@ -280,17 +289,21 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { EventID: v.StartedID, TimeoutType: w.TimeoutType_START_TO_CLOSE, TimeoutSec: v.StartToCloseTimeout, - TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated} + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0} tb.activityTimers = append(tb.activityTimers, td) if v.HeartbeatTimeout > 0 { - heartBeatExpiry := v.LastHeartBeatUpdatedTime.Add(time.Duration(v.HeartbeatTimeout) * time.Second) + lastHeartBeatTS := v.LastHeartBeatUpdatedTime + if lastHeartBeatTS.IsZero() { + lastHeartBeatTS = v.StartedTime + } + heartBeatExpiry := lastHeartBeatTS.Add(time.Duration(v.HeartbeatTimeout) * time.Second) td := &timerDetails{ SequenceID: SequenceID{VisibilityTimestamp: heartBeatExpiry}, ActivityID: v.ScheduleID, EventID: v.StartedID, TimeoutType: w.TimeoutType_HEARTBEAT, TimeoutSec: v.HeartbeatTimeout, - TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated} + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0} tb.activityTimers = append(tb.activityTimers, td) } } else { @@ -301,7 +314,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { EventID: v.ScheduleID, TimeoutSec: v.ScheduleToStartTimeout, TimeoutType: w.TimeoutType_SCHEDULE_TO_START, - TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated} + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToStart) != 0} tb.activityTimers = append(tb.activityTimers, td) scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second) td = &timerDetails{ @@ -310,7 +323,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { EventID: v.ScheduleID, TimeoutSec: v.ScheduleToCloseTimeout, TimeoutType: w.TimeoutType_SCHEDULE_TO_CLOSE, - TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated} + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0} tb.activityTimers = append(tb.activityTimers, td) } } @@ -415,3 +428,17 @@ func compareTimerIDLess(first *SequenceID, second *SequenceID) bool { } return false } + +func getActivityTimerStatus(timeoutType w.TimeoutType) int32 { + switch timeoutType { + case w.TimeoutType_HEARTBEAT: + return TimerTaskStatusCreatedHeartbeat + case w.TimeoutType_SCHEDULE_TO_START: + return TimerTaskStatusCreatedScheduleToStart + case w.TimeoutType_SCHEDULE_TO_CLOSE: + return TimerTaskStatusCreatedScheduleToClose + case w.TimeoutType_START_TO_CLOSE: + return TimerTaskStatusCreatedStartToClose + } + panic("invalid timeout type") +} diff --git a/service/history/timerBuilder_test.go b/service/history/timerBuilder_test.go index 586da7a5612..5b814d750d0 100644 --- a/service/history/timerBuilder_test.go +++ b/service/history/timerBuilder_test.go @@ -32,6 +32,7 @@ import ( "encoding/json" + "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" "github.com/uber-common/bark" @@ -173,6 +174,30 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderDuplicateTimerID() { s.Nil(ti) } +func (s *timerBuilderProcessorSuite) TestTimerBuilder_GetActivityTimer() { + // ScheduleToStart being more than HB. + builder := newMutableStateBuilder(s.logger) + ase, ai := builder.AddActivityTaskScheduledEvent(emptyEventID, + &workflow.ScheduleActivityTaskDecisionAttributes{ + ScheduleToStartTimeoutSeconds: common.Int32Ptr(2), + StartToCloseTimeoutSeconds: common.Int32Ptr(2), + HeartbeatTimeoutSeconds: common.Int32Ptr(1), + }) + // create a schedule to start timeout + tb := newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()}) + tt := tb.GetActivityTimerTaskIfNeeded(builder) + s.NotNil(tt) + s.Equal(workflow.TimeoutType_SCHEDULE_TO_START, workflow.TimeoutType(tt.(*persistence.ActivityTimeoutTask).TimeoutType)) + + builder.AddActivityTaskStartedEvent(ai, ase.GetEventId(), uuid.New(), &workflow.PollForActivityTaskRequest{}) + + // create a heart beat timeout + tb = newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()}) + tt = tb.GetActivityTimerTaskIfNeeded(builder) + s.NotNil(tt) + s.Equal(workflow.TimeoutType_HEARTBEAT, workflow.TimeoutType(tt.(*persistence.ActivityTimeoutTask).TimeoutType)) +} + func (s *timerBuilderProcessorSuite) TestDecodeHistory() { historyString := "5b7b226576656e744964223a312c2274696d657374616d70223a313438383332353631383735333431373433312c226576656e7454797065223a22576f726b666c6f77457865637574696f6e53746172746564222c22776f726b666c6f77457865637574696f6e537461727465644576656e7441747472696275746573223a7b22776f726b666c6f7754797065223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d74797065227d2c227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c22657865637574696f6e5374617274546f436c6f736554696d656f75745365636f6e6473223a3130302c227461736b5374617274546f436c6f736554696d656f75745365636f6e6473223a312c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a322c2274696d657374616d70223a313438383332353631383735333435333137312c226576656e7454797065223a224465636973696f6e5461736b5363686564756c6564222c226465636973696f6e5461736b5363686564756c65644576656e7441747472696275746573223a7b227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c227374617274546f436c6f736554696d656f75745365636f6e6473223a317d7d2c7b226576656e744964223a332c2274696d657374616d70223a313438383332353632333938383637373536302c226576656e7454797065223a224465636973696f6e5461736b53746172746564222c226465636973696f6e5461736b537461727465644576656e7441747472696275746573223a7b227363686564756c65644576656e744964223a322c226964656e74697479223a22776f726b657231222c22726571756573744964223a2235383364326164652d663363332d343862322d383366352d323936636238393931646433227d7d2c7b226576656e744964223a342c2274696d657374616d70223a313438383332353632333939373138303336362c226576656e7454797065223a224465636973696f6e5461736b436f6d706c65746564222c226465636973696f6e5461736b436f6d706c657465644576656e7441747472696275746573223a7b22657865637574696f6e436f6e74657874223a224d513d3d222c227363686564756c65644576656e744964223a322c22737461727465644576656e744964223a332c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a352c2274696d657374616d70223a313438383332353632333939373138343436332c226576656e7454797065223a2254696d657253746172746564222c2274696d6572537461727465644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d31222c227374617274546f4669726554696d656f75745365636f6e6473223a312c226465636973696f6e5461736b436f6d706c657465644576656e744964223a347d7d2c7b226576656e744964223a362c2274696d657374616d70223a313438383332353632343939363835383639382c226576656e7454797065223a2254696d65724669726564222c2274696d657246697265644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d31222c22737461727465644576656e744964223a357d7d2c7b226576656e744964223a372c2274696d657374616d70223a313438383332353632343939363837333438302c226576656e7454797065223a224465636973696f6e5461736b5363686564756c6564222c226465636973696f6e5461736b5363686564756c65644576656e7441747472696275746573223a7b227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c227374617274546f436c6f736554696d656f75745365636f6e6473223a317d7d2c7b226576656e744964223a382c2274696d657374616d70223a313438383332353632353238313139373232312c226576656e7454797065223a224465636973696f6e5461736b53746172746564222c226465636973696f6e5461736b537461727465644576656e7441747472696275746573223a7b227363686564756c65644576656e744964223a372c226964656e74697479223a22776f726b657231222c22726571756573744964223a2233646361663661642d663639382d343436342d386363612d333366663431353838393363227d7d2c7b226576656e744964223a392c2274696d657374616d70223a313438383332353632353238343137353337372c226576656e7454797065223a224465636973696f6e5461736b436f6d706c65746564222c226465636973696f6e5461736b436f6d706c657465644576656e7441747472696275746573223a7b22657865637574696f6e436f6e74657874223a224d673d3d222c227363686564756c65644576656e744964223a372c22737461727465644576656e744964223a382c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a31302c2274696d657374616d70223a313438383332353632353238343137373732342c226576656e7454797065223a2254696d657253746172746564222c2274696d6572537461727465644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d32222c227374617274546f4669726554696d656f75745365636f6e6473223a312c226465636973696f6e5461736b436f6d706c657465644576656e744964223a397d7d5d" data, err := hex.DecodeString(historyString) diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index bf942249692..38623d63fb3 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -654,13 +654,20 @@ Update_History_Loop: } } else { // See if we have next timer in list to be created. - if !td.TaskCreated || td.ActivityID == scheduleID { + isHeartBeatTask := timerTask.TimeoutType == int(workflow.TimeoutType_HEARTBEAT) + + // Create next timer task if we don't have one (or) + // if current one is HB task and we need to create next HB task for the same. + // NOTE: When record activity HB comes in we only update last heartbeat timestamp, this is the place + // where we create next timer task based on that new updated timestamp. + if !td.TaskCreated || (isHeartBeatTask && td.ActivityID == scheduleID) { nextTask := tBuilder.createNewTask(td) timerTasks = []persistence.Task{nextTask} + at := nextTask.(*persistence.ActivityTimeoutTask) - ai.TimerTaskStatus = TimerTaskStatusCreated + ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType)) msBuilder.UpdateActivity(ai) - at := nextTask.(*persistence.ActivityTimeoutTask) + t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) }