Skip to content

Commit

Permalink
Update logger adapter test to avoid race
Browse files Browse the repository at this point in the history
Add synchronization around adding logs to a plugin
and reading those logs. Without the follow configuration,
a race occurs between go routines to add the logs into
the plugin and read the logs out of the plugin. This
adds a function to synchronize the action to avoid the
race.
Removes use of file for buffering, instead buffering whole
messages so log count can be checked discretely.

Signed-off-by: Derek McGowan <[email protected]>
Signed-off-by: Sebastiaan van Stijn <[email protected]>
  • Loading branch information
dmcgowan authored and thaJeztah committed Apr 12, 2018
1 parent fb17020 commit c208f1c
Showing 1 changed file with 79 additions and 44 deletions.
123 changes: 79 additions & 44 deletions daemon/logger/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package logger // import "github.com/docker/docker/daemon/logger"
import (
"encoding/binary"
"io"
"io/ioutil"
"os"
"sync"
"testing"
"time"

Expand All @@ -17,24 +16,57 @@ import (
// mockLoggingPlugin implements the loggingPlugin interface for testing purposes
// it only supports a single log stream
type mockLoggingPlugin struct {
inStream io.ReadCloser
f *os.File
closed chan struct{}
t *testing.T
io.WriteCloser
inStream io.Reader
logs []*logdriver.LogEntry
c *sync.Cond
err error
}

func newMockLoggingPlugin() *mockLoggingPlugin {
r, w := io.Pipe()
return &mockLoggingPlugin{
WriteCloser: w,
inStream: r,
logs: []*logdriver.LogEntry{},
c: sync.NewCond(new(sync.Mutex)),
}
}

func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
go func() {
io.Copy(l.f, l.inStream)
close(l.closed)
dec := protoio.NewUint32DelimitedReader(l.inStream, binary.BigEndian, 1e6)
for {
var msg logdriver.LogEntry
if err := dec.ReadMsg(&msg); err != nil {
l.c.L.Lock()
if l.err == nil {
l.err = err
}
l.c.L.Unlock()

l.c.Broadcast()
return

}

l.c.L.Lock()
l.logs = append(l.logs, &msg)
l.c.L.Unlock()
l.c.Broadcast()
}

}()
return nil
}

func (l *mockLoggingPlugin) StopLogging(file string) error {
l.inStream.Close()
l.f.Close()
os.Remove(l.f.Name())
l.c.L.Lock()
if l.err == nil {
l.err = io.EOF
}
l.c.L.Unlock()
l.c.Broadcast()
return nil
}

Expand All @@ -44,63 +76,60 @@ func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {

func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
r, w := io.Pipe()
f, err := os.Open(l.f.Name())
if err != nil {
return nil, err
}

go func() {
defer f.Close()
dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
var idx int
enc := logdriver.NewLogEntryEncoder(w)

l.c.L.Lock()
defer l.c.L.Unlock()
for {
select {
case <-l.closed:
if l.err != nil {
w.Close()
return
default:
}

var msg logdriver.LogEntry
if err := dec.ReadMsg(&msg); err != nil {
if err == io.EOF {
if !config.Follow {
w.Close()
return
}
dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
continue
if idx >= len(l.logs) {
if !config.Follow {
w.Close()
return
}

l.t.Fatal(err)
l.c.Wait()
continue
}

if err := enc.Encode(&msg); err != nil {
if err := enc.Encode(l.logs[idx]); err != nil {
w.CloseWithError(err)
return
}
idx++
}
}()

return r, nil
}

func newMockPluginAdapter(t *testing.T) Logger {
r, w := io.Pipe()
f, err := ioutil.TempFile("", "mock-plugin-adapter")
assert.Check(t, err)
func (l *mockLoggingPlugin) waitLen(i int) {
l.c.L.Lock()
defer l.c.L.Unlock()
for len(l.logs) < i {
l.c.Wait()
}
}

enc := logdriver.NewLogEntryEncoder(w)
func (l *mockLoggingPlugin) check(t *testing.T) {
if l.err != nil && l.err != io.EOF {
t.Fatal(l.err)
}
}

func newMockPluginAdapter(plugin *mockLoggingPlugin) Logger {
enc := logdriver.NewLogEntryEncoder(plugin)
a := &pluginAdapterWithRead{
&pluginAdapter{
plugin: &mockLoggingPlugin{
inStream: r,
f: f,
closed: make(chan struct{}),
t: t,
},
stream: w,
plugin: plugin,
stream: plugin,
enc: enc,
},
}
Expand All @@ -109,7 +138,8 @@ func newMockPluginAdapter(t *testing.T) Logger {
}

func TestAdapterReadLogs(t *testing.T) {
l := newMockPluginAdapter(t)
plugin := newMockLoggingPlugin()
l := newMockPluginAdapter(plugin)

testMsg := []Message{
{Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
Expand All @@ -120,6 +150,9 @@ func TestAdapterReadLogs(t *testing.T) {
assert.Check(t, l.Log(m))
}

// Wait until messages are read into plugin
plugin.waitLen(len(testMsg))

lr, ok := l.(LogReader)
assert.Check(t, ok, "Logger does not implement LogReader")

Expand Down Expand Up @@ -172,6 +205,8 @@ func TestAdapterReadLogs(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for logger to close")
}

plugin.check(t)
}

func testMessageEqual(t *testing.T, a, b *Message) {
Expand Down

0 comments on commit c208f1c

Please sign in to comment.