Skip to content

Commit

Permalink
Fix missing cleanup in terminate workflow becuase of size limit (cad…
Browse files Browse the repository at this point in the history
…ence-workflow#1634)

* Fix typos

* Fix typos

* Fix missing cleanup in terminate workflow becuase of size limit

* Address comment and refactor
  • Loading branch information
vancexu authored Apr 3, 2019
1 parent 9878a8f commit c75ff5f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
38 changes: 25 additions & 13 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ import (
)

const (
conditionalRetryCount = 5
activityCancelationMsgActivityIDUnknown = "ACTIVITY_ID_UNKNOWN"
activityCancelationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED"
timerCancelationMsgTimerIDUnknown = "TIMER_ID_UNKNOWN"
conditionalRetryCount = 5
activityCancellationMsgActivityIDUnknown = "ACTIVITY_ID_UNKNOWN"
activityCancellationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED"
timerCancellationMsgTimerIDUnknown = "TIMER_ID_UNKNOWN"
)

type (
Expand Down Expand Up @@ -86,7 +86,7 @@ type (
currentClusterName string
ShardContext
txProcessor transferQueueProcessor
replcatorProcessor queueProcessor
replicatorProcessor queueProcessor
historyEventNotifier historyEventNotifier
}
)
Expand Down Expand Up @@ -188,7 +188,7 @@ func NewEngineWithShardContext(
if publisher != nil {
replicatorProcessor := newReplicatorQueueProcessor(shard, historyEngImpl.historyCache, publisher, executionManager, historyManager, historyV2Manager, logger)
historyEngImpl.replicatorProcessor = replicatorProcessor
shardWrapper.replcatorProcessor = replicatorProcessor
shardWrapper.replicatorProcessor = replicatorProcessor
historyEngImpl.replicator = newHistoryReplicator(shard, historyEngImpl, historyCache, shard.GetDomainCache(), historyManager, historyV2Manager,
logger)
}
Expand Down Expand Up @@ -1523,15 +1523,15 @@ Update_History_Loop:
common.StringDefault(request.Identity))
if !isRunning {
msBuilder.AddRequestCancelActivityTaskFailedEvent(completedID, activityID,
activityCancelationMsgActivityIDUnknown)
activityCancellationMsgActivityIDUnknown)
continue Process_Decision_Loop
}

if ai.StartedID == common.EmptyEventID {
// We haven't started the activity yet, we can cancel the activity right away and
// schedule a decision task to ensure the workflow makes progress.
msBuilder.AddActivityTaskCanceledEvent(ai.ScheduleID, ai.StartedID, *actCancelReqEvent.EventId,
[]byte(activityCancelationMsgActivityNotStarted), common.StringDefault(request.Identity))
[]byte(activityCancellationMsgActivityNotStarted), common.StringDefault(request.Identity))
activityNotStartedCancelled = true
}

Expand Down Expand Up @@ -2792,10 +2792,22 @@ func getWorkflowHistoryCleanupTasksFromShard(
} else {
retentionInDays = domainEntry.GetRetentionDays(workflowID)
}
deleteTask := tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)
deleteTask := createDeleteHistoryEventTimerTask(tBuilder, retentionInDays)
return &persistence.CloseExecutionTask{}, deleteTask, nil
}

func createDeleteHistoryEventTimerTask(tBuilder *timerBuilder, retentionInDays int32) *persistence.DeleteHistoryEventTask {
retention := time.Duration(retentionInDays) * time.Hour * 24
if tBuilder != nil {
return tBuilder.createDeleteHistoryEventTimerTask(retention)
} else {
expiryTime := clock.NewRealTimeSource().Now().Add(retention)
return &persistence.DeleteHistoryEventTask{
VisibilityTimestamp: expiryTime,
}
}
}

func (e *historyEngineImpl) createRecordDecisionTaskStartedResponse(domainID string, msBuilder mutableState,
di *decisionInfo, identity string) *h.RecordDecisionTaskStartedResponse {
response := &h.RecordDecisionTaskStartedResponse{}
Expand Down Expand Up @@ -2866,19 +2878,19 @@ func (e *historyEngineImpl) failDecision(context workflowExecutionContext, sched
}

func (e *historyEngineImpl) getTimerBuilder(we *workflow.WorkflowExecution) *timerBuilder {
lg := e.logger.WithFields(bark.Fields{
log := e.logger.WithFields(bark.Fields{
logging.TagWorkflowExecutionID: we.WorkflowId,
logging.TagWorkflowRunID: we.RunId,
})
return newTimerBuilder(e.shard.GetConfig(), lg, clock.NewRealTimeSource())
return newTimerBuilder(e.shard.GetConfig(), log, clock.NewRealTimeSource())
}

func (s *shardContextWrapper) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
resp, err := s.ShardContext.UpdateWorkflowExecution(request)
if err == nil {
s.txProcessor.NotifyNewTask(s.currentClusterName, request.TransferTasks)
if len(request.ReplicationTasks) > 0 {
s.replcatorProcessor.notifyNewTask()
s.replicatorProcessor.notifyNewTask()
}
}
return resp, err
Expand All @@ -2890,7 +2902,7 @@ func (s *shardContextWrapper) CreateWorkflowExecution(request *persistence.Creat
if err == nil {
s.txProcessor.NotifyNewTask(s.currentClusterName, request.TransferTasks)
if len(request.ReplicationTasks) > 0 {
s.replcatorProcessor.notifyNewTask()
s.replicatorProcessor.notifyNewTask()
}
}
return resp, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,7 @@ func (e *mutableStateBuilder) AddCancelTimerFailedEvent(decisionCompletedEventID
// No Operation: We couldn't cancel it probably TIMER_ID_UNKNOWN
timerID := attributes.GetTimerId()
return e.hBuilder.AddCancelTimerFailedEvent(timerID, decisionCompletedEventID,
timerCancelationMsgTimerIDUnknown, identity)
timerCancellationMsgTimerIDUnknown, identity)
}

func (e *mutableStateBuilder) AddRecordMarkerEvent(decisionCompletedEventID int64,
Expand Down
8 changes: 8 additions & 0 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,14 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
return err
}
executionInfo.SetLastFirstEventID(firstEvent.GetEventId())

// add clean up tasks
tranT, timerT, err := getWorkflowHistoryCleanupTasksFromShard(c.shard, executionInfo.DomainID, executionInfo.WorkflowID, nil)
if err != nil {
return err
}
transferTasks = append(transferTasks, tranT)
timerTasks = append(timerTasks, timerT)
} // end of hard terminate workflow
} // end of enforce history size/count limit

Expand Down

0 comments on commit c75ff5f

Please sign in to comment.