-
Notifications
You must be signed in to change notification settings - Fork 27
/
pubsub.go
101 lines (88 loc) · 2.62 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package live
import (
"context"
"log"
)
// PubSubTransport is how the messages should be sent to the listeners.
type PubSubTransport interface {
// Publish a message onto the given topic.
Publish(ctx context.Context, topic string, msg Event) error
// Listen will be called in a go routine so should be written to
// block.
Listen(ctx context.Context, p *PubSub) error
}
// PubSub handles communication between handlers. Depending on the given
// transport this could be between handlers in an application, or across
// nodes in a cluster.
type PubSub struct {
transport PubSubTransport
handlers map[string][]Engine
}
// NewPubSub creates a new PubSub handler.
func NewPubSub(ctx context.Context, t PubSubTransport) *PubSub {
p := &PubSub{
transport: t,
handlers: map[string][]Engine{},
}
go func(ctx context.Context, ps *PubSub) {
if err := t.Listen(ctx, ps); err != nil {
log.Fatal("could not listen on pubsub: %w", err)
}
}(ctx, p)
return p
}
// Publish send a message on a topic.
func (p *PubSub) Publish(ctx context.Context, topic string, msg Event) error {
return p.transport.Publish(ctx, topic, msg)
}
// Subscribe adds a handler to a PubSub topic.
func (p *PubSub) Subscribe(topic string, h Engine) {
p.handlers[topic] = append(p.handlers[topic], h)
// This adjusts the handlers broadcast function to publish onto the
// given topic.
h.HandleBroadcast(func(ctx context.Context, h Engine, msg Event) {
if err := p.transport.Publish(ctx, topic, msg); err != nil {
log.Println("could not publish broadcast:", err)
}
})
}
// Recieve a message from the transport.
func (p *PubSub) Recieve(topic string, msg Event) {
ctx := context.Background()
for _, node := range p.handlers[topic] {
node.self(ctx, nil, msg)
}
}
// TransportMessage a useful container to send live events.
type TransportMessage struct {
Topic string
Msg Event
}
// LocalTransport a pubsub transport that allows handlers to communicate
// locally.
type LocalTransport struct {
ctx context.Context
queue chan TransportMessage
}
// NewLocalTransport create a new LocalTransport.
func NewLocalTransport() *LocalTransport {
return &LocalTransport{
queue: make(chan TransportMessage),
}
}
// Publish send a message to all handlers subscribed to a topic.
func (l *LocalTransport) Publish(ctx context.Context, topic string, msg Event) error {
l.queue <- TransportMessage{Topic: topic, Msg: msg}
return nil
}
// Listen listen for new published messages.
func (l *LocalTransport) Listen(ctx context.Context, p *PubSub) error {
for {
select {
case msg := <-l.queue:
p.Recieve(msg.Topic, msg.Msg)
case <-ctx.Done():
return nil
}
}
}