Skip to content

Commit

Permalink
Heartbeat to reduce timers by recording last reported time stamp (cad…
Browse files Browse the repository at this point in the history
…ence-workflow#74)

Add last updated HB timestamp to activity mutable state.
Record Activity Task HeartBeat to use LastUpdateTimestamp.
  • Loading branch information
sivakku authored Apr 28, 2017
1 parent d6fff65 commit aecbb59
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 57 deletions.
6 changes: 5 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ const (
`start_to_close_timeout: ?, ` +
`heart_beat_timeout: ?, ` +
`cancel_requested: ?, ` +
`cancel_request_id: ?` +
`cancel_request_id: ?, ` +
`last_hb_updated_time: ?` +
`}`

templateTimerInfoType = `{` +
Expand Down Expand Up @@ -1344,6 +1345,7 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
a.HeartbeatTimeout,
a.CancelRequested,
a.CancelRequestID,
a.LastHeartBeatUpdatedTime,
d.shardID,
rowTypeExecution,
domainID,
Expand Down Expand Up @@ -1528,6 +1530,8 @@ func createActivityInfo(result map[string]interface{}) *ActivityInfo {
info.CancelRequested = v.(bool)
case "cancel_request_id":
info.CancelRequestID = v.(int64)
case "last_hb_updated_time":
info.LastHeartBeatUpdatedTime = v.(time.Time)
}
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
updatedInfo := copyWorkflowExecutionInfo(info0)
updatedInfo.NextEventID = int64(5)
updatedInfo.LastProcessedEvent = int64(2)
currentTime := time.Now().UTC()
activityInfos := []*ActivityInfo{
{
ScheduleID: 1,
Expand All @@ -682,6 +683,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
ScheduleToStartTimeout: 2,
StartToCloseTimeout: 3,
HeartbeatTimeout: 4,
LastHeartBeatUpdatedTime: currentTime,
}}
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), nil, nil, activityInfos, nil, nil, nil)
s.Nil(err2, "No error expected.")
Expand All @@ -701,6 +703,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
s.Equal(int32(2), ai.ScheduleToStartTimeout)
s.Equal(int32(3), ai.StartToCloseTimeout)
s.Equal(int32(4), ai.HeartbeatTimeout)
s.Equal(currentTime.Unix(), ai.LastHeartBeatUpdatedTime.Unix())

err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, common.Int64Ptr(1), nil, nil)
s.Nil(err2, "No error expected.")
Expand Down
31 changes: 16 additions & 15 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (

// Workflow execution states
const (
WorkflowStateCreated = iota
WorkflowStateCreated = iota
WorkflowStateRunning
WorkflowStateCompleted
)
Expand All @@ -28,7 +28,7 @@ const (

// Transfer task types
const (
TransferTaskTypeDecisionTask = iota
TransferTaskTypeDecisionTask = iota
TransferTaskTypeActivityTask
TransferTaskTypeDeleteExecution
TransferTaskTypeCancelExecution
Expand Down Expand Up @@ -202,19 +202,20 @@ type (

// ActivityInfo details.
ActivityInfo struct {
ScheduleID int64
ScheduledEvent []byte
StartedID int64
StartedEvent []byte
ActivityID string
RequestID string
Details []byte
ScheduleToStartTimeout int32
ScheduleToCloseTimeout int32
StartToCloseTimeout int32
HeartbeatTimeout int32
CancelRequested bool
CancelRequestID int64
ScheduleID int64
ScheduledEvent []byte
StartedID int64
StartedEvent []byte
ActivityID string
RequestID string
Details []byte
ScheduleToStartTimeout int32
ScheduleToCloseTimeout int32
StartToCloseTimeout int32
HeartbeatTimeout int32
CancelRequested bool
CancelRequestID int64
LastHeartBeatUpdatedTime time.Time
}

// TimerInfo details - metadata about user timer info.
Expand Down
1 change: 1 addition & 0 deletions schema/workflow_test.cql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ CREATE TYPE activity_info (
heart_beat_timeout int,
cancel_requested boolean, -- If a cancel request is made to cancel the activity in progress.
cancel_request_id bigint, -- Event ID that identifies the cancel request.
last_hb_updated_time timestamp, -- Last time a heartbeat was received.
);

-- User timer details
Expand Down
17 changes: 3 additions & 14 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,21 +999,11 @@ Update_History_Loop:

cancelRequested := ai.CancelRequested

var timerTasks []persistence.Task
var transferTasks []persistence.Task

e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v",
scheduleID, ai, cancelRequested)

// Re-schedule next heartbeat.
start2HeartBeatTimeoutTask, _ := context.tBuilder.AddHeartBeatActivityTimeout(ai)
if start2HeartBeatTimeoutTask != nil {
timerTasks = append(timerTasks, start2HeartBeatTimeoutTask)
defer e.timerProcessor.NotifyNewTimer(start2HeartBeatTimeoutTask.GetTaskID())
}

// Save progress reported.
msBuilder.updateActivityProgress(ai, request.GetDetails())
// Save progress and last HB reported time.
msBuilder.updateActivityProgress(ai, request)

// Generate a transaction ID for appending events to history
transactionID, err2 := e.shard.GetNextTransferTaskID()
Expand All @@ -1023,14 +1013,13 @@ Update_History_Loop:

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil {
if err := context.updateWorkflowExecution(nil, nil, transactionID); err != nil {
if err == ErrConflict {
continue Update_History_Loop
}

return nil, err
}

return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil
}

Expand Down
33 changes: 18 additions & 15 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"time"

workflow "github.com/uber/cadence/.gen/go/shared"
h "github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"

Expand Down Expand Up @@ -176,8 +176,10 @@ func (e *mutableStateBuilder) hasPendingTasks() bool {
return len(e.pendingActivityInfoIDs) > 0 || len(e.pendingTimerInfoIDs) > 0
}

func (e *mutableStateBuilder) updateActivityProgress(ai *persistence.ActivityInfo, details []byte) {
ai.Details = details
func (e *mutableStateBuilder) updateActivityProgress(ai *persistence.ActivityInfo,
request *workflow.RecordActivityTaskHeartbeatRequest) {
ai.Details = request.GetDetails()
ai.LastHeartBeatUpdatedTime = time.Now()
e.updateActivityInfos = append(e.updateActivityInfos, ai)
}

Expand Down Expand Up @@ -305,8 +307,8 @@ func (e *mutableStateBuilder) AddWorkflowExecutionStartedEventForContinueAsNew(d
WorkflowType: wType,
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTimeout),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(attributes.GetExecutionStartToCloseTimeoutSeconds()),
Input: attributes.GetInput(),
Identity: nil,
Input: attributes.GetInput(),
Identity: nil,
}

return e.AddWorkflowExecutionStartedEvent(domainID, execution, createRequest)
Expand Down Expand Up @@ -445,16 +447,17 @@ func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEve
heartbeatTimeout := attributes.GetHeartbeatTimeoutSeconds()

ai := &persistence.ActivityInfo{
ScheduleID: scheduleEventID,
ScheduledEvent: scheduleEvent,
StartedID: emptyEventID,
ActivityID: attributes.GetActivityId(),
ScheduleToStartTimeout: scheduleToStartTimeout,
ScheduleToCloseTimeout: scheduleToCloseTimeout,
StartToCloseTimeout: startToCloseTimeout,
HeartbeatTimeout: heartbeatTimeout,
CancelRequested: false,
CancelRequestID: emptyEventID,
ScheduleID: scheduleEventID,
ScheduledEvent: scheduleEvent,
StartedID: emptyEventID,
ActivityID: attributes.GetActivityId(),
ScheduleToStartTimeout: scheduleToStartTimeout,
ScheduleToCloseTimeout: scheduleToCloseTimeout,
StartToCloseTimeout: startToCloseTimeout,
HeartbeatTimeout: heartbeatTimeout,
CancelRequested: false,
CancelRequestID: emptyEventID,
LastHeartBeatUpdatedTime: time.Time{},
}

e.pendingActivityInfoIDs[scheduleEventID] = ai
Expand Down
29 changes: 21 additions & 8 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,38 @@ func (tb *timerBuilder) AddDecisionTimoutTask(scheduleID int64,

func (tb *timerBuilder) AddScheduleToStartActivityTimeout(
ai *persistence.ActivityInfo) *persistence.ActivityTimeoutTask {
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_START, ai.ScheduleToStartTimeout)
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_START, ai.ScheduleToStartTimeout, nil)
}

func (tb *timerBuilder) AddScheduleToCloseActivityTimeout(
ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) {
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_CLOSE, ai.ScheduleToCloseTimeout), nil
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_CLOSE, ai.ScheduleToCloseTimeout, nil), nil
}

func (tb *timerBuilder) AddStartToCloseActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask,
error) {
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_START_TO_CLOSE, ai.StartToCloseTimeout), nil
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_START_TO_CLOSE, ai.StartToCloseTimeout, nil), nil
}

func (tb *timerBuilder) AddHeartBeatActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask,
error) {
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout), nil
// We want to create the timer starting from the last heart beat time stamp but
// avoid creating timers before the current timer frame.
targetTime := common.AddSecondsToBaseTime(ai.LastHeartBeatUpdatedTime.UnixNano(), int64(ai.HeartbeatTimeout))
if targetTime > time.Now().UnixNano() {
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout, &ai.LastHeartBeatUpdatedTime), nil
}
return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout, nil), nil
}

// AddActivityTimeoutTask - Adds an activity timeout task.
func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64,
timeoutType w.TimeoutType, fireTimeout int32) *persistence.ActivityTimeoutTask {
timeoutType w.TimeoutType, fireTimeout int32, baseTime *time.Time) *persistence.ActivityTimeoutTask {
if fireTimeout <= 0 {
return nil
}

timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID)
timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID, baseTime)
tb.logger.Debugf("Adding Activity Timeout: SequenceID: %v, TimeoutType: %v, EventID: %v",
SequenceID(timeOutTask.TaskID), timeoutType.String(), timeOutTask.EventID)
return timeOutTask
Expand Down Expand Up @@ -213,8 +219,15 @@ func (tb *timerBuilder) createDecisionTimeoutTask(fireTimeOut int32, eventID int
}

// createActivityTimeoutTask - Creates an activity timeout task.
func (tb *timerBuilder) createActivityTimeoutTask(fireTimeOut int32, timeoutType w.TimeoutType, eventID int64) *persistence.ActivityTimeoutTask {
expiryTime := common.AddSecondsToBaseTime(time.Now().UnixNano(), int64(fireTimeOut))
func (tb *timerBuilder) createActivityTimeoutTask(fireTimeOut int32, timeoutType w.TimeoutType,
eventID int64, baseTime *time.Time) *persistence.ActivityTimeoutTask {
var expiryTime int64
if baseTime != nil {
expiryTime = common.AddSecondsToBaseTime(baseTime.UnixNano(), int64(fireTimeOut))
} else {
expiryTime = common.AddSecondsToBaseTime(time.Now().UnixNano(), int64(fireTimeOut))
}

seqID := ConstructTimerKey(expiryTime, tb.seqNumGen.NextSeq())
return &persistence.ActivityTimeoutTask{
TaskID: int64(seqID),
Expand Down
27 changes: 23 additions & 4 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,14 @@ Update_History_Loop:
return err1
}

referenceExpiryTime, _ := DeconstructTimerKey(SequenceID(task.TaskID))
context.tBuilder.LoadUserTimers(msBuilder)

var timerTasks []persistence.Task
var clearTimerTask persistence.Task

scheduleNewDecision := false
timerTaskExpiryTime, _ := DeconstructTimerKey(SequenceID(task.TaskID))

ExpireUserTimers:
for _, td := range context.tBuilder.AllTimers() {
hasTimer, ti := context.tBuilder.UserTimer(td.SequenceID)
Expand All @@ -441,7 +443,7 @@ Update_History_Loop:
return fmt.Errorf("failed to find user timer")
}

if isExpired := context.tBuilder.IsTimerExpired(td, referenceExpiryTime); isExpired {
if isExpired := context.tBuilder.IsTimerExpired(td, timerTaskExpiryTime); isExpired {
// Add TimerFired event to history.
if msBuilder.AddTimerFiredEvent(ti.StartedID, ti.TimerID) == nil {
return errFailedToAddTimerFiredEvent
Expand Down Expand Up @@ -502,6 +504,7 @@ Update_History_Loop:

clearTimerTask := &persistence.ActivityTimeoutTask{TaskID: timerTask.TaskID}

var timerTasks []persistence.Task
scheduleNewDecision := false
updateHistory := false

Expand Down Expand Up @@ -535,13 +538,29 @@ Update_History_Loop:

case workflow.TimeoutType_HEARTBEAT:
{
if ai.StartedID != emptyEventID {
timerTaskExpiryTime, _ := DeconstructTimerKey(SequenceID(timerTask.TaskID))
l := common.AddSecondsToBaseTime(
ai.LastHeartBeatUpdatedTime.UnixNano(),
int64(ai.HeartbeatTimeout))

if timerTaskExpiryTime > l {
t.logger.Debugf("Activity Heartbeat expired: %+v", *ai)
if msBuilder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, nil) == nil {
return errFailedToAddTimeoutEvent
}

updateHistory = true
scheduleNewDecision = !msBuilder.HasPendingDecisionTask()
} else {
// Re-Schedule next heartbeat.
hbTimeoutTask, err := context.tBuilder.AddHeartBeatActivityTimeout(ai)
if err != nil {
return err
}
if hbTimeoutTask != nil {
timerTasks = append(timerTasks, hbTimeoutTask)
defer t.NotifyNewTimer(hbTimeoutTask.GetTaskID())
}
}
}

Expand All @@ -562,7 +581,7 @@ Update_History_Loop:
if updateHistory {
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, nil, clearTimerTask)
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, clearTimerTask)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
Expand Down

0 comments on commit aecbb59

Please sign in to comment.