Skip to content

Commit

Permalink
Logfile: Add tracing spans
Browse files Browse the repository at this point in the history
This plumbs a context down the stack and handles cancellation as needed
so that we can have correlated traces from the API.

Signed-off-by: Brian Goff <[email protected]>
  • Loading branch information
cpuguy83 committed Jul 22, 2024
1 parent 1b46faf commit dbf6873
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 65 deletions.
2 changes: 1 addition & 1 deletion daemon/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
if !ok {
return logger.ErrReadLogsNotSupported{}
}
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
logs := cLog.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1})
defer logs.ConsumerGone()

LogLoop:
Expand Down
8 changes: 7 additions & 1 deletion daemon/logger/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type pluginAdapterWithRead struct {
*pluginAdapter
}

func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
func (a *pluginAdapterWithRead) ReadLogs(ctx context.Context, config ReadConfig) *LogWatcher {
watcher := NewLogWatcher()

go func() {
Expand All @@ -101,6 +101,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {

dec := logdriver.NewLogEntryDecoder(stream)
for {
if ctx.Err() != nil {
return
}

var buf logdriver.LogEntry
if err := dec.Decode(&buf); err != nil {
if err == io.EOF {
Expand All @@ -127,6 +131,8 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
// send the message unless the consumer is gone
select {
case watcher.Msg <- msg:
case <-ctx.Done():
return
case <-watcher.WatchConsumerGone():
return
}
Expand Down
5 changes: 3 additions & 2 deletions daemon/logger/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logger // import "github.com/docker/docker/daemon/logger"

import (
"context"
"encoding/binary"
"io"
"sync"
Expand Down Expand Up @@ -154,7 +155,7 @@ func TestAdapterReadLogs(t *testing.T) {
lr, ok := l.(LogReader)
assert.Check(t, ok, "Logger does not implement LogReader")

lw := lr.ReadLogs(ReadConfig{})
lw := lr.ReadLogs(context.TODO(), ReadConfig{})

for _, x := range testMsg {
select {
Expand All @@ -173,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
}
lw.ConsumerGone()

lw = lr.ReadLogs(ReadConfig{Follow: true})
lw = lr.ReadLogs(context.TODO(), ReadConfig{Follow: true})
for _, x := range testMsg {
select {
case msg := <-lw.Msg:
Expand Down
4 changes: 3 additions & 1 deletion daemon/logger/journald/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
waitInterval = 250 * time.Millisecond
)

var _ logger.LogReader = (*journald)(nil)

// Fields which we know are not user-provided attribute fields.
var wellKnownFields = map[string]bool{
"MESSAGE": true,
Expand Down Expand Up @@ -447,7 +449,7 @@ func (r *reader) signalReady() {
}
}

func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (s *journald) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
r := &reader{
s: s,
logWatcher: logger.NewLogWatcher(),
Expand Down
6 changes: 4 additions & 2 deletions daemon/logger/jsonfilelog/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/docker/docker/pkg/tailfile"
)

var _ logger.LogReader = (*JSONFileLogger)(nil)

// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
return l.writer.ReadLogs(config)
func (l *JSONFileLogger) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
return l.writer.ReadLogs(ctx, config)
}

func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
Expand Down
3 changes: 2 additions & 1 deletion daemon/logger/jsonfilelog/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -62,7 +63,7 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
}
}()

lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
lw := jsonlogger.(*JSONFileLogger).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true})
for {
select {
case _, ok := <-lw.Msg:
Expand Down
4 changes: 2 additions & 2 deletions daemon/logger/local/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
// logger.defaultBufSize caps the size of Line field.
const maxMsgLen int = 1e6 // 1MB.

func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
return d.logfile.ReadLogs(config)
func (d *driver) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
return d.logfile.ReadLogs(ctx, config)
}

func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (loggerutils.SizeReaderAt, int, error) {
Expand Down
3 changes: 2 additions & 1 deletion daemon/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package logger // import "github.com/docker/docker/daemon/logger"

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -88,7 +89,7 @@ type ReadConfig struct {
// LogReader is the interface for reading log messages for loggers that support reading.
type LogReader interface {
// ReadLogs reads logs from underlying logging backend.
ReadLogs(ReadConfig) *LogWatcher
ReadLogs(context.Context, ReadConfig) *LogWatcher
}

// LogWatcher is used when consuming logs read from the LogReader interface.
Expand Down
39 changes: 20 additions & 19 deletions daemon/logger/loggertest/logreader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loggertest // import "github.com/docker/docker/daemon/logger/loggertest"

import (
"context"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -93,63 +94,63 @@ func (tr Reader) testTail(t *testing.T, live bool) {

t.Run("Exact", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})

t.Run("LessThanAvailable", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: 2})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 2})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[len(mm)-2:], compareLog)
})

t.Run("MoreThanAvailable", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: 100})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: 100})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})

t.Run("All", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: -1})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected, compareLog)
})

t.Run("Since", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
})

t.Run("MoreThanSince", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm), Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:], compareLog)
})

t.Run("LessThanSince", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: len(mm) - 2, Since: mm[1].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[2:], compareLog)
})

t.Run("Until", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[:2], compareLog)
})

t.Run("SinceAndUntil", func(t *testing.T) {
t.Parallel()
lw := lr.ReadLogs(logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
lw := lr.ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Since: mm[1].Timestamp.Truncate(time.Millisecond), Until: mm[1].Timestamp.Add(time.Millisecond)})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), expected[1:2], compareLog)
})
Expand Down Expand Up @@ -182,7 +183,7 @@ func (tr Reader) testTailEmptyLogs(t *testing.T, live bool) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{})
defer lw.ConsumerGone()
assert.DeepEqual(t, readAll(t, lw), ([]*logger.Message)(nil), cmpopts.EquateEmpty())
})
Expand All @@ -204,7 +205,7 @@ func (tr Reader) TestFollow(t *testing.T) {
ContainerID: fmt.Sprintf("followstart%d", i),
ContainerName: fmt.Sprintf("logloglog%d", i),
})(t)
lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: tail, Follow: true})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: tail, Follow: true})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand Down Expand Up @@ -232,7 +233,7 @@ func (tr Reader) TestFollow(t *testing.T) {
mm := makeTestMessages()
expected := logMessages(t, l, mm[0:1])

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand All @@ -257,7 +258,7 @@ func (tr Reader) TestFollow(t *testing.T) {

mm := makeTestMessages()

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[2].Timestamp.Truncate(time.Millisecond)})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand All @@ -282,7 +283,7 @@ func (tr Reader) TestFollow(t *testing.T) {

mm := makeTestMessages()

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand All @@ -307,7 +308,7 @@ func (tr Reader) TestFollow(t *testing.T) {

mm := makeTestMessages()

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true, Since: mm[1].Timestamp.Add(-time.Millisecond), Until: mm[2].Timestamp.Add(-time.Millisecond)})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand All @@ -334,7 +335,7 @@ func (tr Reader) TestFollow(t *testing.T) {
logMessages(t, l, mm[0:2])
syncLogger(t, l)

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 0, Follow: true})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 0, Follow: true})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand All @@ -361,7 +362,7 @@ func (tr Reader) TestFollow(t *testing.T) {
expected := logMessages(t, l, mm[0:2])[1:]
syncLogger(t, l)

lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: 1, Follow: true})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: 1, Follow: true})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand Down Expand Up @@ -390,7 +391,7 @@ func (tr Reader) TestFollow(t *testing.T) {
assert.NilError(t, l.Close())

l = factory(t)
lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1, Follow: true})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Tail: -1, Follow: true})
defer lw.ConsumerGone()

doneReading := make(chan struct{})
Expand Down Expand Up @@ -430,7 +431,7 @@ func (tr Reader) TestConcurrent(t *testing.T) {
}

// Follow all logs
lw := l.(logger.LogReader).ReadLogs(logger.ReadConfig{Follow: true, Tail: -1})
lw := l.(logger.LogReader).ReadLogs(context.TODO(), logger.ReadConfig{Follow: true, Tail: -1})
defer lw.ConsumerGone()

// Log concurrently from two sources and close log
Expand Down
6 changes: 4 additions & 2 deletions daemon/logger/loggerutils/cache/local_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var builtInCacheLogOpts = map[string]bool{
cacheDisabledKey: true,
}

var _ logger.LogReader = (*loggerWithCache)(nil)

// WithLocalCache wraps the passed in logger with a logger caches all writes locally
// in addition to writing to the passed in logger.
func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) {
Expand Down Expand Up @@ -85,8 +87,8 @@ func (l *loggerWithCache) Name() string {
return l.l.Name()
}

func (l *loggerWithCache) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
return l.cache.(logger.LogReader).ReadLogs(config)
func (l *loggerWithCache) ReadLogs(ctx context.Context, config logger.ReadConfig) *logger.LogWatcher {
return l.cache.(logger.LogReader).ReadLogs(ctx, config)
}

func (l *loggerWithCache) Close() error {
Expand Down
6 changes: 4 additions & 2 deletions daemon/logger/loggerutils/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) {
}()

for {
wrote, ok := fl.nextPos(read)
wrote, ok := fl.nextPos(ctx, read)
if !ok {
return
}
Expand Down Expand Up @@ -100,9 +100,11 @@ func (fl *follow) Do(ctx context.Context, f *os.File, read logPos) {

// nextPos waits until the write position of the LogFile being followed has
// advanced from current and returns the new position.
func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
func (fl *follow) nextPos(ctx context.Context, current logPos) (next logPos, ok bool) {
var st logReadState
select {
case <-ctx.Done():
return current, false
case <-fl.Watcher.WatchConsumerGone():
return current, false
case st = <-fl.LogFile.read:
Expand Down
Loading

0 comments on commit dbf6873

Please sign in to comment.