Skip to content

Commit

Permalink
Modify state builder to use standardlized task generation logic (cade…
Browse files Browse the repository at this point in the history
…nce-workflow#2721)

* Rewrite state builder
* Rewrite UT
  • Loading branch information
wxing1292 authored Oct 27, 2019
1 parent ac90693 commit a1b996d
Show file tree
Hide file tree
Showing 10 changed files with 1,185 additions and 1,739 deletions.
11 changes: 9 additions & 2 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ func (r *conflictResolverImpl) reset(
domainEntry,
)

sBuilder = newStateBuilder(r.shard, resetMutableStateBuilder, r.logger)
sBuilder = newStateBuilder(
r.shard,
r.logger,
resetMutableStateBuilder,
func(mutableState mutableState) mutableStateTaskGenerator {
return newMutableStateTaskGenerator(r.shard.GetDomainCache(), r.logger, mutableState)
},
)
}

_, _, _, err = sBuilder.applyEvents(domainID, requestID, execution, history, nil, false)
_, err = sBuilder.applyEvents(domainID, requestID, execution, history, nil, false)
if err != nil {
r.logError("Conflict resolution err applying events.", err)
return nil, err
Expand Down
12 changes: 10 additions & 2 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ func newHistoryReplicator(
return newConflictResolver(shard, context, historyV2Mgr, logger)
},
getNewStateBuilder: func(msBuilder mutableState, logger log.Logger) stateBuilder {
return newStateBuilder(shard, msBuilder, logger)
return newStateBuilder(
shard,
logger,
msBuilder,
func(mutableState mutableState) mutableStateTaskGenerator {
return newMutableStateTaskGenerator(shard.GetDomainCache(), logger, mutableState)
},
)
},
getNewMutableState: func(domainEntry *cache.DomainCacheEntry, logger log.Logger) mutableState {
return newMutableStateBuilderWithReplicationState(
Expand Down Expand Up @@ -565,6 +572,7 @@ func (r *historyReplicator) ApplyReplicationTask(
if len(request.History.Events) == 0 {
return nil
}
lastEvent := request.History.Events[len(request.History.Events)-1]

execution := *request.WorkflowExecution

Expand All @@ -576,7 +584,7 @@ func (r *historyReplicator) ApplyReplicationTask(
}

// directly use stateBuilder to apply events for other events(including continueAsNew)
lastEvent, _, newMutableState, err := sBuilder.applyEvents(
newMutableState, err := sBuilder.applyEvents(
domainID, requestID, execution, request.History.Events, newRunHistory, request.GetNewRunNDC(),
)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions service/history/mutableStateTaskGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,13 @@ func (r *mutableStateTaskGeneratorImpl) generateRequestCancelExternalTasks(

attr := event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
scheduleID := event.GetEventId()
version := event.GetVersion()
targetDomainName := attr.GetDomain()
targetWorkflowID := attr.GetWorkflowExecution().GetWorkflowId()
targetRunID := attr.GetWorkflowExecution().GetRunId()
targetChildOnly := attr.GetChildWorkflowOnly()

requestCancelExternalInfo, ok := r.mutableState.GetRequestCancelInfo(scheduleID)
_, ok := r.mutableState.GetRequestCancelInfo(scheduleID)
if !ok {
return &shared.InternalServiceError{
Message: fmt.Sprintf("it could be a bug, cannot get pending request cancel external workflow: %v", scheduleID),
Expand All @@ -418,7 +419,7 @@ func (r *mutableStateTaskGeneratorImpl) generateRequestCancelExternalTasks(
TargetRunID: targetRunID,
TargetChildWorkflowOnly: targetChildOnly,
InitiatedID: scheduleID,
Version: requestCancelExternalInfo.Version,
Version: version,
})

return nil
Expand All @@ -431,12 +432,13 @@ func (r *mutableStateTaskGeneratorImpl) generateSignalExternalTasks(

attr := event.SignalExternalWorkflowExecutionInitiatedEventAttributes
scheduleID := event.GetEventId()
version := event.GetVersion()
targetDomainName := attr.GetDomain()
targetWorkflowID := attr.GetWorkflowExecution().GetWorkflowId()
targetRunID := attr.GetWorkflowExecution().GetRunId()
targetChildOnly := attr.GetChildWorkflowOnly()

signalExternalInfo, ok := r.mutableState.GetSignalInfo(scheduleID)
_, ok := r.mutableState.GetSignalInfo(scheduleID)
if !ok {
return &shared.InternalServiceError{
Message: fmt.Sprintf("it could be a bug, cannot get pending signal external workflow: %v", scheduleID),
Expand All @@ -456,7 +458,7 @@ func (r *mutableStateTaskGeneratorImpl) generateSignalExternalTasks(
TargetRunID: targetRunID,
TargetChildWorkflowOnly: targetChildOnly,
InitiatedID: scheduleID,
Version: signalExternalInfo.Version,
Version: version,
})

return nil
Expand All @@ -471,7 +473,7 @@ func (r *mutableStateTaskGeneratorImpl) generateWorkflowSearchAttrTasks(
r.mutableState.AddTransferTasks(&persistence.UpsertWorkflowSearchAttributesTask{
// TaskID is set by shard
VisibilityTimestamp: now,
Version: currentVersion,
Version: currentVersion, // task processing does not check this version
})

return nil
Expand Down
20 changes: 14 additions & 6 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (

type (
stateBuilderProvider func(
msBuilder mutableState,
mutableState mutableState,
logger log.Logger) stateBuilder

mutableStateProvider func(
Expand Down Expand Up @@ -149,10 +149,18 @@ func newNDCHistoryReplicator(
return newNDCWorkflowResetter(shard, transactionMgr, domainID, workflowID, baseRunID, newContext, newRunID, logger)
},
newStateBuilder: func(
msBuilder mutableState,
state mutableState,
logger log.Logger,
) stateBuilder {
return newStateBuilder(shard, msBuilder, logger)

return newStateBuilder(
shard,
logger,
state,
func(mutableState mutableState) mutableStateTaskGenerator {
return newMutableStateTaskGenerator(shard.GetDomainCache(), logger, mutableState)
},
)
},
newMutableState: func(
domainEntry *cache.DomainCacheEntry,
Expand Down Expand Up @@ -274,7 +282,7 @@ func (r *nDCHistoryReplicatorImpl) applyStartEvents(
stateBuilder := r.newStateBuilder(mutableState, task.getLogger())

// use state builder for workflow mutable state mutation
_, _, _, err = stateBuilder.applyEvents(
_, err = stateBuilder.applyEvents(
task.getDomainID(),
requestID,
*task.getExecution(),
Expand Down Expand Up @@ -354,7 +362,7 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToCurrentBranch(

requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event.
stateBuilder := r.newStateBuilder(mutableState, task.getLogger())
_, _, newMutableState, err := stateBuilder.applyEvents(
newMutableState, err := stateBuilder.applyEvents(
task.getDomainID(),
requestID,
*task.getExecution(),
Expand Down Expand Up @@ -523,7 +531,7 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsResetWorkflow(

requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event.
stateBuilder := r.newStateBuilder(mutableState, task.getLogger())
_, _, _, err := stateBuilder.applyEvents(
_, err := stateBuilder.applyEvents(
task.getDomainID(),
requestID,
*task.getExecution(),
Expand Down
11 changes: 9 additions & 2 deletions service/history/nDCStateRebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,14 @@ func (r *nDCStateRebuilderImpl) initializeBuilders(
r.logger,
domainEntry,
)
stateBuilder := newStateBuilder(r.shard, resetMutableStateBuilder, r.logger)
stateBuilder := newStateBuilder(
r.shard,
r.logger,
resetMutableStateBuilder,
func(mutableState mutableState) mutableStateTaskGenerator {
return newMutableStateTaskGenerator(r.shard.GetDomainCache(), r.logger, mutableState)
},
)
return resetMutableStateBuilder, stateBuilder
}

Expand All @@ -196,7 +203,7 @@ func (r *nDCStateRebuilderImpl) applyEvents(
requestID string,
) error {

_, _, _, err := stateBuilder.applyEvents(
_, err := stateBuilder.applyEvents(
workflowIdentifier.DomainID,
requestID,
shared.WorkflowExecution{
Expand Down
2 changes: 1 addition & 1 deletion service/history/nDCStateRebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *nDCStateRebuilderSuite) TestApplyEvents() {
events,
[]*shared.HistoryEvent(nil),
true,
).Return(nil, nil, nil, nil).Times(1)
).Return(nil, nil).Times(1)

err := s.nDCStateRebuilder.applyEvents(workflowIdentifier, mockStateBuilder, events, requestID)
s.NoError(err)
Expand Down
Loading

0 comments on commit a1b996d

Please sign in to comment.