Skip to content

Commit

Permalink
added more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leandro-lugaresi committed Apr 18, 2018
1 parent 65eb115 commit 4acfe4a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 40 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ TEST_OPTIONS?=-race

setup: ## Install all the build and lint dependencies
go get -u github.com/alecthomas/gometalinter
go get -u github.com/golang/dep/...
go get -u github.com/pierrre/gotestcover
go get -u golang.org/x/tools/cmd/cover
dep ensure
go get -u code.cloudfoundry.org/go-diodes
gometalinter --install --update

test: ## Run all the tests
Expand Down
4 changes: 3 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
diodes "code.cloudfoundry.org/go-diodes"
)

const AlertTopic = "hub.subscription.messageslost"

type (
//Hub is a component that provides publish and subscribe capabilities for messages.
// Every message has a Name used to route them to subscribers and this can be used like RabbitMQ topics exchanges.
Expand Down Expand Up @@ -81,7 +83,7 @@ func (*Hub) Close() error {

func (h *Hub) alert(missed int, topic string) {
h.Publish(Message{
Name: "hub.subscription.messageslost",
Name: AlertTopic,
Fields: Fields{
"missed": missed,
"topic": topic,
Expand Down
70 changes: 33 additions & 37 deletions hub_test.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,42 @@
package hub

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestSubscribe(t *testing.T) {
h := New()
subs1 := h.Subscribe("a.*.c", 0)
processed := false
go processSubscription(subs1, func(msg Message) {
require.Equal(t, "a.b.c", msg.Name)
require.Equal(t, []byte(`{"foo": "baz"}`), msg.Msg)
processed = true
})
h.Publish(Message{
Msg: []byte(`{"foo": "baz"}`),
Name: "a.b.c",
})
time.Sleep(time.Millisecond)
require.True(t, processed, "subscription function should be executed")

}

func TestNonBlockingSubscribe(t *testing.T) {
h := New()
subs1 := h.NonBlockingSubscribe("a.*.c", 10)
processed := false
go processSubscription(subs1, func(msg Message) {
require.Equal(t, "a.b.c", msg.Name)
require.Equal(t, []byte(`{"foo": "baz"}`), msg.Msg)
processed = true
})
h.Publish(Message{
Msg: []byte(`{"foo": "baz"}`),
Name: "a.b.c",
})
time.Sleep(30 * time.Millisecond)
require.True(t, processed, "subscription function should be executed")

func TestProcessSubscribers(t *testing.T) {
tests := []struct {
name string
cap int
blocking bool
}{
{name: "blocking and unbuffered", cap: 0, blocking: true},
{name: "blocking and buffered", cap: 10, blocking: true},
{name: "blocking and negative buffer", cap: -10, blocking: true},
{name: "nonBlocking and unbuffered", cap: 0, blocking: false},
{name: "nonBlocking and buffered", cap: 10, blocking: false},
{name: "nonBlocking and negative buffer", cap: -10, blocking: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := New()
var subs *Subscription
if tt.blocking {
subs = h.Subscribe("a.*.c", tt.cap)
} else {
subs = h.NonBlockingSubscribe("a.*.c", tt.cap)
}
var wg sync.WaitGroup
wg.Add(2)
go processSubscription(subs, func(msg Message) {
wg.Done()
})
h.Publish(Message{Name: "a.b.c"})
h.Publish(Message{Name: "a.c.c"})
wg.Wait()
})
}
}

func processSubscription(s *Subscription, op func(msg Message)) {
Expand Down
3 changes: 3 additions & 0 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (d *nonBlockingSubscriber) Next() (Message, bool) {

// NewBlockingSubscriber returns a new blocking subscriber using chanels imternally.
func NewBlockingSubscriber(cap int) Subscriber {
if cap < 0 {
cap = 0
}
return &blockingSubscriber{
ch: make(chan Message, cap),
}
Expand Down

0 comments on commit 4acfe4a

Please sign in to comment.