-
Notifications
You must be signed in to change notification settings - Fork 8
/
consumer_test.go
123 lines (110 loc) · 6.57 KB
/
consumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package pgq
import (
"context"
"testing"
"time"
"golang.org/x/sync/semaphore"
"go.dataddo.com/pgq/internal/require"
)
// TestConsumer_generateQuery checks the SQL text produced by
// Consumer.generateQuery for several combinations of consumer options.
//
// Every case builds a consumer through NewConsumer (passing nil for its first
// and third arguments), generates the query and compares its String() form
// with the expected statement verbatim, so any whitespace or ordering change
// in the generated SQL fails the test.
func TestConsumer_generateQuery(t *testing.T) {
	type args struct {
		queueName string
		opts      []ConsumerOption
	}
	tests := []struct {
		name string
		args args
		want string
	}{
		{
			// No options: only the default predicates are expected.
			name: "simple",
			args: args{queueName: "testing_queue"},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// WithHistoryLimit adds the created_at window predicates.
			name: "scanInterval 12 hours",
			args: args{
				queueName: "testing_queue",
				opts: []ConsumerOption{
					WithHistoryLimit(12 * time.Hour),
				},
			},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE created_at >= CURRENT_TIMESTAMP - CAST((:history_limit) AS interval) AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// A single metadata filter adds one parameterised
			// metadata->>key comparison.
			name: "consume messages with metadata filter",
			args: args{
				queueName: "testing_queue",
				opts: []ConsumerOption{
					WithMetadataFilter(&MetadataFilter{Key: "foo", Operation: OpEqual, Value: "bar"}),
				},
			},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND metadata->>:metadata_key_0 = :metadata_value_0 AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// With WithMaxConsumeCount(0) the expected query carries no
			// "consumed_count < :max_consume_count" predicate.
			name: "scanInterval 12 hours and max consumed count limit disabled",
			args: args{
				queueName: "testing_queue",
				opts: []ConsumerOption{
					WithHistoryLimit(12 * time.Hour),
					WithMaxConsumeCount(0),
				},
			},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE created_at >= CURRENT_TIMESTAMP - CAST((:history_limit) AS interval) AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// NOTE(review): despite its name this case passes no metadata
			// option; its input and expected SQL match "simple". Confirm
			// whether a WithMetadataFilter option was meant to be here.
			name: "with metadata condition",
			args: args{queueName: "testing_queue"},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// NOTE(review): no metadata option is passed here either; the
			// case matches "scanInterval 12 hours".
			name: "scanInterval 12 hours with metadata condition",
			args: args{
				queueName: "testing_queue",
				opts: []ConsumerOption{
					WithHistoryLimit(12 * time.Hour),
				},
			},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE created_at >= CURRENT_TIMESTAMP - CAST((:history_limit) AS interval) AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
		{
			// NOTE(review): no negative metadata filter is configured; the
			// case matches "simple".
			name: "with negative metadata condition",
			args: args{queueName: "testing_queue"},
			want: `UPDATE "testing_queue" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM "testing_queue" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL AND (scheduled_for IS NULL OR scheduled_for < CURRENT_TIMESTAMP) ORDER BY scheduled_for ASC NULLS LAST, consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until`,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c, err := NewConsumer(nil, tt.args.queueName, nil, tt.args.opts...)
			require.NoError(t, err)

			got, err := c.generateQuery()
			require.NoError(t, err)
			require.Equal(t, tt.want, got.String())
		})
	}
}
// TestAcquireMaxFromSemaphore drives acquireMaxFromSemaphore through four
// consecutive calls against one weighted semaphore, all sharing a context
// that times out after one second, and checks the reported count each time.
func TestAcquireMaxFromSemaphore(t *testing.T) {
	const (
		capacity     int64 = 10 // semaphore weight, also the amount requested on every call
		firstRefill  int64 = 3  // released synchronously before the second call
		secondRefill int64 = 1  // released from a timer during the third call
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sem := semaphore.NewWeighted(capacity)

	// Untouched semaphore: the full capacity is expected back.
	got, err := acquireMaxFromSemaphore(ctx, sem, capacity)
	require.NoError(t, err)
	require.Equal(t, capacity, got)

	// After handing back a few units, exactly that many are expected.
	sem.Release(firstRefill)
	got, err = acquireMaxFromSemaphore(ctx, sem, capacity)
	require.NoError(t, err)
	require.Equal(t, firstRefill, got)

	// A single unit is released roughly a millisecond after the call starts;
	// the call is expected to report just that unit.
	time.AfterFunc(time.Millisecond, func() {
		sem.Release(secondRefill)
	})
	got, err = acquireMaxFromSemaphore(ctx, sem, capacity)
	require.NoError(t, err)
	require.Equal(t, secondRefill, got)

	// Nothing is released any more: the call must fail with the context's
	// deadline error and report zero acquired units.
	got, err = acquireMaxFromSemaphore(ctx, sem, capacity)
	require.ErrorIs(t, err, context.DeadlineExceeded)
	require.Equal(t, int64(0), got)
}