Skip to content

Commit

Permalink
Use events v2 and thriftrw by default (cadence-workflow#1508)
Browse files Browse the repository at this point in the history
* Use events v2 and thriftrw by default
* Update tests to use events v2 and thrift encoding
  • Loading branch information
wxing1292 authored Mar 13, 2019
1 parent bda7708 commit 4c32d3e
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 356 deletions.
2 changes: 1 addition & 1 deletion service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *conflictResolverSuite) TestReset() {
InsertRequestCancelInfos: []*persistence.RequestCancelInfo{},
InsertSignalInfos: []*persistence.SignalInfo{},
InsertSignalRequestedIDs: []string{},
Encoding: common.EncodingType("json"),
Encoding: common.EncodingType(s.mockShard.GetConfig().EventEncodingType(domainID)),
}).Return(nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
DomainID: domainID,
Expand Down
92 changes: 50 additions & 42 deletions service/history/historyEngine2_test.go

Large diffs are not rendered by default.

13 changes: 6 additions & 7 deletions service/history/historyEngine3_eventsv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ package history

import (
"context"
"github.com/uber/cadence/service/worker/sysworkflow"
"os"
"testing"

"github.com/uber/cadence/service/worker/sysworkflow"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -183,9 +184,8 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() {
stickyTl := "stickyTaskList"
identity := "testIdentity"

msBuilder := newMutableStateBuilder("test", s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()))
msBuilder.SetHistoryTree(msBuilder.GetExecutionInfo().RunID)
msBuilder := newMutableStateBuilderWithEventV2("test", s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()), we.GetRunId())
executionInfo := msBuilder.GetExecutionInfo()
executionInfo.StickyTaskList = stickyTl

Expand Down Expand Up @@ -322,9 +322,8 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() {
},
}

msBuilder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()))
msBuilder.SetHistoryTree(msBuilder.GetExecutionInfo().RunID)
msBuilder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.historyEngine.shard, s.mockEventsCache,
bark.NewLoggerFromLogrus(log.New()), runID)
ms := createMutableState(msBuilder)
gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms}
gceResponse := &p.GetCurrentExecutionResponse{RunID: runID}
Expand Down
322 changes: 173 additions & 149 deletions service/history/historyEngine_test.go

Large diffs are not rendered by default.

53 changes: 40 additions & 13 deletions service/history/historyReplicator_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit

// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeJSON)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, false),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeThriftRW)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),

NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000),

Expand Down
15 changes: 10 additions & 5 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
mockVisibilityMgr *mocks.VisibilityManager
mockExecutionMgr *mocks.ExecutionManager
mockHistoryMgr *mocks.HistoryManager
mockHistoryV2Mgr *mocks.HistoryV2Manager
mockShard ShardContext
mockClusterMetadata *mocks.ClusterMetadata
mockProducer *mocks.KafkaProducer
Expand Down Expand Up @@ -93,6 +94,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
s.mockExecutionMgr = &mocks.ExecutionManager{}
s.mockShardManager = &mocks.ShardManager{}
s.mockHistoryMgr = &mocks.HistoryManager{}
s.mockHistoryV2Mgr = &mocks.HistoryV2Manager{}
s.mockVisibilityMgr = &mocks.VisibilityManager{}
s.mockMetadataMgr = &mocks.MetadataManager{}
s.mockClusterMetadata = &mocks.ClusterMetadata{}
Expand Down Expand Up @@ -127,6 +129,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
executionManager: s.mockExecutionMgr,
shardManager: s.mockShardManager,
historyMgr: s.mockHistoryMgr,
historyV2Mgr: s.mockHistoryV2Mgr,
maxTransferSequenceNumber: 100000,
closeCh: s.shardClosedCh,
config: s.config,
Expand All @@ -147,6 +150,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
currentClusterName: s.mockShard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
historyMgr: s.mockHistoryMgr,
historyV2Mgr: s.mockHistoryV2Mgr,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
Expand All @@ -163,6 +167,7 @@ func (s *timerQueueProcessor2Suite) TearDownTest() {
s.mockMatchingClient.AssertExpectations(s.T())
s.mockExecutionMgr.AssertExpectations(s.T())
s.mockHistoryMgr.AssertExpectations(s.T())
s.mockHistoryV2Mgr.AssertExpectations(s.T())
s.mockVisibilityMgr.AssertExpectations(s.T())
s.mockProducer.AssertExpectations(s.T())
s.mockClientBean.AssertExpectations(s.T())
Expand All @@ -176,7 +181,7 @@ func (s *timerQueueProcessor2Suite) TestTimerUpdateTimesOut() {

taskList := "user-timer-update-times-out"

builder := newMutableStateBuilder(cluster.TestCurrentClusterName, s.mockHistoryEngine.shard, s.mockEventsCache, s.logger)
builder := newMutableStateBuilderWithEventV2(cluster.TestCurrentClusterName, s.mockShard, s.mockEventsCache, s.logger, we.GetRunId())
startRequest := &workflow.StartWorkflowExecutionRequest{
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr("wType")},
TaskList: common.TaskListPtr(workflow.TaskList{Name: common.StringPtr(taskList)}),
Expand Down Expand Up @@ -217,11 +222,11 @@ func (s *timerQueueProcessor2Suite) TestTimerUpdateTimesOut() {
s.mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything).Return(
&persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}, nil)

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(&p.AppendHistoryEventsResponse{Size: 0}, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, errors.New("FAILED")).Once()
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil)

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(&p.AppendHistoryEventsResponse{Size: 0}, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Run(func(arguments mock.Arguments) {
// Done.
waitCh <- struct{}{}
Expand All @@ -248,7 +253,7 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
RunId: common.StringPtr(validRunID)}
taskList := "task-workflow-times-out"

builder := newMutableStateBuilder(cluster.TestCurrentClusterName, s.mockHistoryEngine.shard, s.mockEventsCache, s.logger)
builder := newMutableStateBuilderWithEventV2(cluster.TestCurrentClusterName, s.mockShard, s.mockEventsCache, s.logger, we.GetRunId())
startRequest := &workflow.StartWorkflowExecutionRequest{
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr("wType")},
TaskList: common.TaskListPtr(workflow.TaskList{Name: common.StringPtr(taskList)}),
Expand Down Expand Up @@ -282,7 +287,7 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(&p.AppendHistoryEventsResponse{Size: 0}, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Run(func(arguments mock.Arguments) {
// Done.
waitCh <- struct{}{}
Expand Down
76 changes: 38 additions & 38 deletions service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (s *timerQueueProcessorSuite) createExecutionWithTimers(domainID string, we
identity string, timeOuts []int32) (*persistence.WorkflowMutableState, []persistence.Task) {

// Generate first decision task event.
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
addWorkflowExecutionStartedEvent(builder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := addDecisionTaskScheduledEvent(builder)

Expand All @@ -159,8 +159,8 @@ func (s *timerQueueProcessorSuite) createExecutionWithTimers(domainID string, we
state0, err2 := s.GetWorkflowExecutionInfo(domainID, we)
s.NoError(err2, "No error expected.")

builder = newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder = newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(state0)
startedEvent := addDecisionTaskStartedEvent(builder, di.ScheduleID, tl, identity)
addDecisionTaskCompletedEvent(builder, di.ScheduleID, *startedEvent.EventId, nil, identity)
Expand Down Expand Up @@ -198,8 +198,8 @@ func (s *timerQueueProcessorSuite) addDecisionTimer(domainID string, we workflow
s.NoError(err)

condition := state.ExecutionInfo.NextEventID
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(state)

di := addDecisionTaskScheduledEvent(builder)
Expand All @@ -220,8 +220,8 @@ func (s *timerQueueProcessorSuite) addDecisionTimer(domainID string, we workflow
func (s *timerQueueProcessorSuite) addUserTimer(domainID string, we workflow.WorkflowExecution, timerID string, tb *timerBuilder) []persistence.Task {
state, err := s.GetWorkflowExecutionInfo(domainID, we)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand All @@ -241,8 +241,8 @@ func (s *timerQueueProcessorSuite) addHeartBeatTimer(domainID string,
we workflow.WorkflowExecution, tb *timerBuilder) (*workflow.HistoryEvent, []persistence.Task) {
state, err := s.GetWorkflowExecutionInfo(domainID, we)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -362,8 +362,8 @@ func (s *timerQueueProcessorSuite) checkTimedOutEventFor(domainID string, we wor
scheduleID int64) bool {
info, err1 := s.GetWorkflowExecutionInfo(domainID, we)
s.NoError(err1)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(info)
_, isRunning := builder.GetActivityInfo(scheduleID)

Expand All @@ -374,8 +374,8 @@ func (s *timerQueueProcessorSuite) checkTimedOutEventForUserTimer(domainID strin
timerID string) bool {
info, err1 := s.GetWorkflowExecutionInfo(domainID, we)
s.NoError(err1)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, we.GetRunId())
builder.Load(info)

isRunning, _ := builder.GetUserTimer(timerID)
Expand Down Expand Up @@ -417,8 +417,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_WithOutS

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -461,8 +461,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_WithStar

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -507,8 +507,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_MoreThan

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -553,8 +553,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskStartToClose_WithStart()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -597,8 +597,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskStartToClose_CompletedAc

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -646,8 +646,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_JustSche

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -690,8 +690,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_Started(

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -736,8 +736,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_Complete

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -812,8 +812,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() {

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -858,8 +858,8 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() {
// assert activity infos are deleted
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder = newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder = newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
s.Equal(0, len(builder.pendingActivityInfoIDs))
}
Expand Down Expand Up @@ -901,8 +901,8 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() {

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder := newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder := newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

Expand Down Expand Up @@ -935,8 +935,8 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() {
// assert user timer infos are deleted
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err)
builder = newMutableStateBuilder(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger)
builder = newMutableStateBuilderWithEventV2(s.mockClusterMetadata.GetCurrentClusterName(), s.ShardContext,
s.ShardContext.GetEventsCache(), s.logger, workflowExecution.GetRunId())
builder.Load(state)
s.Equal(0, len(builder.pendingTimerInfoIDs))
}
Expand Down
Loading

0 comments on commit 4c32d3e

Please sign in to comment.