-
Notifications
You must be signed in to change notification settings - Fork 14
/
consumer_config_test.go
140 lines (121 loc) · 3.38 KB
/
consumer_config_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package kafka
import (
"testing"
"time"
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/google/go-cmp/cmp"
)
// TestConsumerConfig_validate exercises ConsumerConfig.setDefaults. It asserts
// the values expected on an otherwise empty config, that the tracing fields are
// populated when DistributedTracingEnabled is set, and that an explicit
// CommitInterval is also visible on the Reader config afterwards.
func TestConsumerConfig_validate(t *testing.T) {
	t.Run("Set_Defaults", func(t *testing.T) {
		// Given
		cfg := ConsumerConfig{Reader: ReaderConfig{}}

		// When
		cfg.setDefaults()

		// Then
		if cfg.Concurrency != 1 {
			t.Fatal("Concurrency default value must equal to 1")
		}
		if cfg.CommitInterval != time.Second {
			t.Fatal("Commit Interval default value must equal to 1s")
		}
		if cfg.Reader.CommitInterval != time.Second {
			t.Fatal("Reader Commit Interval default value must equal to 1s")
		}
		// TransactionalRetry is a pointer: check for nil before dereferencing so
		// a missing default is reported as a test failure rather than a panic
		// that would abort the whole test binary.
		if cfg.TransactionalRetry == nil || !*cfg.TransactionalRetry {
			t.Fatal("Default Transactional Retry is true")
		}
		if cfg.MessageGroupDuration != time.Second {
			t.Fatal("Message Group Duration default value must equal to 1s")
		}
	})

	t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
		// Given
		cfg := ConsumerConfig{Reader: ReaderConfig{}, DistributedTracingEnabled: true}

		// When
		cfg.setDefaults()

		// Then
		if cfg.DistributedTracingConfiguration.TracerProvider == nil {
			t.Fatal("Traceprovider cannot be null")
		}
		if cfg.DistributedTracingConfiguration.Propagator == nil {
			t.Fatal("Propagator cannot be null")
		}
	})

	t.Run("Set_Commit_Interval_Value_To_The_Internal_Reader", func(t *testing.T) {
		// Given
		cfg := ConsumerConfig{CommitInterval: 5 * time.Second, Reader: ReaderConfig{}}

		// When
		cfg.setDefaults()

		// Then: the explicit value must be kept and must also be present on the reader.
		if cfg.CommitInterval != 5*time.Second {
			t.Fatal("Commit Interval value must equal to 5s")
		}
		if cfg.Reader.CommitInterval != 5*time.Second {
			t.Fatal("Reader Commit Interval default value must equal to 5s")
		}
	})
}
// TestConsumerConfig_newCronsumerConfig checks the Consumer.SkipMessageByHeaderFn
// field of the value returned by newCronsumerConfig: it must be nil when the
// retry configuration carries no skip function, and non-nil when one is given.
func TestConsumerConfig_newCronsumerConfig(t *testing.T) {
	t.Run("Should_Return_Nil_When_Client_Don't_Use_SkipMessageByHeaderFn", func(t *testing.T) {
		// Given: a zero-value config, i.e. no skip function configured.
		var consumerCfg ConsumerConfig

		// When
		cronsumerCfg := consumerCfg.newCronsumerConfig()

		// Then
		if cronsumerCfg.Consumer.SkipMessageByHeaderFn == nil {
			return
		}
		t.Error("SkipMessageByHeaderFn must be nil")
	})

	t.Run("Should_Set_When_Client_Give_SkipMessageByHeaderFn", func(t *testing.T) {
		// Given: a retry configuration with a skip function that never skips.
		neverSkip := func(headers []Header) bool {
			return false
		}
		consumerCfg := ConsumerConfig{
			RetryConfiguration: RetryConfiguration{SkipMessageByHeaderFn: neverSkip},
		}

		// When
		cronsumerCfg := consumerCfg.newCronsumerConfig()

		// Then
		if cronsumerCfg.Consumer.SkipMessageByHeaderFn != nil {
			return
		}
		t.Error("SkipMessageByHeaderFn mustn't be nil")
	})
}
// Test_toHeader verifies toHeaders: nil and empty kcronsumer header slices must
// yield a result of length zero, and a populated slice must be converted
// element by element into the equivalent []Header (including a nil Value).
func Test_toHeader(t *testing.T) {
	t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Nil", func(t *testing.T) {
		// When
		headers := toHeaders(nil)

		// Then: only emptiness is required; the result may be nil or an empty slice.
		if len(headers) != 0 {
			t.Errorf("headers must be empty, got %d element(s)", len(headers))
		}
	})

	t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Empty", func(t *testing.T) {
		// When
		headers := toHeaders([]kcronsumer.Header{})

		// Then: only emptiness is required; the result may be nil or an empty slice.
		if len(headers) != 0 {
			t.Errorf("headers must be empty, got %d element(s)", len(headers))
		}
	})

	t.Run("Should_Covert_List_When_Cronsumer_Header", func(t *testing.T) {
		// Given
		expected := []Header{
			{Key: "key", Value: []byte("val")},
			{Key: "key2", Value: []byte("val2")},
			{Key: "key3", Value: nil},
		}

		// When
		actual := toHeaders([]kcronsumer.Header{
			{Key: "key", Value: []byte("val")},
			{Key: "key2", Value: []byte("val2")},
			{Key: "key3", Value: nil},
		})

		// Then: label the diff direction so a failure is readable on its own.
		if diff := cmp.Diff(expected, actual); diff != "" {
			t.Errorf("toHeaders() mismatch (-want +got):\n%s", diff)
		}
	})
}