Multiple Bugfixes (temporalio#794)
* fix workflow timeout version check bug
* fix task event ID check bug
* fix integtest race condition
* add tag to failover processor
wxing1292 authored May 30, 2018
1 parent 4ae0147 commit edfa972
Showing 9 changed files with 161 additions and 146 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,7 +7,7 @@
*.cov
*.html
.tmp/
.vscode
.vscode/
/vendor
/cadence
.DS_Store
1 change: 1 addition & 0 deletions common/logging/tags.go
@@ -49,6 +49,7 @@ const (
TagPartition = "partition"
TagOffset = "offset"
TagScope = "scope"
TagFailover = "failover"

// workflow logging tag values
// TagWorkflowComponent Values
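The new TagFailover constant is consumed later in this diff, where newTimerQueueFailoverProcessor attaches it to the processor's logger. As a standalone sketch of the bark structured-logging pattern, assuming a recent bark/logrus pairing (field values here are illustrative, not from the commit):

package main

import (
	"github.com/sirupsen/logrus"
	"github.com/uber-common/bark"
)

func main() {
	logger := bark.NewLoggerFromLogrus(logrus.New())
	// With the field attached, every line the failover processor logs
	// carries the source cluster, so one failover run is easy to filter.
	logger = logger.WithFields(bark.Fields{
		"cluster":  "active",
		"failover": "from: standby",
	})
	logger.Info("failover timer processor started")
}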
2 changes: 1 addition & 1 deletion host/integration_test.go
@@ -1804,7 +1804,7 @@ func (s *integrationSuite) TestActivityHeartbeatTimeouts() {

for i := 0; i < activityCount; i++ {
go func() {
err = poller.pollAndProcessActivityTask(false)
err := poller.pollAndProcessActivityTask(false)
s.logger.Infof("Activity Processing Completed. Error: %v", err)
}()
}
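The one-character change above is the race-condition fix named in the commit message: every goroutine previously assigned to the single err captured from the enclosing scope, so the concurrent writes raced with one another. err := declares a variable local to each goroutine. A standalone sketch of the pattern, with pollOnce as a hypothetical stand-in for poller.pollAndProcessActivityTask:

package main

import (
	"fmt"
	"sync"
)

func pollOnce() error { return nil } // stand-in for pollAndProcessActivityTask

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// `:=` gives each goroutine its own err; assigning to one
			// variable shared across goroutines is the data race that
			// `go test -race` flags in the old code.
			err := pollOnce()
			fmt.Printf("Activity Processing Completed. Error: %v\n", err)
		}()
	}
	wg.Wait()
}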
77 changes: 77 additions & 0 deletions service/history/failoverCheck.go
@@ -21,6 +21,9 @@
package history

import (
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
)

@@ -61,3 +64,77 @@ func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64,
}
return true, nil
}

// load mutable state; if the mutable state's next event ID <= the task's event ID, attempt a refresh
// if the next event ID is still <= the task's event ID after the refresh, return nil, nil
func loadMutableStateForTransferTask(context *workflowExecutionContext, transferTask *persistence.TransferTaskInfo, metricsClient metrics.Client, logger bark.Logger) (*mutableStateBuilder, error) {
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this can happen when the task is a duplicate and the execution has already completed.
return nil, nil
}
return nil, err
}

// check whether the cache needs to be refreshed, as we could be holding a stale workflow execution
// the exception is a decision that consistently fails: retries generate no new
// events, so the decision schedule ID can equal the next event ID
isDecisionRetry := transferTask.TaskType == persistence.TransferTaskTypeDecisionTask &&
msBuilder.executionInfo.DecisionScheduleID == transferTask.ScheduleID &&
msBuilder.executionInfo.DecisionAttempt > 0

if transferTask.ScheduleID >= msBuilder.GetNextEventID() && !isDecisionRetry {
metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.StaleMutableStateCounter)
logger.Debugf("Transfer Task Processor: task event ID: %v >= MS NextEventID: %v.", transferTask.ScheduleID, msBuilder.GetNextEventID())
context.clear()

msBuilder, err = context.loadWorkflowExecution()
if err != nil {
return nil, err
}
// after the refresh, the next event ID is still <= the task's event ID: duplicate task
if transferTask.ScheduleID >= msBuilder.GetNextEventID() {
logger.Infof("Transfer Task Processor: task event ID: %v >= MS NextEventID: %v, skip.", transferTask.ScheduleID, msBuilder.GetNextEventID())
return nil, nil
}
}
return msBuilder, nil
}

// load mutable state; if the mutable state's next event ID <= the task's event ID, attempt a refresh
// if the next event ID is still <= the task's event ID after the refresh, return nil, nil
func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *persistence.TimerTaskInfo, metricsClient metrics.Client, logger bark.Logger) (*mutableStateBuilder, error) {
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this can happen when the task is a duplicate and the execution has already completed.
return nil, nil
}
return nil, err
}

// check whether the cache needs to be refreshed, as we could be holding a stale workflow execution
// the exception is a decision that consistently fails: retries generate no new
// events, so the decision schedule ID can equal the next event ID
isDecisionRetry := timerTask.TaskType == persistence.TaskTypeDecisionTimeout &&
msBuilder.executionInfo.DecisionScheduleID == timerTask.EventID &&
msBuilder.executionInfo.DecisionAttempt > 0

if timerTask.EventID >= msBuilder.GetNextEventID() && !isDecisionRetry {
metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
logger.Debugf("Timer Task Processor: task event ID: %v >= MS NextEventID: %v.", timerTask.EventID, msBuilder.GetNextEventID())
context.clear()

msBuilder, err = context.loadWorkflowExecution()
if err != nil {
return nil, err
}
// after the refresh, the next event ID is still <= the task's event ID: duplicate task
if timerTask.EventID >= msBuilder.GetNextEventID() {
logger.Infof("Timer Task Processor: task event ID: %v >= MS NextEventID: %v, skip.", timerTask.EventID, msBuilder.GetNextEventID())
return nil, nil
}
}
return msBuilder, nil
}
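The invariant behind both helpers: a task can only reference an event that mutable state has already assigned, so a fresh snapshot's next event ID is strictly greater than the task's event ID. When it is not, the cached snapshot predates the task and is refreshed once; if the condition still holds after the refresh, the task is treated as a duplicate and dropped. A standalone illustration of the rule, with hypothetical event IDs:

package main

import "fmt"

// needsRefresh restates the staleness check used by the helpers above: a
// snapshot whose next event ID is nextEventID cannot yet contain the event
// a task references when taskEventID >= nextEventID.
func needsRefresh(taskEventID, nextEventID int64) bool {
	return taskEventID >= nextEventID
}

func main() {
	// The task references event 10 but the cached snapshot only knows
	// events below 8: the cache is stale, so clear it and reload once.
	fmt.Println(needsRefresh(10, 8)) // true  -> refresh
	// After the reload the next event ID is 12: the task can be processed.
	fmt.Println(needsRefresh(10, 12)) // false -> process
	// Were it still true after the reload, the task would be a duplicate
	// and the helpers would return nil, nil so the caller skips it.
}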
98 changes: 33 additions & 65 deletions service/history/timerQueueActiveProcessor.go
@@ -104,6 +104,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE
}
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowCluster: clusterName,
logging.TagFailover: "from: " + standbyClusterName,
})
timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) {
if timer.DomainID == domainID {
@@ -223,16 +224,13 @@ func (t *timerQueueActiveProcessorImpl) processExpiredUserTimer(task *persistenc

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
return err1
}
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)

if !msBuilder.isWorkflowExecutionRunning() {
// Workflow is completed.
msBuilder, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil || !msBuilder.isWorkflowExecutionRunning() {
return nil
}
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)

var timerTasks []persistence.Task
scheduleNewDecision := false
@@ -271,7 +269,7 @@

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
err = t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
@@ -295,30 +293,15 @@ func (t *timerQueueActiveProcessorImpl) processActivityTimeout(timerTask *persis

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
return err1
}
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)

scheduleID := timerTask.EventID
// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if scheduleID >= msBuilder.GetNextEventID() {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
t.logger.Debugf("processActivityTimeout: scheduleID mismatch. MS NextEventID: %v, scheduleID: %v",
msBuilder.GetNextEventID(), scheduleID)
// Reload workflow execution history
context.clear()
continue Update_History_Loop
}

if !msBuilder.isWorkflowExecutionRunning() {
// Workflow is completed.
msBuilder, err := loadMutableStateForTimerTask(context, timerTask, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil || !msBuilder.isWorkflowExecutionRunning() {
return nil
}
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)

ai, running := msBuilder.GetActivityInfo(scheduleID)
ai, running := msBuilder.GetActivityInfo(timerTask.EventID)
if running {
// If current one is HB task then we may need to create the next heartbeat timer. Clear the create flag for this
// heartbeat timer so we can create it again if needed.
@@ -467,25 +450,15 @@ func (t *timerQueueActiveProcessorImpl) processDecisionTimeout(task *persistence

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
return err1
}
if !msBuilder.isWorkflowExecutionRunning() {
msBuilder, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil || !msBuilder.isWorkflowExecutionRunning() {
return nil
}

scheduleID := task.EventID
di, found := msBuilder.GetPendingDecision(scheduleID)

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !found && scheduleID >= msBuilder.GetNextEventID() {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
// Reload workflow execution history
context.clear()
continue Update_History_Loop
}
if !found {
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, scheduleID)
return nil
@@ -551,16 +524,10 @@ func (t *timerQueueActiveProcessorImpl) processRetryTimer(task *persistence.Time
if err0 != nil {
return err0
}
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
if _, ok := err1.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err1
}

if !msBuilder.isWorkflowExecutionRunning() {
msBuilder, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil || !msBuilder.isWorkflowExecutionRunning() {
return nil
}

@@ -634,22 +601,23 @@ func (t *timerQueueActiveProcessorImpl) processWorkflowTimeout(task *persistence

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if !msBuilder.isWorkflowExecutionRunning() {
return nil
}

ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetStartVersion(), task)
msBuilder, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
if err != nil {
return err
} else if !ok {
} else if msBuilder == nil || !msBuilder.isWorkflowExecutionRunning() {
return nil
}

// do version check for global domain task
if msBuilder.replicationState != nil {
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.replicationState.StartVersion, task)
if err != nil {
return err
} else if !ok {
return nil
}
}

if e := msBuilder.AddTimeoutWorkflowEvent(); e == nil {
// If we failed to add the event that means the workflow is already completed.
// we drop this timeout event.
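The last hunk above is the "workflow timeout version check bug" from the commit message: the old code ran the version check unconditionally, but only global domains carry a replication state, so the check is now guarded by msBuilder.replicationState != nil. A simplified sketch of the guard; the equality comparison is an assumption for illustration, since verifyTimerTaskVersion's body is not shown in full in this diff:

package main

import "fmt"

type replicationState struct{ StartVersion int64 }

// shouldProcess sketches the guarded check: a local domain (nil state) has
// no failover version to verify, while a global domain compares versions.
func shouldProcess(rs *replicationState, taskVersion int64) bool {
	if rs == nil {
		return true // local domain: skip the version check
	}
	return rs.StartVersion == taskVersion // global domain: versions must agree
}

func main() {
	fmt.Println(shouldProcess(nil, 5))                                // true
	fmt.Println(shouldProcess(&replicationState{StartVersion: 5}, 5)) // true
	fmt.Println(shouldProcess(&replicationState{StartVersion: 3}, 5)) // false
}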
4 changes: 3 additions & 1 deletion service/history/timerQueueProcessorBase.go
@@ -380,9 +380,11 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(task *persistence.Ti
}
defer func() { release(retError) }()

msBuilder, err := context.loadWorkflowExecution()
msBuilder, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil {
return nil
}
ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetCurrentVersion(), task)
if err != nil {
33 changes: 11 additions & 22 deletions service/history/timerQueueStandbyProcessor.go
@@ -320,29 +320,18 @@ func (t *timerQueueStandbyProcessorImpl) processTimer(timerTask *persistence.Tim
}
}()

Process_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
return err
}
msBuilder, err := loadMutableStateForTimerTask(context, timerTask, t.metricsClient, t.logger)
if err != nil {
return err
} else if msBuilder == nil {
return nil
}

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if timerTask.EventID >= msBuilder.GetNextEventID() {
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
t.logger.Debugf("processExpiredUserTimer: timer event ID: %v >= MS NextEventID: %v.", timerTask.EventID, msBuilder.GetNextEventID())
// Reload workflow execution history
context.clear()
continue Process_Loop
}
if !msBuilder.isWorkflowExecutionRunning() {
// workflow already finished, no need to process the timer
return nil
}

if !msBuilder.isWorkflowExecutionRunning() {
// workflow already finished, no need to process the timer
return nil
}
return fn(msBuilder)

return fn(msBuilder)
}
return ErrMaxAttemptsExceeded
}