Skip to content

Commit

Permalink
Merge pull request dunglas#20 from dunglas/race
Browse files Browse the repository at this point in the history
Send headers as soon as possible. Improve tests.
  • Loading branch information
dunglas authored Nov 1, 2018
2 parents 706c700 + d02a1ad commit 1127dcc
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ script:
- go vet
- gofmt -e -d *.go
- go build
- goveralls -covermode=set -service=travis-ci
- goveralls -race -service=travis-ci
after_success:
- git status
- test -n "$TRAVIS_TAG" && docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD"
Expand Down
27 changes: 21 additions & 6 deletions hub/hub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package hub

import "net/http"
import (
"net/http"
"sync"
)

type serializedUpdate struct {
*Update
Expand All @@ -11,10 +14,15 @@ func newSerializedUpdate(u *Update) *serializedUpdate {
return &serializedUpdate{u, u.String()}
}

type subscribers struct {
sync.RWMutex
m map[chan *serializedUpdate]struct{}
}

// Hub stores channels with clients currently subcribed
type Hub struct {
options *Options
subscribers map[chan *serializedUpdate]struct{}
subscribers subscribers
newSubscribers chan chan *serializedUpdate
removedSubscribers chan chan *serializedUpdate
updates chan *serializedUpdate
Expand All @@ -36,7 +44,7 @@ func NewHubFromEnv(history History) (*Hub, error) {
func NewHub(history History, options *Options) *Hub {
return &Hub{
options,
make(map[chan *serializedUpdate]struct{}),
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
make(chan *serializedUpdate),
Expand All @@ -52,10 +60,14 @@ func (h *Hub) Start() {
select {

case s := <-h.newSubscribers:
h.subscribers[s] = struct{}{}
h.subscribers.Lock()
h.subscribers.m[s] = struct{}{}
h.subscribers.Unlock()

case s := <-h.removedSubscribers:
delete(h.subscribers, s)
h.subscribers.Lock()
delete(h.subscribers.m, s)
h.subscribers.Unlock()
close(s)

case serializedUpdate, ok := <-h.updates:
Expand All @@ -65,13 +77,16 @@ func (h *Hub) Start() {
}
}

for s := range h.subscribers {
h.subscribers.RLock()
for s := range h.subscribers.m {
if ok {
s <- serializedUpdate
} else {
close(s)
}
}
h.subscribers.RUnlock()

if !ok {
return
}
Expand Down
2 changes: 1 addition & 1 deletion hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestNewHub(t *testing.T) {
h := createDummy()

assert.IsType(t, &Options{}, h.options)
assert.IsType(t, map[chan *serializedUpdate]struct{}{}, h.subscribers)
assert.IsType(t, map[chan *serializedUpdate]struct{}{}, h.subscribers.m)
assert.IsType(t, make(chan (chan *serializedUpdate)), h.newSubscribers)
assert.IsType(t, make(chan (chan *serializedUpdate)), h.removedSubscribers)
assert.IsType(t, make(chan *serializedUpdate), h.updates)
Expand Down
28 changes: 13 additions & 15 deletions hub/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,41 +66,39 @@ func TestServe(t *testing.T) {
resp, _ = client.Get("http://" + testAddr + "/")
}

var wg sync.WaitGroup
wg.Add(2)
var wgConnected, wgTested sync.WaitGroup
wgConnected.Add(2)
wgTested.Add(2)

go func(w *sync.WaitGroup) {
defer w.Done()
go func() {
defer wgTested.Done()
resp, err := client.Get(testURL + "?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1")
if err != nil {
panic(err)
}
wgConnected.Done()

defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)

assert.Equal(t, []byte("id: first\ndata: hello\n\n"), body)
}(&wg)
}()

go func(w *sync.WaitGroup) {
defer w.Done()
go func() {
defer wgTested.Done()
resp, err := client.Get(testURL + "?topic=http%3A%2F%2Fexample.com%2Falt%2F1")
if err != nil {
panic(err)
}
wgConnected.Done()

defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)

assert.Equal(t, []byte("id: first\ndata: hello\n\n"), body)
}(&wg)
}()

// Wait for the subscription
for {
if len(h.subscribers) == 2 {
break
}
}
wgConnected.Wait()

body := url.Values{"topic": {"http://example.com/foo/1", "http://example.com/alt/1"}, "data": {"hello"}, "id": {"first"}}
req, _ := http.NewRequest("POST", testURL, strings.NewReader(body.Encode()))
Expand All @@ -113,5 +111,5 @@ func TestServe(t *testing.T) {
}

h.server.Shutdown(context.Background())
wg.Wait()
wgTested.Wait()
}
1 change: 1 addition & 0 deletions hub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func sendHeaders(w http.ResponseWriter) {

// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")
w.(http.Flusher).Flush()
}

func retrieveLastEventID(r *http.Request) string {
Expand Down
33 changes: 26 additions & 7 deletions hub/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ func TestSubscribe(t *testing.T) {

go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()

if empty {
continue
}

Expand Down Expand Up @@ -161,7 +165,7 @@ func TestSubscribe(t *testing.T) {
func TestUnsubscribe(t *testing.T) {
hub := createAnonymousDummy()
hub.Start()
assert.Equal(t, 0, len(hub.subscribers))
assert.Equal(t, 0, len(hub.subscribers.m))
wr := newCloseNotifyingRecorder()

var wg sync.WaitGroup
Expand All @@ -170,11 +174,14 @@ func TestUnsubscribe(t *testing.T) {
defer w.Done()
req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1", nil)
hub.SubscribeHandler(wr, req)
assert.Equal(t, 0, len(hub.subscribers))
assert.Equal(t, 0, len(hub.subscribers.m))
}(&wg)

for {
if len(hub.subscribers) != 0 {
hub.subscribers.RLock()
notEmpty := len(hub.subscribers.m) != 0
hub.subscribers.RUnlock()
if notEmpty {
break
}
}
Expand All @@ -189,7 +196,11 @@ func TestSubscribeTarget(t *testing.T) {

go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()

if empty {
continue
}

Expand Down Expand Up @@ -231,7 +242,11 @@ func TestSubscribeAllTargets(t *testing.T) {

go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()

if empty {
continue
}

Expand Down Expand Up @@ -306,7 +321,11 @@ func TestSendMissedEvents(t *testing.T) {
}(&wg)

for {
if len(hub.subscribers) == 2 {
hub.subscribers.RLock()
two := len(hub.subscribers.m) == 2
hub.subscribers.RUnlock()

if two {
break
}
}
Expand Down

0 comments on commit 1127dcc

Please sign in to comment.