Skip to content

Commit

Permalink
Timer task creation marker can identify activity time type (cadence-w…
Browse files Browse the repository at this point in the history
…orkflow#289)

* Timer task creation marker can identify activity time type

In activity mutable state we have timer task status that indicates if the timer task has already been
created for a specific timeout, to give us de-duplication logic for the timer if the next minimum activity in the activity timer list needs a timer task.
  • Loading branch information
sivakku authored Jul 28, 2017
1 parent 4053538 commit c06bbbb
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
41 changes: 34 additions & 7 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ const (
TimerTaskStatusCreated
)

// Activity Timer task status
const (
TimerTaskStatusCreatedStartToClose = 1 << iota
TimerTaskStatusCreatedScheduleToStart
TimerTaskStatusCreatedScheduleToClose
TimerTaskStatusCreatedHeartbeat
)

type (
timerDetails struct {
SequenceID SequenceID
Expand Down Expand Up @@ -242,9 +250,10 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder *mutableStateBuil
// Update the task ID tracking if it has created timer task or not.
td := tb.activityTimers[0]
ai := tb.pendingActivityTimers[td.ActivityID]
ai.TimerTaskStatus = TimerTaskStatusCreated
msBuilder.UpdateActivity(ai)
at := timerTask.(*persistence.ActivityTimeoutTask)
ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(w.TimeoutType(at.TimeoutType))
msBuilder.UpdateActivity(ai)

tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v",
time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
}
Expand Down Expand Up @@ -280,17 +289,21 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
EventID: v.StartedID,
TimeoutType: w.TimeoutType_START_TO_CLOSE,
TimeoutSec: v.StartToCloseTimeout,
TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated}
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0}
tb.activityTimers = append(tb.activityTimers, td)
if v.HeartbeatTimeout > 0 {
heartBeatExpiry := v.LastHeartBeatUpdatedTime.Add(time.Duration(v.HeartbeatTimeout) * time.Second)
lastHeartBeatTS := v.LastHeartBeatUpdatedTime
if lastHeartBeatTS.IsZero() {
lastHeartBeatTS = v.StartedTime
}
heartBeatExpiry := lastHeartBeatTS.Add(time.Duration(v.HeartbeatTimeout) * time.Second)
td := &timerDetails{
SequenceID: SequenceID{VisibilityTimestamp: heartBeatExpiry},
ActivityID: v.ScheduleID,
EventID: v.StartedID,
TimeoutType: w.TimeoutType_HEARTBEAT,
TimeoutSec: v.HeartbeatTimeout,
TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated}
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0}
tb.activityTimers = append(tb.activityTimers, td)
}
} else {
Expand All @@ -301,7 +314,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
EventID: v.ScheduleID,
TimeoutSec: v.ScheduleToStartTimeout,
TimeoutType: w.TimeoutType_SCHEDULE_TO_START,
TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated}
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToStart) != 0}
tb.activityTimers = append(tb.activityTimers, td)
scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second)
td = &timerDetails{
Expand All @@ -310,7 +323,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
EventID: v.ScheduleID,
TimeoutSec: v.ScheduleToCloseTimeout,
TimeoutType: w.TimeoutType_SCHEDULE_TO_CLOSE,
TaskCreated: v.TimerTaskStatus == TimerTaskStatusCreated}
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0}
tb.activityTimers = append(tb.activityTimers, td)
}
}
Expand Down Expand Up @@ -415,3 +428,17 @@ func compareTimerIDLess(first *SequenceID, second *SequenceID) bool {
}
return false
}

func getActivityTimerStatus(timeoutType w.TimeoutType) int32 {
switch timeoutType {
case w.TimeoutType_HEARTBEAT:
return TimerTaskStatusCreatedHeartbeat
case w.TimeoutType_SCHEDULE_TO_START:
return TimerTaskStatusCreatedScheduleToStart
case w.TimeoutType_SCHEDULE_TO_CLOSE:
return TimerTaskStatusCreatedScheduleToClose
case w.TimeoutType_START_TO_CLOSE:
return TimerTaskStatusCreatedStartToClose
}
panic("invalid timeout type")
}
25 changes: 25 additions & 0 deletions service/history/timerBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"encoding/json"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
Expand Down Expand Up @@ -173,6 +174,30 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderDuplicateTimerID() {
s.Nil(ti)
}

func (s *timerBuilderProcessorSuite) TestTimerBuilder_GetActivityTimer() {
// ScheduleToStart being more than HB.
builder := newMutableStateBuilder(s.logger)
ase, ai := builder.AddActivityTaskScheduledEvent(emptyEventID,
&workflow.ScheduleActivityTaskDecisionAttributes{
ScheduleToStartTimeoutSeconds: common.Int32Ptr(2),
StartToCloseTimeoutSeconds: common.Int32Ptr(2),
HeartbeatTimeoutSeconds: common.Int32Ptr(1),
})
// create a schedule to start timeout
tb := newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()})
tt := tb.GetActivityTimerTaskIfNeeded(builder)
s.NotNil(tt)
s.Equal(workflow.TimeoutType_SCHEDULE_TO_START, workflow.TimeoutType(tt.(*persistence.ActivityTimeoutTask).TimeoutType))

builder.AddActivityTaskStartedEvent(ai, ase.GetEventId(), uuid.New(), &workflow.PollForActivityTaskRequest{})

// create a heart beat timeout
tb = newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()})
tt = tb.GetActivityTimerTaskIfNeeded(builder)
s.NotNil(tt)
s.Equal(workflow.TimeoutType_HEARTBEAT, workflow.TimeoutType(tt.(*persistence.ActivityTimeoutTask).TimeoutType))
}

func (s *timerBuilderProcessorSuite) TestDecodeHistory() {
historyString := "5b7b226576656e744964223a312c2274696d657374616d70223a313438383332353631383735333431373433312c226576656e7454797065223a22576f726b666c6f77457865637574696f6e53746172746564222c22776f726b666c6f77457865637574696f6e537461727465644576656e7441747472696275746573223a7b22776f726b666c6f7754797065223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d74797065227d2c227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c22657865637574696f6e5374617274546f436c6f736554696d656f75745365636f6e6473223a3130302c227461736b5374617274546f436c6f736554696d656f75745365636f6e6473223a312c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a322c2274696d657374616d70223a313438383332353631383735333435333137312c226576656e7454797065223a224465636973696f6e5461736b5363686564756c6564222c226465636973696f6e5461736b5363686564756c65644576656e7441747472696275746573223a7b227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c227374617274546f436c6f736554696d656f75745365636f6e6473223a317d7d2c7b226576656e744964223a332c2274696d657374616d70223a313438383332353632333938383637373536302c226576656e7454797065223a224465636973696f6e5461736b53746172746564222c226465636973696f6e5461736b537461727465644576656e7441747472696275746573223a7b227363686564756c65644576656e744964223a322c226964656e74697479223a22776f726b657231222c22726571756573744964223a2235383364326164652d663363332d343862322d383366352d323936636238393931646433227d7d2c7b226576656e744964223a342c2274696d657374616d70223a313438383332353632333939373138303336362c226576656e7454797065223a224465636973696f6e5461736b436f6d706c65746564222c226465636973696f6e5461736b436f6d706c657465644576656e7441747472696275746573223a7b22657865637574696f6e436f6e74657874223a224d513d3d222c227363686564756c65644576656e744964223a322c22737461727465644576656e744964223a332c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a352c2274696d657374616d70223a313438383332353632333939373138343436332c226576656e7454797065223a2254696d657253746172746564222c2274696d6572537461727465644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d31222c227374617274546f4669726554696d656f75745365636f6e6473223a312c226465636973696f6e5461736b436f6d706c657465644576656e744964223a347d7d2c7b226576656e744964223a362c2274696d657374616d70223a313438383332353632343939363835383639382c226576656e7454797065223a2254696d65724669726564222c2274696d657246697265644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d31222c22737461727465644576656e744964223a357d7d2c7b226576656e744964223a372c2274696d657374616d70223a313438383332353632343939363837333438302c226576656e7454797065223a224465636973696f6e5461736b5363686564756c6564222c226465636973696f6e5461736b5363686564756c65644576656e7441747472696275746573223a7b227461736b4c697374223a7b226e616d65223a22696e7465726174696f6e2d73657175656e7469616c2d757365722d74696d6572732d746573742d7461736b6c697374227d2c227374617274546f436c6f736554696d656f75745365636f6e6473223a317d7d2c7b226576656e744964223a382c2274696d657374616d70223a313438383332353632353238313139373232312c226576656e7454797065223a224465636973696f6e5461736b53746172746564222c226465636973696f6e5461736b537461727465644576656e7441747472696275746573223a7b227363686564756c65644576656e744964223a372c226964656e74697479223a22776f726b657231222c22726571756573744964223a2233646361663661642d663639382d343436342d386363612d333366663431353838393363227d7d2c7b226576656e744964223a392c2274696d657374616d70223a313438383332353632353238343137353337372c226576656e7454797065223a224465636973696f6e5461736b436f6d706c65746564222c226465636973696f6e5461736b436f6d706c657465644576656e7441747472696275746573223a7b22657865637574696f6e436f6e74657874223a224d673d3d222c227363686564756c65644576656e744964223a372c22737461727465644576656e744964223a382c226964656e74697479223a22776f726b657231227d7d2c7b226576656e744964223a31302c2274696d657374616d70223a313438383332353632353238343137373732342c226576656e7454797065223a2254696d657253746172746564222c2274696d6572537461727465644576656e7441747472696275746573223a7b2274696d65724964223a2274696d65722d69642d32222c227374617274546f4669726554696d656f75745365636f6e6473223a312c226465636973696f6e5461736b436f6d706c657465644576656e744964223a397d7d5d"
data, err := hex.DecodeString(historyString)
Expand Down
13 changes: 10 additions & 3 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,13 +654,20 @@ Update_History_Loop:
}
} else {
// See if we have next timer in list to be created.
if !td.TaskCreated || td.ActivityID == scheduleID {
isHeartBeatTask := timerTask.TimeoutType == int(workflow.TimeoutType_HEARTBEAT)

// Create next timer task if we don't have one (or)
// if current one is HB task and we need to create next HB task for the same.
// NOTE: When record activity HB comes in we only update last heartbeat timestamp, this is the place
// where we create next timer task based on that new updated timestamp.
if !td.TaskCreated || (isHeartBeatTask && td.ActivityID == scheduleID) {
nextTask := tBuilder.createNewTask(td)
timerTasks = []persistence.Task{nextTask}
at := nextTask.(*persistence.ActivityTimeoutTask)

ai.TimerTaskStatus = TimerTaskStatusCreated
ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType))
msBuilder.UpdateActivity(ai)
at := nextTask.(*persistence.ActivityTimeoutTask)

t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v",
time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID)
}
Expand Down

0 comments on commit c06bbbb

Please sign in to comment.