forked from dunglas/mercure
-
Notifications
You must be signed in to change notification settings - Fork 0
/
local_transport_test.go
123 lines (96 loc) · 3.4 KB
/
local_transport_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package mercure
import (
"net/url"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
// TestLocalTransportDoNotDispatchUntilListen dispatches an update before any
// subscriber is registered and verifies that a subscriber added afterwards
// does not receive that update.
func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
	transport, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Stop here on a constructor error rather than discarding it: the
	// deferred Close below would otherwise be invoked on an unusable value.
	require.NoError(t, err)
	defer transport.Close()

	assert.Implements(t, (*Transport)(nil), transport)

	// Dispatched while no subscriber is attached to the transport.
	u := &Update{Topics: []string{"http://example.com/books/1"}}
	require.NoError(t, transport.Dispatch(u))

	s := NewSubscriber("", zap.NewNop())
	s.SetTopics(u.Topics, nil)
	require.NoError(t, transport.AddSubscriber(s))

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Anything delivered on the channel is a failure. t.Error (unlike
		// t.Fatal/FailNow) may be called from a goroutine other than the
		// one running the test function.
		for range s.Receive() {
			t.Error("subscriber received an update dispatched before it was added")
		}
	}()

	// The range loop above ends once the Receive channel is closed, which
	// Disconnect is expected to trigger; wg.Wait then lets the test return.
	s.Disconnect()
	wg.Wait()
}
// TestLocalTransportDispatch registers a subscriber, dispatches an update on
// the subscriber's topics and verifies that the very same update is read back
// from the subscriber's Receive channel.
func TestLocalTransportDispatch(t *testing.T) {
	transport, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Check the constructor error instead of discarding it so a failed setup
	// is reported as a test failure rather than a panic further down.
	require.NoError(t, err)
	defer transport.Close()

	assert.Implements(t, (*Transport)(nil), transport)

	s := NewSubscriber("", zap.NewNop())
	s.SetTopics([]string{"http://example.com/foo"}, nil)
	require.NoError(t, transport.AddSubscriber(s))

	// The update targets exactly the topics the subscriber holds.
	u := &Update{Topics: s.SubscribedTopics}
	require.NoError(t, transport.Dispatch(u))

	assert.Equal(t, u, <-s.Receive())
}
// TestLocalTransportClosed verifies that, once the transport has been closed,
// AddSubscriber and Dispatch both report ErrClosedTransport and the output
// channel of a previously added subscriber is closed.
func TestLocalTransportClosed(t *testing.T) {
	transport, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Check the constructor error instead of discarding it so a failed setup
	// is reported as a test failure rather than a panic further down.
	require.NoError(t, err)
	// Close is also called explicitly below; this deferred call is a safety
	// net for the early-exit paths taken by the require.* assertions.
	defer transport.Close()

	assert.Implements(t, (*Transport)(nil), transport)

	s := NewSubscriber("", zap.NewNop())
	require.NoError(t, transport.AddSubscriber(s))

	require.NoError(t, transport.Close())

	// ErrorIs takes (actual error, target), which keeps the failure message
	// labelled correctly and matches the sentinel even if it is wrapped.
	assert.ErrorIs(t, transport.AddSubscriber(NewSubscriber("", zap.NewNop())), ErrClosedTransport)
	assert.ErrorIs(t, transport.Dispatch(&Update{}), ErrClosedTransport)

	// A receive on a closed channel yields ok == false.
	_, ok := <-s.out
	assert.False(t, ok)
}
// TestLiveCleanDisconnectedSubscribers adds two subscribers to a
// LocalTransport and verifies that the transport's subscriber count drops by
// one each time a subscriber is disconnected and removed.
func TestLiveCleanDisconnectedSubscribers(t *testing.T) {
	tr, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Check the constructor error instead of discarding it so a failed setup
	// is reported as a test failure rather than a panic further down.
	require.NoError(t, err)

	// The test inspects the unexported subscribers field, so it needs the
	// concrete type; the comma-ok form turns a mismatch into a test failure
	// instead of a panic.
	transport, ok := tr.(*LocalTransport)
	require.True(t, ok, "NewLocalTransport should return a *LocalTransport")
	defer transport.Close()

	s1 := NewSubscriber("", zap.NewNop())
	require.NoError(t, transport.AddSubscriber(s1))

	s2 := NewSubscriber("", zap.NewNop())
	require.NoError(t, transport.AddSubscriber(s2))

	assert.Equal(t, 2, transport.subscribers.Len())

	s1.Disconnect()
	transport.RemoveSubscriber(s1)
	assert.Equal(t, 1, transport.subscribers.Len())

	s2.Disconnect()
	transport.RemoveSubscriber(s2)
	assert.Equal(t, 0, transport.subscribers.Len())
}
// TestLiveReading registers a subscriber on a single topic, dispatches an
// update for that topic and verifies that the update read from the
// subscriber's Receive channel equals the one that was dispatched.
func TestLiveReading(t *testing.T) {
	transport, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Check the constructor error instead of discarding it so a failed setup
	// is reported as a test failure rather than a panic further down.
	require.NoError(t, err)
	defer transport.Close()

	assert.Implements(t, (*Transport)(nil), transport)

	s := NewSubscriber("", zap.NewNop())
	s.SetTopics([]string{"https://example.com"}, nil)
	require.NoError(t, transport.AddSubscriber(s))

	u := &Update{Topics: s.SubscribedTopics}
	require.NoError(t, transport.Dispatch(u))

	receivedUpdate := <-s.Receive()
	assert.Equal(t, u, receivedUpdate)
}
// TestLocalTransportGetSubscribers adds two subscribers and verifies that
// GetSubscribers returns EarliestLastEventID as the last event ID together
// with exactly those two subscribers.
func TestLocalTransportGetSubscribers(t *testing.T) {
	transport, err := NewLocalTransport(&url.URL{Scheme: "local"}, zap.NewNop())
	// Check the constructor error instead of discarding it so a failed setup
	// is reported as a test failure rather than a panic further down.
	require.NoError(t, err)
	defer transport.Close()

	require.NotNil(t, transport)

	s1 := NewSubscriber("", zap.NewNop())
	require.NoError(t, transport.AddSubscriber(s1))

	s2 := NewSubscriber("", zap.NewNop())
	require.NoError(t, transport.AddSubscriber(s2))

	// GetSubscribers is reached through the TransportSubscribers interface;
	// the comma-ok form turns a missing implementation into a test failure
	// instead of a panic.
	ts, ok := transport.(TransportSubscribers)
	require.True(t, ok, "local transport should implement TransportSubscribers")

	lastEventID, subscribers, err := ts.GetSubscribers()
	require.NoError(t, err)

	assert.Equal(t, EarliestLastEventID, lastEventID)
	assert.Len(t, subscribers, 2)
	assert.Contains(t, subscribers, s1)
	assert.Contains(t, subscribers, s2)
}