Skip to content

Commit

Permalink
Add unit tests for execution/context.go ReapplyEvents (cadence-workfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Apr 3, 2024
1 parent 5ecb9f9 commit fcbc00e
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 9 deletions.
17 changes: 8 additions & 9 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,6 @@ func createWorkflowExecutionWithRetry(
op := func() error {
var err error
resp, err = shardContext.CreateWorkflowExecution(ctx, request)
fmt.Println(err)
return err
}
isRetryable := func(err error) bool {
Expand Down Expand Up @@ -1355,8 +1354,6 @@ func (c *contextImpl) ReapplyEvents(
workflowID := eventBatches[0].WorkflowID
runID := eventBatches[0].RunID
domainCache := c.shard.GetDomainCache()
clientBean := c.shard.GetService().GetClientBean()
serializer := c.shard.GetService().GetPayloadSerializer()
domainEntry, err := domainCache.GetDomainByID(domainID)
if err != nil {
return err
Expand Down Expand Up @@ -1384,12 +1381,6 @@ func (c *contextImpl) ReapplyEvents(
return nil
}

// Reapply events only reapply to the current run.
// The run id is only used for reapply event de-duplication
execution := &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultRemoteCallTimeout)
defer cancel()

Expand All @@ -1404,6 +1395,14 @@ func (c *contextImpl) ReapplyEvents(
)
}

// Reapply events only reapply to the current run.
// The run id is only used for reapply event de-duplication
execution := &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
}
clientBean := c.shard.GetService().GetClientBean()
serializer := c.shard.GetService().GetPayloadSerializer()
// The active cluster of the domain is the same as current cluster.
// Use the history from the same cluster to reapply events
reapplyEventsDataBlob, err := serializer.SerializeBatchEvents(
Expand Down
148 changes: 148 additions & 0 deletions service/history/execution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
hcommon "github.com/uber/cadence/service/history/common"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/resource"
"github.com/uber/cadence/service/history/shard"
)

Expand Down Expand Up @@ -1116,3 +1119,148 @@ func TestCreateWorkflowExecution(t *testing.T) {
})
}
}

func TestReapplyEvents(t *testing.T) {
testCases := []struct {
name string
eventBatches []*persistence.WorkflowEvents
mockSetup func(*shard.MockContext, *cache.MockDomainCache, *resource.Test, *engine.MockEngine)
wantErr bool
}{
{
name: "empty input",
eventBatches: []*persistence.WorkflowEvents{},
wantErr: false,
},
{
name: "domain cache error",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, _ *resource.Test, _ *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(nil, errors.New("some error"))
},
wantErr: true,
},
{
name: "domain is pending active",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, _ *resource.Test, _ *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewDomainCacheEntryForTest(nil, nil, true, nil, 0, common.Ptr(int64(1))), nil)
},
wantErr: false,
},
{
name: "domainID/workflowID mismatch",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
},
{
DomainID: "test-domain-id2",
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, _ *resource.Test, _ *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewDomainCacheEntryForTest(nil, nil, true, nil, 0, nil), nil)
},
wantErr: true,
},
{
name: "no signal events",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
},
{
DomainID: "test-domain-id",
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, _ *resource.Test, _ *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewDomainCacheEntryForTest(nil, nil, true, nil, 0, nil), nil)
},
wantErr: false,
},
{
name: "success - apply to current cluster",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
Events: []*types.HistoryEvent{
{
EventType: types.EventTypeWorkflowExecutionSignaled.Ptr(),
},
},
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, _ *resource.Test, mockEngine *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewGlobalDomainCacheEntryForTest(nil, nil, &persistence.DomainReplicationConfig{ActiveClusterName: cluster.TestCurrentClusterName}, 0), nil)
mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata)
mockShard.EXPECT().GetEngine().Return(mockEngine)
mockEngine.EXPECT().ReapplyEvents(gomock.Any(), "test-domain-id", "test-workflow-id", "test-run-id", []*types.HistoryEvent{
{
EventType: types.EventTypeWorkflowExecutionSignaled.Ptr(),
},
}).Return(nil)
},
wantErr: false,
},
{
name: "success - apply to remote cluster",
eventBatches: []*persistence.WorkflowEvents{
{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
Events: []*types.HistoryEvent{
{
EventType: types.EventTypeWorkflowExecutionSignaled.Ptr(),
},
},
},
},
mockSetup: func(mockShard *shard.MockContext, mockDomainCache *cache.MockDomainCache, mockResource *resource.Test, mockEngine *engine.MockEngine) {
mockShard.EXPECT().GetDomainCache().Return(mockDomainCache)
mockDomainCache.EXPECT().GetDomainByID("test-domain-id").Return(cache.NewGlobalDomainCacheEntryForTest(&persistence.DomainInfo{Name: "test-domain"}, nil, &persistence.DomainReplicationConfig{ActiveClusterName: cluster.TestAlternativeClusterName}, 0), nil)
mockShard.EXPECT().GetClusterMetadata().Return(cluster.TestActiveClusterMetadata)
mockShard.EXPECT().GetService().Return(mockResource).Times(2)
mockResource.RemoteAdminClient.EXPECT().ReapplyEvents(gomock.Any(), gomock.Any()).Return(nil)
},
wantErr: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockShard := shard.NewMockContext(mockCtrl)
mockDomainCache := cache.NewMockDomainCache(mockCtrl)
mockEngine := engine.NewMockEngine(mockCtrl)
resource := resource.NewTest(t, mockCtrl, metrics.Common)
if tc.mockSetup != nil {
tc.mockSetup(mockShard, mockDomainCache, resource, mockEngine)
}
ctx := &contextImpl{
shard: mockShard,
}
err := ctx.ReapplyEvents(tc.eventBatches)
if tc.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

0 comments on commit fcbc00e

Please sign in to comment.