Skip to content

Commit

Permalink
Merge pull request moby#16316 from vbatts/vbatts-events
Browse files Browse the repository at this point in the history
daemon/events: let Log be [slightly] blocking
  • Loading branch information
calavera committed Sep 16, 2015
2 parents fb01c2e + fc77ea7 commit b9a3660
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 36 deletions.
24 changes: 11 additions & 13 deletions daemon/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,17 @@ func (e *Events) Evict(l chan interface{}) {
// Log broadcasts event to listeners. Each listener has 100 millisecond for
// receiving event or it will be skipped.
func (e *Events) Log(action, id, from string) {
go func() {
e.mu.Lock()
jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()}
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
e.mu.Unlock()
e.pub.Publish(jm)
}()
jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()}
e.mu.Lock()
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
e.mu.Unlock()
e.pub.Publish(jm)
}

// SubscribersCount returns number of event listeners
Expand Down
63 changes: 40 additions & 23 deletions daemon/events/events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -80,39 +81,55 @@ func TestEventsLogTimeout(t *testing.T) {
}
}

func TestEventsCap(t *testing.T) {
func TestLogEvents(t *testing.T) {
e := New()
for i := 0; i < eventsLimit+1; i++ {
e.Log("action", "id", "from")
}
// let all events go through
time.Sleep(1 * time.Second)

for i := 0; i < eventsLimit+16; i++ {
action := fmt.Sprintf("action_%d", i)
id := fmt.Sprintf("cont_%d", i)
from := fmt.Sprintf("image_%d", i)
e.Log(action, id, from)
}
time.Sleep(50 * time.Millisecond)
current, l := e.Subscribe()
if len(current) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(current))
for i := 0; i < 10; i++ {
num := i + eventsLimit + 16
action := fmt.Sprintf("action_%d", num)
id := fmt.Sprintf("cont_%d", num)
from := fmt.Sprintf("image_%d", num)
e.Log(action, id, from)
}
if len(e.events) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
}

for i := 0; i < 10; i++ {
e.Log("action", "id", "from")
}
// let all events go through
time.Sleep(1 * time.Second)

var msgs []*jsonmessage.JSONMessage
for len(msgs) < 10 {
select {
case m := <-l:
jm, ok := (m).(*jsonmessage.JSONMessage)
if !ok {
t.Fatalf("Unexpected type %T", m)
}
msgs = append(msgs, jm)
default:
t.Fatalf("There is no enough events in channel")
m := <-l
jm, ok := (m).(*jsonmessage.JSONMessage)
if !ok {
t.Fatalf("Unexpected type %T", m)
}
msgs = append(msgs, jm)
}
if len(current) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(current))
}
first := current[0]
if first.Status != "action_16" {
t.Fatalf("First action is %s, must be action_16", first.Status)
}
last := current[len(current)-1]
if last.Status != "action_79" {
t.Fatalf("Last action is %s, must be action_79", last.Status)
}

firstC := msgs[0]
if firstC.Status != "action_80" {
t.Fatalf("First action is %s, must be action_80", firstC.Status)
}
lastC := msgs[len(msgs)-1]
if lastC.Status != "action_89" {
t.Fatalf("Last action is %s, must be action_89", lastC.Status)
}
}

0 comments on commit b9a3660

Please sign in to comment.