Skip to content

Commit

Permalink
Tests for PurgeAckedMessages and replicationMessage in common/domain/…
Browse files Browse the repository at this point in the history
…replication_queue (cadence-workflow#5749)

* PurgeAckedMessages and replicationMessage test
  • Loading branch information
abhishekj720 authored Mar 8, 2024
1 parent da4cbf2 commit 09c3743
Showing 1 changed file with 115 additions and 27 deletions.
142 changes: 115 additions & 27 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -121,46 +122,78 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) {
}

func TestGetReplicationMessages(t *testing.T) {

tests := []struct {
name string
lastID int64
maxCount int
wantErr bool
setupMock func(q *persistence.MockQueueManager)
name string
setupMocks func(mockQueueManager *persistence.MockQueueManager)
expectedTasks int
expectedLastID int64
expectError bool
}{
{
name: "successful message retrieval",
lastID: 100,
maxCount: 10,
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil)
name: "handles empty message list",
setupMocks: func(mockQueueManager *persistence.MockQueueManager) {
mockQueueManager.EXPECT().
ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, nil)
},
expectedTasks: 0,
expectedLastID: 0,
expectError: false,
},
{
name: "read messages fails",
lastID: 100,
maxCount: 10,
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("read error"))
name: "decodes single message correctly",
setupMocks: func(mockQueueManager *persistence.MockQueueManager) {
// Setup mock to return one encoded message
encodedMessage, _ := mockEncodeReplicationTask(123)
mockQueueManager.EXPECT().
ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).
Return([]*persistence.QueueMessage{{ID: 1, Payload: encodedMessage}}, nil)
},
expectedTasks: 1,
expectedLastID: 1,
expectError: false,
},
{
name: "decodes multiple messages correctly",
setupMocks: func(mockQueueManager *persistence.MockQueueManager) {
// Setup mock to return multiple encoded messages
encodedMessage1, _ := mockEncodeReplicationTask(123)
encodedMessage2, _ := mockEncodeReplicationTask(456)
mockQueueManager.EXPECT().
ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).
Return([]*persistence.QueueMessage{
{ID: 1, Payload: encodedMessage1},
{ID: 2, Payload: encodedMessage2},
}, nil)
},
expectedTasks: 2,
expectedLastID: 2,
expectError: false,
},
{
name: "read messages fails",
expectedLastID: 100,
expectError: true,
setupMocks: func(mockQueueManager *persistence.MockQueueManager) {
mockQueueManager.EXPECT().ReadMessages(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("read error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
mockQueueManager := persistence.NewMockQueueManager(ctrl)
tc.setupMocks(mockQueueManager)
replicationQueue := NewReplicationQueue(mockQueueManager, "testCluster", nil, nil)
tasks, lastID, err := replicationQueue.GetReplicationMessages(context.Background(), 0, 10)

tt.setupMock(mockQueue)
_, _, err := rq.GetReplicationMessages(context.Background(), tt.lastID, tt.maxCount)
if tt.wantErr {
assert.Error(t, err)
if tc.expectError {
require.Error(t, err)
} else {
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, tc.expectedTasks, len(tasks))
assert.Equal(t, tc.expectedLastID, lastID)
}
})
}
Expand Down Expand Up @@ -531,3 +564,58 @@ func TestGetDLQSize(t *testing.T) {
})
}
}

func TestPurgeAckedMessages(t *testing.T) {
tests := []struct {
name string
wantErr bool
setupMock func(m *persistence.MockQueueManager)
}{
{
name: "successfully purges messages",
wantErr: false,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"clusterA": 5}, nil)
m.EXPECT().DeleteMessagesBefore(gomock.Any(), int64(5)).Return(nil)
},
},
{
name: "does nothing when no ack levels",
wantErr: false,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{}, nil)
},
},
{
name: "error on GetAckLevels",
wantErr: true,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetAckLevels(gomock.Any()).Return(nil, errors.New("database error"))
},
},
{
name: "error on DeleteMessagesBefore",
wantErr: true,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"clusterA": 5}, nil)
m.EXPECT().DeleteMessagesBefore(gomock.Any(), int64(5)).Return(errors.New("delete error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueueManager := persistence.NewMockQueueManager(ctrl)
tt.setupMock(mockQueueManager)
q := &replicationQueueImpl{queue: mockQueueManager}
err := q.purgeAckedMessages()

if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

0 comments on commit 09c3743

Please sign in to comment.