Skip to content

Commit

Permalink
Add unit tests for ConflictResolveWorkflowExecution method in executi…
Browse files Browse the repository at this point in the history
…n/context.go (cadence-workflow#5874)
  • Loading branch information
Shaddoll authored Apr 5, 2024
1 parent f414775 commit 0d9c014
Show file tree
Hide file tree
Showing 2 changed files with 646 additions and 41 deletions.
59 changes: 18 additions & 41 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ type (
emitWorkflowCompletionStatsFn func(string, string, string, string, string, *types.HistoryEvent)
mergeContinueAsNewReplicationTasksFn func(persistence.UpdateWorkflowMode, *persistence.WorkflowMutation, *persistence.WorkflowSnapshot) error
updateWorkflowExecutionEventReapplyFn func(persistence.UpdateWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error
conflictResolveEventReapplyFn func(persistence.ConflictResolveWorkflowMode, []*persistence.WorkflowEvents, []*persistence.WorkflowEvents) error
emitLargeWorkflowShardIDStatsFn func(int64, int64, int64, int64)
}
)
Expand Down Expand Up @@ -243,6 +244,7 @@ func NewContext(
ctx.persistStartWorkflowBatchEventsFn = ctx.PersistStartWorkflowBatchEvents
ctx.persistNonStartWorkflowBatchEventsFn = ctx.PersistNonStartWorkflowBatchEvents
ctx.updateWorkflowExecutionEventReapplyFn = ctx.updateWorkflowExecutionEventReapply
ctx.conflictResolveEventReapplyFn = ctx.conflictResolveEventReapply
ctx.emitLargeWorkflowShardIDStatsFn = ctx.emitLargeWorkflowShardIDStats
return ctx
}
Expand Down Expand Up @@ -466,25 +468,21 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
currentMutableState MutableState,
currentTransactionPolicy *TransactionPolicy,
) (retError error) {

defer func() {
if retError != nil {
c.Clear()
}
}()

resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(
now,
TransactionPolicyPassive,
)
resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(now, TransactionPolicyPassive)
if err != nil {
return err
}

var persistedBlobs events.PersistedBlobs
resetHistorySize := c.GetHistorySize()
for _, workflowEvents := range resetWorkflowEventsSeq {
blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
blob, err := c.persistNonStartWorkflowBatchEventsFn(ctx, workflowEvents)
if err != nil {
return err
}
Expand All @@ -499,23 +497,19 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
var newWorkflow *persistence.WorkflowSnapshot
var newWorkflowEventsSeq []*persistence.WorkflowEvents
if newContext != nil && newMutableState != nil {

defer func() {
if retError != nil {
newContext.Clear()
}
}()

newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
now,
TransactionPolicyPassive,
)
newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(now, TransactionPolicyPassive)
if err != nil {
return err
}
newWorkflowSizeSize := newContext.GetHistorySize()
startEvents := newWorkflowEventsSeq[0]
blob, err := c.PersistStartWorkflowBatchEvents(ctx, startEvents)
blob, err := c.persistStartWorkflowBatchEventsFn(ctx, startEvents)
if err != nil {
return err
}
Expand All @@ -530,23 +524,19 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
var currentWorkflow *persistence.WorkflowMutation
var currentWorkflowEventsSeq []*persistence.WorkflowEvents
if currentContext != nil && currentMutableState != nil && currentTransactionPolicy != nil {

defer func() {
if retError != nil {
currentContext.Clear()
}
}()

currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(
now,
*currentTransactionPolicy,
)
currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(now, *currentTransactionPolicy)
if err != nil {
return err
}
currentWorkflowSize := currentContext.GetHistorySize()
for _, workflowEvents := range currentWorkflowEventsSeq {
blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
blob, err := c.persistNonStartWorkflowBatchEventsFn(ctx, workflowEvents)
if err != nil {
return err
}
Expand All @@ -559,7 +549,7 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
}
}

if err := c.conflictResolveEventReapply(
if err := c.conflictResolveEventReapplyFn(
conflictResolveMode,
resetWorkflowEventsSeq,
newWorkflowEventsSeq,
Expand All @@ -582,9 +572,9 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
})
if err != nil {
if isOperationPossiblySuccessfulError(err) {
notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, true)
notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
c.notifyTasksFromWorkflowSnapshotFn(resetWorkflow, persistedBlobs, true)
c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, persistedBlobs, true)
c.notifyTasksFromWorkflowMutationFn(currentWorkflow, persistedBlobs, true)
}
return err
}
Expand All @@ -607,34 +597,21 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
workflowCloseState,
))

notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, false)
notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)
notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)
c.notifyTasksFromWorkflowSnapshotFn(resetWorkflow, persistedBlobs, false)
c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, persistedBlobs, false)
c.notifyTasksFromWorkflowMutationFn(currentWorkflow, persistedBlobs, false)

// finally emit session stats
domainName := c.GetDomainName()
emitWorkflowHistoryStats(
c.metricsClient,
domainName,
int(c.stats.HistorySize),
int(resetMutableState.GetNextEventID()-1),
)
emitSessionUpdateStats(
c.metricsClient,
domainName,
resp.MutableStateUpdateSessionStats,
)
c.emitWorkflowHistoryStatsFn(domain, int(c.stats.HistorySize), int(resetMutableState.GetNextEventID()-1))
c.emitSessionUpdateStatsFn(domain, resp.MutableStateUpdateSessionStats)
// emit workflow completion stats if any
if resetWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
if event, err := resetMutableState.GetCompletionEvent(ctx); err == nil {
workflowType := resetWorkflow.ExecutionInfo.WorkflowTypeName
taskList := resetWorkflow.ExecutionInfo.TaskList
emitWorkflowCompletionStats(c.metricsClient, c.logger,
domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
taskList, event)
c.emitWorkflowCompletionStatsFn(domain, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(), taskList, event)
}
}

return nil
}

Expand Down
Loading

0 comments on commit 0d9c014

Please sign in to comment.