diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index ff5c216f620..201abe44095 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -118,7 +118,8 @@ const ( `start_to_close_timeout: ?, ` + `heart_beat_timeout: ?, ` + `cancel_requested: ?, ` + - `cancel_request_id: ?` + + `cancel_request_id: ?, ` + + `last_hb_updated_time: ?` + `}` templateTimerInfoType = `{` + @@ -1344,6 +1345,7 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI a.HeartbeatTimeout, a.CancelRequested, a.CancelRequestID, + a.LastHeartBeatUpdatedTime, d.shardID, rowTypeExecution, domainID, @@ -1528,6 +1530,8 @@ func createActivityInfo(result map[string]interface{}) *ActivityInfo { info.CancelRequested = v.(bool) case "cancel_request_id": info.CancelRequestID = v.(int64) + case "last_hb_updated_time": + info.LastHeartBeatUpdatedTime = v.(time.Time) } } diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index 1090da8435c..201c0debac8 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -672,6 +672,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() { updatedInfo := copyWorkflowExecutionInfo(info0) updatedInfo.NextEventID = int64(5) updatedInfo.LastProcessedEvent = int64(2) + currentTime := time.Now().UTC() activityInfos := []*ActivityInfo{ { ScheduleID: 1, @@ -682,6 +683,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() { ScheduleToStartTimeout: 2, StartToCloseTimeout: 3, HeartbeatTimeout: 4, + LastHeartBeatUpdatedTime: currentTime, }} err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), nil, nil, activityInfos, nil, nil, nil) s.Nil(err2, "No error expected.") @@ -701,6 +703,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() { s.Equal(int32(2), ai.ScheduleToStartTimeout) s.Equal(int32(3), ai.StartToCloseTimeout) s.Equal(int32(4), ai.HeartbeatTimeout) + s.Equal(currentTime.Unix(), ai.LastHeartBeatUpdatedTime.Unix()) err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, common.Int64Ptr(1), nil, nil) s.Nil(err2, "No error expected.") diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 27e87bf5610..9785a5a54c8 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -15,7 +15,7 @@ const ( // Workflow execution states const ( - WorkflowStateCreated = iota + WorkflowStateCreated = iota WorkflowStateRunning WorkflowStateCompleted ) @@ -28,7 +28,7 @@ const ( // Transfer task types const ( - TransferTaskTypeDecisionTask = iota + TransferTaskTypeDecisionTask = iota TransferTaskTypeActivityTask TransferTaskTypeDeleteExecution TransferTaskTypeCancelExecution @@ -202,19 +202,20 @@ type ( // ActivityInfo details. ActivityInfo struct { - ScheduleID int64 - ScheduledEvent []byte - StartedID int64 - StartedEvent []byte - ActivityID string - RequestID string - Details []byte - ScheduleToStartTimeout int32 - ScheduleToCloseTimeout int32 - StartToCloseTimeout int32 - HeartbeatTimeout int32 - CancelRequested bool - CancelRequestID int64 + ScheduleID int64 + ScheduledEvent []byte + StartedID int64 + StartedEvent []byte + ActivityID string + RequestID string + Details []byte + ScheduleToStartTimeout int32 + ScheduleToCloseTimeout int32 + StartToCloseTimeout int32 + HeartbeatTimeout int32 + CancelRequested bool + CancelRequestID int64 + LastHeartBeatUpdatedTime time.Time } // TimerInfo details - metadata about user timer info. diff --git a/schema/workflow_test.cql b/schema/workflow_test.cql index 8be363b77d6..111905964bf 100644 --- a/schema/workflow_test.cql +++ b/schema/workflow_test.cql @@ -75,6 +75,7 @@ CREATE TYPE activity_info ( heart_beat_timeout int, cancel_requested boolean, -- If a cancel request is made to cancel the activity in progress. cancel_request_id bigint, -- Event ID that identifies the cancel request. + last_hb_updated_time timestamp, -- Last time the heartbeat is received. ); -- User timer details diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index eb6c7c6c546..763d5f59c9f 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -999,21 +999,11 @@ Update_History_Loop: cancelRequested := ai.CancelRequested - var timerTasks []persistence.Task - var transferTasks []persistence.Task - e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", scheduleID, ai, cancelRequested) - // Re-schedule next heartbeat. - start2HeartBeatTimeoutTask, _ := context.tBuilder.AddHeartBeatActivityTimeout(ai) - if start2HeartBeatTimeoutTask != nil { - timerTasks = append(timerTasks, start2HeartBeatTimeoutTask) - defer e.timerProcessor.NotifyNewTimer(start2HeartBeatTimeoutTask.GetTaskID()) - } - - // Save progress reported. - msBuilder.updateActivityProgress(ai, request.GetDetails()) + // Save progress and last HB reported time. + msBuilder.updateActivityProgress(ai, request) // Generate a transaction ID for appending events to history transactionID, err2 := e.shard.GetNextTransferTaskID() @@ -1023,14 +1013,13 @@ Update_History_Loop: // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload // the history and try the operation again. - if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { + if err := context.updateWorkflowExecution(nil, nil, transactionID); err != nil { if err == ErrConflict { continue Update_History_Loop } return nil, err } - return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index d77c4f29ecb..71683b1a226 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - workflow "github.com/uber/cadence/.gen/go/shared" h "github.com/uber/cadence/.gen/go/history" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" @@ -176,8 +176,10 @@ func (e *mutableStateBuilder) hasPendingTasks() bool { return len(e.pendingActivityInfoIDs) > 0 || len(e.pendingTimerInfoIDs) > 0 } -func (e *mutableStateBuilder) updateActivityProgress(ai *persistence.ActivityInfo, details []byte) { - ai.Details = details +func (e *mutableStateBuilder) updateActivityProgress(ai *persistence.ActivityInfo, + request *workflow.RecordActivityTaskHeartbeatRequest) { + ai.Details = request.GetDetails() + ai.LastHeartBeatUpdatedTime = time.Now() e.updateActivityInfos = append(e.updateActivityInfos, ai) } @@ -305,8 +307,8 @@ func (e *mutableStateBuilder) AddWorkflowExecutionStartedEventForContinueAsNew(d WorkflowType: wType, TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTimeout), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(attributes.GetExecutionStartToCloseTimeoutSeconds()), - Input: attributes.GetInput(), - Identity: nil, + Input: attributes.GetInput(), + Identity: nil, } return e.AddWorkflowExecutionStartedEvent(domainID, execution, createRequest) @@ -445,16 +447,17 @@ func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEve heartbeatTimeout := attributes.GetHeartbeatTimeoutSeconds() ai := &persistence.ActivityInfo{ - ScheduleID: scheduleEventID, - ScheduledEvent: scheduleEvent, - StartedID: emptyEventID, - ActivityID: attributes.GetActivityId(), - ScheduleToStartTimeout: scheduleToStartTimeout, - ScheduleToCloseTimeout: scheduleToCloseTimeout, - StartToCloseTimeout: startToCloseTimeout, - HeartbeatTimeout: heartbeatTimeout, - CancelRequested: false, - CancelRequestID: emptyEventID, + ScheduleID: scheduleEventID, + ScheduledEvent: scheduleEvent, + StartedID: emptyEventID, + ActivityID: attributes.GetActivityId(), + ScheduleToStartTimeout: scheduleToStartTimeout, + ScheduleToCloseTimeout: scheduleToCloseTimeout, + StartToCloseTimeout: startToCloseTimeout, + HeartbeatTimeout: heartbeatTimeout, + CancelRequested: false, + CancelRequestID: emptyEventID, + LastHeartBeatUpdatedTime: time.Time{}, } e.pendingActivityInfoIDs[scheduleEventID] = ai diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index 48e724eb5c1..05959a04dfd 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -135,32 +135,38 @@ func (tb *timerBuilder) AddDecisionTimoutTask(scheduleID int64, func (tb *timerBuilder) AddScheduleToStartActivityTimeout( ai *persistence.ActivityInfo) *persistence.ActivityTimeoutTask { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_START, ai.ScheduleToStartTimeout) + return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_START, ai.ScheduleToStartTimeout, nil) } func (tb *timerBuilder) AddScheduleToCloseActivityTimeout( ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_CLOSE, ai.ScheduleToCloseTimeout), nil + return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_SCHEDULE_TO_CLOSE, ai.ScheduleToCloseTimeout, nil), nil } func (tb *timerBuilder) AddStartToCloseActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_START_TO_CLOSE, ai.StartToCloseTimeout), nil + return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_START_TO_CLOSE, ai.StartToCloseTimeout, nil), nil } func (tb *timerBuilder) AddHeartBeatActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout), nil + // We want to create the timer starting from the last heart beat time stamp but + // avoid creating timers before the current timer frame. + targetTime := common.AddSecondsToBaseTime(ai.LastHeartBeatUpdatedTime.UnixNano(), int64(ai.HeartbeatTimeout)) + if targetTime > time.Now().UnixNano() { + return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout, &ai.LastHeartBeatUpdatedTime), nil + } + return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutType_HEARTBEAT, ai.HeartbeatTimeout, nil), nil } // AddActivityTimeoutTask - Adds an activity timeout task. func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64, - timeoutType w.TimeoutType, fireTimeout int32) *persistence.ActivityTimeoutTask { + timeoutType w.TimeoutType, fireTimeout int32, baseTime *time.Time) *persistence.ActivityTimeoutTask { if fireTimeout <= 0 { return nil } - timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID) + timeOutTask := tb.createActivityTimeoutTask(fireTimeout, timeoutType, scheduleID, baseTime) tb.logger.Debugf("Adding Activity Timeout: SequenceID: %v, TimeoutType: %v, EventID: %v", SequenceID(timeOutTask.TaskID), timeoutType.String(), timeOutTask.EventID) return timeOutTask @@ -213,8 +219,15 @@ func (tb *timerBuilder) createDecisionTimeoutTask(fireTimeOut int32, eventID int } // createActivityTimeoutTask - Creates a activity timeout task. -func (tb *timerBuilder) createActivityTimeoutTask(fireTimeOut int32, timeoutType w.TimeoutType, eventID int64) *persistence.ActivityTimeoutTask { - expiryTime := common.AddSecondsToBaseTime(time.Now().UnixNano(), int64(fireTimeOut)) +func (tb *timerBuilder) createActivityTimeoutTask(fireTimeOut int32, timeoutType w.TimeoutType, + eventID int64, baseTime *time.Time) *persistence.ActivityTimeoutTask { + var expiryTime int64 + if baseTime != nil { + expiryTime = common.AddSecondsToBaseTime(baseTime.UnixNano(), int64(fireTimeOut)) + } else { + expiryTime = common.AddSecondsToBaseTime(time.Now().UnixNano(), int64(fireTimeOut)) + } + seqID := ConstructTimerKey(expiryTime, tb.seqNumGen.NextSeq()) return &persistence.ActivityTimeoutTask{ TaskID: int64(seqID), diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 52ff3007984..e9da25cdb52 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -427,12 +427,14 @@ Update_History_Loop: return err1 } - referenceExpiryTime, _ := DeconstructTimerKey(SequenceID(task.TaskID)) context.tBuilder.LoadUserTimers(msBuilder) var timerTasks []persistence.Task var clearTimerTask persistence.Task + scheduleNewDecision := false + timerTaskExpiryTime, _ := DeconstructTimerKey(SequenceID(task.TaskID)) + ExpireUserTimers: for _, td := range context.tBuilder.AllTimers() { hasTimer, ti := context.tBuilder.UserTimer(td.SequenceID) @@ -441,7 +443,7 @@ Update_History_Loop: return fmt.Errorf("failed to find user timer") } - if isExpired := context.tBuilder.IsTimerExpired(td, referenceExpiryTime); isExpired { + if isExpired := context.tBuilder.IsTimerExpired(td, timerTaskExpiryTime); isExpired { // Add TimerFired event to history. if msBuilder.AddTimerFiredEvent(ti.StartedID, ti.TimerID) == nil { return errFailedToAddTimerFiredEvent @@ -502,6 +504,7 @@ Update_History_Loop: clearTimerTask := &persistence.ActivityTimeoutTask{TaskID: timerTask.TaskID} + var timerTasks []persistence.Task scheduleNewDecision := false updateHistory := false @@ -535,13 +538,29 @@ Update_History_Loop: case workflow.TimeoutType_HEARTBEAT: { - if ai.StartedID != emptyEventID { + timerTaskExpiryTime, _ := DeconstructTimerKey(SequenceID(timerTask.TaskID)) + l := common.AddSecondsToBaseTime( + ai.LastHeartBeatUpdatedTime.UnixNano(), + int64(ai.HeartbeatTimeout)) + + if timerTaskExpiryTime > l { + t.logger.Debugf("Activity Heartbeat expired: %+v", *ai) if msBuilder.AddActivityTaskTimedOutEvent(scheduleID, ai.StartedID, timeoutType, nil) == nil { return errFailedToAddTimeoutEvent } updateHistory = true scheduleNewDecision = !msBuilder.HasPendingDecisionTask() + } else { + // Re-Schedule next heartbeat. + hbTimeoutTask, err := context.tBuilder.AddHeartBeatActivityTimeout(ai) + if err != nil { + return err + } + if hbTimeoutTask != nil { + timerTasks = append(timerTasks, hbTimeoutTask) + defer t.NotifyNewTimer(hbTimeoutTask.GetTaskID()) + } } } @@ -562,7 +581,7 @@ Update_History_Loop: if updateHistory { // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload // the history and try the operation again. - err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, nil, clearTimerTask) + err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, clearTimerTask) if err != nil { if err == ErrConflict { continue Update_History_Loop