Skip to content

Commit

Permalink
Cancel timer should clear any TimerFired events in buffered events (c…
Browse files Browse the repository at this point in the history
…adence-workflow#1654)

* Go over bufferedEvents, updateBufferedEvents and historyBuilder history and clear out any timer fired events from it on a cancel timer decision
* Reset the value of hasBufferedEvents as we may have removed things from the mutable state
  • Loading branch information
shreyassrivatsan authored Apr 10, 2019
1 parent a4e8002 commit b86d5cb
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 12 deletions.
6 changes: 4 additions & 2 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,10 @@ func (b *historyBuilder) AddTimerStartedEvent(decisionCompletedEventID int64,
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddTimerFiredEvent(startedEventID int64,
timerID string) *workflow.HistoryEvent {
func (b *historyBuilder) AddTimerFiredEvent(
startedEventID int64,
timerID string,
) *workflow.HistoryEvent {

attributes := &workflow.TimerFiredEventAttributes{}
attributes.TimerId = common.StringPtr(timerID)
Expand Down
4 changes: 4 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,10 @@ Update_History_Loop:
// since timer builder has a local cached version of timers
tBuilder = e.getTimerBuilder(context.getExecution())
tBuilder.loadUserTimers(msBuilder)

// timer deletion is a success, we may have deleted a fired timer in
// which case we should reset hasBufferedEvents
hasUnhandledEvents = msBuilder.HasBufferedEvents()
}

case workflow.DecisionTypeRecordMarker:
Expand Down
77 changes: 76 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4422,6 +4422,81 @@ func (s *engineSuite) TestCancelTimer_RespondDecisionTaskCompleted_NoStartTimer(
s.False(executionBuilder.HasPendingDecisionTask())
}

func (s *engineSuite) TestCancelTimer_RespondDecisionTaskCompleted_TimerFired() {
domainID := validDomainID
we := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wId"),
RunId: common.StringPtr(validRunID),
}
tl := "testTaskList"
taskToken, _ := json.Marshal(&common.TaskToken{
WorkflowID: *we.WorkflowId,
RunID: *we.RunId,
ScheduleID: 6,
})
identity := "testIdentity"
timerID := "t1"

msBuilder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.mockHistoryEngine.shard, s.eventsCache,
bark.NewLoggerFromLogrus(log.New()), we.GetRunId())
// Verify cancel timer with a start event.
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 100, identity)
di := addDecisionTaskScheduledEvent(msBuilder)
decisionStartedEvent := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)
decisionCompletedEvent := addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID,
*decisionStartedEvent.EventId, nil, identity)
addTimerStartedEvent(msBuilder, *decisionCompletedEvent.EventId, timerID, 10)
di2 := addDecisionTaskScheduledEvent(msBuilder)
addDecisionTaskStartedEvent(msBuilder, di2.ScheduleID, tl, identity)
addTimerFiredEvent(msBuilder, di2.ScheduleID, timerID)

ms := createMutableState(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

decisions := []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCancelTimer),
CancelTimerDecisionAttributes: &workflow.CancelTimerDecisionAttributes{
TimerId: common.StringPtr(timerID),
},
}}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()

s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: persistence.DomainTableVersionV1,
},
nil,
)
_, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(domainID),
CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{
TaskToken: taskToken,
Decisions: decisions,
ExecutionContext: []byte("context"),
Identity: &identity,
},
})
s.Nil(err)

executionBuilder := s.getBuilder(domainID, we)
s.Equal(int64(10), executionBuilder.GetExecutionInfo().NextEventID)
s.Equal(int64(7), executionBuilder.GetExecutionInfo().LastProcessedEvent)
s.Equal(persistence.WorkflowStateRunning, executionBuilder.GetExecutionInfo().State)
s.False(executionBuilder.HasPendingDecisionTask())
s.False(executionBuilder.HasBufferedEvents())
}

func (s *engineSuite) TestSignalWorkflowExecution() {
signalRequest := &history.SignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
Expand Down Expand Up @@ -4924,7 +4999,7 @@ func createMutableState(ms mutableState) *persistence.WorkflowMutableState {
if len(builder.bufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.bufferedEvents...)
}
if builder.updateBufferedEvents != nil {
if len(builder.updateBufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.updateBufferedEvents...)
}
var replicationState *persistence.ReplicationState
Expand Down
61 changes: 52 additions & 9 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error {

// no decision in-flight, flush all buffered events to committed bucket
if !e.HasInFlightDecisionTask() {

// flush persisted buffered events
reorderFunc(e.bufferedEvents)

Expand Down Expand Up @@ -513,6 +512,37 @@ func (e *mutableStateBuilder) CreateReplicationTask(newRunEventStoreVersion int3
return t
}

func (e *mutableStateBuilder) checkAndClearTimerFiredEvent(timerID string) *workflow.HistoryEvent {
var timerEvent *workflow.HistoryEvent
e.bufferedEvents, timerEvent = checkAndClearTimerFiredEvent(e.bufferedEvents, timerID)
if timerEvent != nil {
return timerEvent
}
e.updateBufferedEvents, timerEvent = checkAndClearTimerFiredEvent(e.updateBufferedEvents, timerID)
if timerEvent != nil {
return timerEvent
}
e.hBuilder.history, timerEvent = checkAndClearTimerFiredEvent(e.hBuilder.history, timerID)
return timerEvent
}

func checkAndClearTimerFiredEvent(events []*workflow.HistoryEvent, timerID string) ([]*workflow.HistoryEvent, *workflow.HistoryEvent) {
// go over all history events. if we find a timer fired event for the given
// timerID, clear it
timerFiredIdx := -1
for idx, event := range events {
if event.GetEventType() == workflow.EventTypeTimerFired &&
event.GetTimerFiredEventAttributes().GetTimerId() == timerID {
timerFiredIdx = idx
break
}
}
if timerFiredIdx == -1 {
return events, nil
}
return append(events[:timerFiredIdx], events[timerFiredIdx+1:]...), events[timerFiredIdx]
}

func convertUpdateActivityInfos(inputs map[*persistence.ActivityInfo]struct{}) []*persistence.ActivityInfo {
outputs := []*persistence.ActivityInfo{}
for item := range inputs {
Expand Down Expand Up @@ -1140,7 +1170,7 @@ func (e *mutableStateBuilder) GetInFlightDecisionTask() (*decisionInfo, bool) {
}

func (e *mutableStateBuilder) HasBufferedEvents() bool {
if len(e.bufferedEvents) > 0 || e.updateBufferedEvents != nil {
if len(e.bufferedEvents) > 0 || len(e.updateBufferedEvents) > 0 {
return true
}

Expand Down Expand Up @@ -2270,18 +2300,31 @@ func (e *mutableStateBuilder) ReplicateTimerFiredEvent(event *workflow.HistoryEv
e.DeleteUserTimer(timerID)
}

func (e *mutableStateBuilder) AddTimerCanceledEvent(decisionCompletedEventID int64,
attributes *workflow.CancelTimerDecisionAttributes, identity string) *workflow.HistoryEvent {
timerID := *attributes.TimerId
func (e *mutableStateBuilder) AddTimerCanceledEvent(
decisionCompletedEventID int64,
attributes *workflow.CancelTimerDecisionAttributes,
identity string,
) *workflow.HistoryEvent {
var timerStartedID int64
timerID := attributes.GetTimerId()
isTimerRunning, ti := e.GetUserTimer(timerID)
if !isTimerRunning {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionTimerCanceled, e.GetNextEventID(), fmt.Sprintf(
"{IsTimerRunning: %v, timerID: %v}", isTimerRunning, timerID))
return nil
// if timer is not running then check if it has fired in the mutable state.
// If so clear the timer from the mutable state. We need to check both the
// bufferedEvents and the history builder
timerFiredEvent := e.checkAndClearTimerFiredEvent(timerID)
if timerFiredEvent == nil {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionTimerCanceled, e.GetNextEventID(), fmt.Sprintf(
"{IsTimerRunning: %v, timerID: %v}", isTimerRunning, timerID))
return nil
}
timerStartedID = timerFiredEvent.TimerFiredEventAttributes.GetStartedEventId()
} else {
timerStartedID = ti.StartedID
}

// Timer is running.
event := e.hBuilder.AddTimerCanceledEvent(ti.StartedID, decisionCompletedEventID, timerID, identity)
event := e.hBuilder.AddTimerCanceledEvent(timerStartedID, decisionCompletedEventID, timerID, identity)
e.ReplicateTimerCanceledEvent(event)

return event
Expand Down

0 comments on commit b86d5cb

Please sign in to comment.