Skip to content

Commit

Permalink
Refactor mutablestate builder part 1 (cadence-workflow#3238)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored May 6, 2020
1 parent 3deebda commit cfb3693
Show file tree
Hide file tree
Showing 18 changed files with 991 additions and 269 deletions.
1 change: 0 additions & 1 deletion service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (r *conflictResolverImpl) reset(
if firstEvent.GetEventId() == common.FirstEventID {
resetMutableStateBuilder = execution.NewMutableStateBuilderWithReplicationState(
r.shard,
r.shard.GetEventsCache(),
r.logger,
domainEntry,
)
Expand Down
2 changes: 0 additions & 2 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func (c *contextImpl) LoadWorkflowExecutionForReplication(

c.mutableState = NewMutableStateBuilder(
c.shard,
c.shard.GetEventsCache(),
c.logger,
domainEntry,
)
Expand Down Expand Up @@ -344,7 +343,6 @@ func (c *contextImpl) LoadWorkflowExecution() (MutableState, error) {

c.mutableState = NewMutableStateBuilder(
c.shard,
c.shard.GetEventsCache(),
c.logger,
domainEntry,
)
Expand Down
3 changes: 1 addition & 2 deletions service/history/execution/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func (s *historyBuilderSuite) SetupTest() {
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(s.domainEntry, nil).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

s.msBuilder = NewMutableStateBuilder(s.mockShard, s.mockEventsCache,
s.logger, testLocalDomainEntry)
s.msBuilder = NewMutableStateBuilder(s.mockShard, s.logger, testLocalDomainEntry)
s.builder = NewHistoryBuilder(s.msBuilder, s.logger)
}

Expand Down
22 changes: 7 additions & 15 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,14 @@ var _ MutableState = (*mutableStateBuilder)(nil)
// NewMutableStateBuilder creates a new workflow mutable state builder
func NewMutableStateBuilder(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
) MutableState {
return newMutableStateBuilder(shard, eventsCache, logger, domainEntry)
return newMutableStateBuilder(shard, logger, domainEntry)
}

func newMutableStateBuilder(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
) *mutableStateBuilder {
Expand Down Expand Up @@ -214,7 +212,7 @@ func newMutableStateBuilder(

shard: shard,
clusterMetadata: shard.GetClusterMetadata(),
eventsCache: eventsCache,
eventsCache: shard.GetEventsCache(),
config: shard.GetConfig(),
timeSource: shard.GetTimeSource(),
logger: logger,
Expand Down Expand Up @@ -242,11 +240,10 @@ func newMutableStateBuilder(
// NewMutableStateBuilderWithReplicationState creates mutable state builder with replication state initialized
func NewMutableStateBuilderWithReplicationState(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
) MutableState {
s := newMutableStateBuilder(shard, eventsCache, logger, domainEntry)
s := newMutableStateBuilder(shard, logger, domainEntry)
s.replicationState = &persistence.ReplicationState{
StartVersion: s.currentVersion,
CurrentVersion: s.currentVersion,
Expand All @@ -260,26 +257,24 @@ func NewMutableStateBuilderWithReplicationState(
// NewMutableStateBuilderWithVersionHistories creates mutable state builder with version history initialized
func NewMutableStateBuilderWithVersionHistories(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
) MutableState {

s := newMutableStateBuilder(shard, eventsCache, logger, domainEntry)
s := newMutableStateBuilder(shard, logger, domainEntry)
s.versionHistories = persistence.NewVersionHistories(&persistence.VersionHistory{})
return s
}

// NewMutableStateBuilderWithEventV2 is used only in test
func NewMutableStateBuilderWithEventV2(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
runID string,
domainEntry *cache.DomainCacheEntry,
) MutableState {

msBuilder := NewMutableStateBuilder(shard, eventsCache, logger, domainEntry)
msBuilder := NewMutableStateBuilder(shard, logger, domainEntry)
_ = msBuilder.SetHistoryTree(runID)

return msBuilder
Expand All @@ -288,14 +283,13 @@ func NewMutableStateBuilderWithEventV2(
// NewMutableStateBuilderWithReplicationStateWithEventV2 is used only in test
func NewMutableStateBuilderWithReplicationStateWithEventV2(
shard shard.Context,
eventsCache events.Cache,
logger log.Logger,
version int64,
runID string,
domainEntry *cache.DomainCacheEntry,
) MutableState {

msBuilder := NewMutableStateBuilderWithReplicationState(shard, eventsCache, logger, domainEntry)
msBuilder := NewMutableStateBuilderWithReplicationState(shard, logger, domainEntry)
msBuilder.GetReplicationState().StartVersion = version
err := msBuilder.UpdateCurrentVersion(version, true)
if err != nil {
Expand Down Expand Up @@ -3360,7 +3354,6 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(
if e.config.EnableNDC(domainName) || e.GetVersionHistories() != nil {
newStateBuilder = NewMutableStateBuilderWithVersionHistories(
e.shard,
e.shard.GetEventsCache(),
e.logger,
e.domainEntry,
).(*mutableStateBuilder)
Expand All @@ -3371,12 +3364,11 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(
// target clusters or not, for 2DC case
newStateBuilder = NewMutableStateBuilderWithReplicationState(
e.shard,
e.eventsCache,
e.logger,
e.domainEntry,
).(*mutableStateBuilder)
} else {
newStateBuilder = newMutableStateBuilder(e.shard, e.eventsCache, e.logger, e.domainEntry)
newStateBuilder = newMutableStateBuilder(e.shard, e.logger, e.domainEntry)
}
}

Expand Down
7 changes: 2 additions & 5 deletions service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *mutableStateSuite) SetupTest() {
s.mockShard.GetConfig().MutableStateChecksumGenProbability = func(domain string) int { return 100 }
s.mockShard.GetConfig().MutableStateChecksumVerifyProbability = func(domain string) int { return 100 }

s.mockEventsCache = s.mockShard.MockEventsCache
s.mockEventsCache = s.mockShard.GetEventsCache().(*events.MockCache)

s.testScope = s.mockShard.Resource.MetricsScope.(tally.TestScope)
s.logger = s.mockShard.GetLogger()

s.msBuilder = newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, testLocalDomainEntry)
s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, testLocalDomainEntry)
}

func (s *mutableStateSuite) TearDownTest() {
Expand All @@ -109,7 +109,6 @@ func (s *mutableStateSuite) TestTransientDecisionCompletionFirstBatchReplicated_
runID := uuid.New()
s.msBuilder = NewMutableStateBuilderWithReplicationStateWithEventV2(
s.mockShard,
s.mockEventsCache,
s.logger,
version,
runID,
Expand Down Expand Up @@ -140,7 +139,6 @@ func (s *mutableStateSuite) TestTransientDecisionCompletionFirstBatchReplicated_
runID := uuid.New()
s.msBuilder = NewMutableStateBuilderWithReplicationStateWithEventV2(
s.mockShard,
s.mockEventsCache,
s.logger,
version,
runID,
Expand All @@ -160,7 +158,6 @@ func (s *mutableStateSuite) TestTransientDecisionCompletionFirstBatchReplicated_
runID := uuid.New()
s.msBuilder = NewMutableStateBuilderWithReplicationStateWithEventV2(
s.mockShard,
s.mockEventsCache,
s.logger,
version,
runID,
Expand Down
2 changes: 0 additions & 2 deletions service/history/execution/state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,14 +611,12 @@ func (b *stateBuilderImpl) ApplyEvents(
if newRunNDC {
newRunMutableStateBuilder = NewMutableStateBuilderWithVersionHistories(
b.shard,
b.shard.GetEventsCache(),
b.logger,
b.mutableState.GetDomainEntry(),
)
} else {
newRunMutableStateBuilder = NewMutableStateBuilderWithReplicationState(
b.shard,
b.shard.GetEventsCache(),
b.logger,
b.mutableState.GetDomainEntry(),
)
Expand Down
1 change: 0 additions & 1 deletion service/history/execution/state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func (r *stateRebuilderImpl) initializeBuilders(
) (MutableState, StateBuilder) {
resetMutableStateBuilder := NewMutableStateBuilderWithVersionHistories(
r.shard,
r.shard.GetEventsCache(),
r.logger,
domainEntry,
)
Expand Down
3 changes: 0 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ func (e *historyEngineImpl) createMutableState(
// version history applies to both local and global domain
newMutableState = execution.NewMutableStateBuilderWithVersionHistories(
e.shard,
e.shard.GetEventsCache(),
e.logger,
domainEntry,
)
Expand All @@ -451,14 +450,12 @@ func (e *historyEngineImpl) createMutableState(
// no matter whether it will be replicated to multiple target clusters or not
newMutableState = execution.NewMutableStateBuilderWithReplicationState(
e.shard,
e.shard.GetEventsCache(),
e.logger,
domainEntry,
)
} else {
newMutableState = execution.NewMutableStateBuilder(
e.shard,
e.shard.GetEventsCache(),
e.logger,
domainEntry,
)
Expand Down
64 changes: 48 additions & 16 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,12 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyExpired() {
stickyTl := "stickyTaskList"
identity := "testIdentity"

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId(), constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
we.GetRunId(),
constants.TestLocalDomainEntry,
)
executionInfo := msBuilder.GetExecutionInfo()
executionInfo.StickyTaskList = stickyTl

Expand Down Expand Up @@ -236,8 +240,12 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {
stickyTl := "stickyTaskList"
identity := "testIdentity"

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId(), constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
we.GetRunId(),
constants.TestLocalDomainEntry,
)
executionInfo := msBuilder.GetExecutionInfo()
executionInfo.LastUpdatedTimestamp = time.Now()
executionInfo.StickyTaskList = stickyTl
Expand Down Expand Up @@ -818,8 +826,12 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionFail() {

func (s *engine2Suite) createExecutionStartedState(we workflow.WorkflowExecution, tl, identity string,
startDecision bool) execution.MutableState {
msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
s.logger, we.GetRunId(), constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
s.logger,
we.GetRunId(),
constants.TestLocalDomainEntry,
)
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := test.AddDecisionTaskScheduledEvent(msBuilder)
if startDecision {
Expand Down Expand Up @@ -851,8 +863,12 @@ func (s *engine2Suite) TestRespondDecisionTaskCompletedRecordMarkerDecision() {
markerDetails := []byte("marker details")
markerName := "marker name"

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId(), constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
we.GetRunId(),
constants.TestLocalDomainEntry,
)
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := test.AddDecisionTaskScheduledEvent(msBuilder)
test.AddDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)
Expand Down Expand Up @@ -1169,8 +1185,12 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), runID, constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
Expand Down Expand Up @@ -1301,8 +1321,12 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotRunning()
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), runID, constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
Expand Down Expand Up @@ -1347,8 +1371,12 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), runID, constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
Expand Down Expand Up @@ -1401,8 +1429,12 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), runID, constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
ms.ExecutionInfo.State = p.WorkflowStateCompleted
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
Expand Down
16 changes: 12 additions & 4 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,12 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {
stickyTl := "stickyTaskList"
identity := "testIdentity"

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId(), constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
we.GetRunId(),
constants.TestLocalDomainEntry,
)
executionInfo := msBuilder.GetExecutionInfo()
executionInfo.LastUpdatedTimestamp = time.Now()
executionInfo.StickyTaskList = stickyTl
Expand Down Expand Up @@ -286,8 +290,12 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), runID, constants.TestLocalDomainEntry)
msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.historyEngine.shard,
loggerimpl.NewDevelopmentForTest(s.Suite),
runID,
constants.TestLocalDomainEntry,
)
ms := execution.CreatePersistenceMutableState(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
Expand Down
Loading

0 comments on commit cfb3693

Please sign in to comment.