use pubsub instead of filenotify to follow json logs
An inotify event is triggered immediately after data is written to disk.
But at the time the inotify event is received, the json line might not
yet be fully written to disk. If the json decoder tries to decode in
that case, an io.ErrUnexpectedEOF is triggered.
We used to retry several times to mitigate the io.ErrUnexpectedEOF
error, but partial log entries still caused flaky tests.

The daemon knows exactly when new log entries are emitted, so we can
use the pubsub package to notify all the log readers instead of inotify.

Signed-off-by: Shijiang Wei <[email protected]>

try to fix broken test. will squash once tests pass

Signed-off-by: Shijiang Wei <[email protected]>
mountkin committed Feb 15, 2016
1 parent 389a38e commit b1594c5
Showing 3 changed files with 81 additions and 92 deletions.
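
For context, the notification pattern this commit switches to, reduced to a minimal standalone sketch against pkg/pubsub. The constructor arguments mirror the ones New passes below; the helper names and printed output are illustrative only, not part of the commit:

package main

import (
	"fmt"

	"github.com/docker/docker/pkg/pubsub"
)

func main() {
	// No publish timeout and a per-subscriber buffer of 10 -- the same
	// arguments jsonfilelog's New passes to pubsub.NewPublisher below.
	pub := pubsub.NewPublisher(0, 10)

	sub := pub.Subscribe()
	done := make(chan struct{})

	go func() {
		// A follower blocks here until the writer publishes; no
		// filesystem watch, no polling, no partial-read retry loop.
		for range sub {
			fmt.Println("writer flushed a log entry; decode more json")
		}
		close(done)
	}()

	// The writer publishes right after each successful write to disk.
	pub.Publish(struct{}{})

	// Evict removes the subscription and closes its channel, ending the
	// follower's range loop once buffered values drain.
	pub.Evict(sub)
	<-done
}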
25 changes: 15 additions & 10 deletions daemon/logger/jsonfilelog/jsonfilelog.go
@@ -14,6 +14,7 @@ import (
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/go-units"
 )
@@ -22,12 +23,13 @@ const Name = "json-file"
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	buf     *bytes.Buffer
-	writer  *loggerutils.RotateFileWriter
-	mu      sync.Mutex
-	ctx     logger.Context
-	readers map[*logger.LogWatcher]struct{} // stores the active log followers
-	extra   []byte                          // json-encoded extra attributes
+	buf           *bytes.Buffer
+	writer        *loggerutils.RotateFileWriter
+	mu            sync.Mutex
+	ctx           logger.Context
+	readers       map[*logger.LogWatcher]struct{} // stores the active log followers
+	extra         []byte                          // json-encoded extra attributes
+	writeNotifier *pubsub.Publisher
 }
 
 func init() {
@@ -77,10 +79,11 @@ func New(ctx logger.Context) (logger.Logger, error) {
 	}
 
 	return &JSONFileLogger{
-		buf:     bytes.NewBuffer(nil),
-		writer:  writer,
-		readers: make(map[*logger.LogWatcher]struct{}),
-		extra:   extra,
+		buf:           bytes.NewBuffer(nil),
+		writer:        writer,
+		readers:       make(map[*logger.LogWatcher]struct{}),
+		extra:         extra,
+		writeNotifier: pubsub.NewPublisher(0, 10),
 	}, nil
 }

@@ -104,6 +107,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 
 	l.buf.WriteByte('\n')
 	_, err = l.writer.Write(l.buf.Bytes())
+	l.writeNotifier.Publish(struct{}{})
 	l.buf.Reset()
 
 	return err
@@ -137,6 +141,7 @@ func (l *JSONFileLogger) Close() error {
 		r.Close()
 		delete(l.readers, r)
 	}
+	l.writeNotifier.Close()
 	l.mu.Unlock()
 	return err
 }
142 changes: 62 additions & 80 deletions daemon/logger/jsonfilelog/read.go
@@ -10,14 +10,11 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/filenotify"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/tailfile"
 )
 
-const maxJSONDecodeRetry = 20000
-
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 	l.Reset()
 	if err := dec.Decode(l); err != nil {
@@ -35,7 +32,6 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 // created by this driver.
 func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 	logWatcher := logger.NewLogWatcher()
-
 	go l.readLogs(logWatcher, config)
 	return logWatcher
 }
@@ -85,7 +81,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	l.mu.Unlock()
 
 	notifyRotate := l.writer.NotifyRotate()
-	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+	l.followLogs(latestFile, logWatcher, notifyRotate, config.Since)
 
 	l.mu.Lock()
 	delete(l.readers, logWatcher)
@@ -121,95 +117,81 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
 	}
 }
 
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
-	dec := json.NewDecoder(f)
-	l := &jsonlog.JSONLog{}
-
-	fileWatcher, err := filenotify.New()
-	if err != nil {
-		logWatcher.Err <- err
-	}
-	defer fileWatcher.Close()
-
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
-				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race condition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
-				}
-
-				logWatcher.Err <- err
-				return
-			}
-
-			logrus.WithField("logger", "json-file").Debugf("waiting for events")
-			if err := fileWatcher.Add(f.Name()); err != nil {
-				logrus.WithField("logger", "json-file").Warn("falling back to file poller")
-				fileWatcher.Close()
-				fileWatcher = filenotify.NewPollingWatcher()
-
-				if err := fileWatcher.Add(f.Name()); err != nil {
-					logrus.Errorf("error watching log file for modifications: %v", err)
-					logWatcher.Err <- err
-				}
-			}
-
-			select {
-			case <-fileWatcher.Events():
-				dec = json.NewDecoder(f)
-				fileWatcher.Remove(f.Name())
-				continue
-			case <-fileWatcher.Errors():
-				fileWatcher.Remove(f.Name())
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				fileWatcher.Remove(f.Name())
-				return
-			case <-notifyRotate:
-				f, err = os.Open(f.Name())
-				if err != nil {
-					logWatcher.Err <- err
-					return
-				}
-
-				dec = json.NewDecoder(f)
-				fileWatcher.Remove(f.Name())
-				fileWatcher.Add(f.Name())
-			}
-			continue
-		}
-
-		retries = 0 // reset retries since we've succeeded
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-
-		select {
-		case logWatcher.Msg <- msg:
-		case <-logWatcher.WatchClose():
-			logWatcher.Msg <- msg
-			for {
-				msg, err := decodeLogLine(dec, l)
-				if err != nil {
-					return
-				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
-				}
-				logWatcher.Msg <- msg
-			}
-		}
-	}
-}
+func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	var (
+		rotated bool
+
+		dec         = json.NewDecoder(f)
+		log         = &jsonlog.JSONLog{}
+		writeNotify = l.writeNotifier.Subscribe()
+		watchClose  = logWatcher.WatchClose()
+	)
+
+	reopenLogFile := func() error {
+		f.Close()
+		var err error
+		f, err = os.Open(f.Name())
+		if err != nil {
+			return err
+		}
+		dec = json.NewDecoder(f)
+		rotated = true
+		return nil
+	}
+
+	readToEnd := func() error {
+		for {
+			msg, err := decodeLogLine(dec, log)
+			if err != nil {
+				return err
+			}
+			if !since.IsZero() && msg.Timestamp.Before(since) {
+				continue
+			}
+			logWatcher.Msg <- msg
+		}
+	}
+
+	defer func() {
+		l.writeNotifier.Evict(writeNotify)
+		if rotated {
+			f.Close()
+		}
+	}()
+
+	for {
+		select {
+		case <-watchClose:
+			readToEnd()
+			return
+		case <-notifyRotate:
+			readToEnd()
+			if err := reopenLogFile(); err != nil {
+				logWatcher.Err <- err
+				return
+			}
+		case _, ok := <-writeNotify:
+			if err := readToEnd(); err == io.EOF {
+				if !ok {
+					// The writer is closed, no new logs will be generated.
+					return
+				}
+
+				select {
+				case <-notifyRotate:
+					if err := reopenLogFile(); err != nil {
+						logWatcher.Err <- err
+						return
+					}
+				default:
+					dec = json.NewDecoder(f)
+				}
+			} else if err == io.ErrUnexpectedEOF {
+				dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f))
+			} else {
+				logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err)
+				logWatcher.Err <- err
+				return
+			}
+		}
+	}
+}
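For orientation, a sketch of how a follower consumes the driver after this change, assumed to sit in package jsonfilelog next to read.go. The helper follow and the fmt printing are hypothetical; ReadLogs, logger.ReadConfig, and the watcher's Msg/Err channels are the ones exercised above:

// follow drains one follower until the watcher is closed or an error is
// reported. It assumes the "fmt" import in addition to the package's own.
func follow(l *JSONFileLogger) {
	w := l.ReadLogs(logger.ReadConfig{Follow: true})
	defer w.Close()

	for {
		select {
		case msg, ok := <-w.Msg:
			if !ok {
				// readLogs closes Msg once the follower goroutine ends.
				return
			}
			fmt.Printf("%v %s\n", msg.Timestamp, msg.Line)
		case err := <-w.Err:
			fmt.Println("follow error:", err)
			return
		}
	}
}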
6 changes: 4 additions & 2 deletions pkg/pubsub/publisher.go
@@ -54,8 +54,10 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
 // Evict removes the specified subscriber from receiving any more messages.
 func (p *Publisher) Evict(sub chan interface{}) {
 	p.m.Lock()
-	delete(p.subscribers, sub)
-	close(sub)
+	if _, ok := p.subscribers[sub]; ok {
+		delete(p.subscribers, sub)
+		close(sub)
+	}
 	p.m.Unlock()
 }
