Skip to content

Commit

Permalink
promtail: add "max_age" field to configure cutoff for journal reading (
Browse files Browse the repository at this point in the history
…grafana#921)

The journal scrape config in promtail has been updated to support
a "max_age" field. max_age determines the oldest journal entry
promtail will read when starting the journal reader. When
unspecified, max_age defaults to 7h.

Even if a position in the journal is saved in the promtail
positions file, that position may be ignored if the entry corresponding
to that position is older than the max_age cutoff time.

Example promtail config for getting up to the last 12h of journal
entries:

  server:
    http_listen_port: 9080
    grpc_listen_port: 0

  positions:
    filename: /tmp/positions.yaml

  clients:
    - url: http://localhost:3100/api/prom/push

  scrape_configs:
  - job_name: journal
    journal:
      max_age: 12h
      path: /var/log/journal
      labels:
        job: systemd-journal
    relabel_configs:
      - source_labels: ['__journal__systemd_unit']
        target_label: 'unit'
  • Loading branch information
rfratto authored Aug 21, 2019
1 parent 75a3e61 commit 4c41843
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 7 deletions.
8 changes: 7 additions & 1 deletion docs/promtail-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ clients:
scrape_configs:
- job_name: journal
journal:
max_age: 12h
path: /var/log/journal
labels:
job: systemd-journal
Expand All @@ -103,6 +104,11 @@ Just like the Docker example, the `scrape_configs` sections holds various
jobs for parsing logs. A job with a `journal` key configures it for systemd
journal reading.

`max_age` is an optional duration string (for example `12h`) specifying the
maximum age of the oldest entry that will be read. If unspecified, `max_age`
defaults to `7h`. Even if a position in the journal is saved, that position
won't be used if the entry it points to is older than `max_age`.

`path` is an optional string specifying the path to read journal entries
from. If unspecified, defaults to the system default (`/var/log/journal`).

Expand Down Expand Up @@ -130,4 +136,4 @@ If running with Docker, that means to bind that path:
docker run -d --name promtail --network loki_network -p 9080:9080 \
-v /var/log/journal:/var/log/journal \
mypromtail-image -config.file=/etc/promtail/my-systemd-journal-config.yaml
```
```
8 changes: 8 additions & 0 deletions pkg/promtail/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type Config struct {

// JournalTargetConfig describes systemd journal records to scrape.
type JournalTargetConfig struct {
// MaxAge determines the oldest relative time from process start that will
// be read and sent to Loki. A value like 14h means no entry older than
// 14h will be read. If unspecified, defaults to 7h.
//
// A relative time specified here takes precedence over the saved position;
// if the cursor is older than the MaxAge value, it will not be used.
MaxAge string `yaml:"max_age"`

// Labels optionally holds labels to associate with each record coming out
// of the journal.
Labels model.LabelSet `yaml:"labels"`
Expand Down
109 changes: 104 additions & 5 deletions pkg/promtail/targets/journaltarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,48 @@ const (
// Formatter causes an immediate EOF and induces performance issues
// with how that is handled in sdjournal.
journalEmptyStr = " "

// journalDefaultMaxAgeTime represents the default earliest entry that
// will be read by the journal reader if there is no saved position
// newer than the "max_age" time.
journalDefaultMaxAgeTime = time.Hour * 7
)

// journalReader is the reader behavior the journal target depends on. The
// value returned by sdjournal.NewJournalReader is used as one, and tests
// supply their own implementation.
type journalReader interface {
	// Follow has the same signature as the Follow method of the sdjournal
	// reader: it is handed a channel of time values and the writer that
	// journal output is written to.
	Follow(until <-chan time.Time, writer io.Writer) error

	// Close comes from io.Closer.
	io.Closer
}

// Function types that abstract access to the journal so that tests can
// substitute mocks for the real sdjournal calls.
type (
	// journalReaderFunc builds a journalReader from a reader configuration.
	journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error)

	// journalEntryFunc returns the journal entry for the given cursor, using
	// cfg to decide which journal to look in.
	journalEntryFunc func(cfg sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error)
)

// Default implementations of abstracted functions:

// defaultJournalReaderFunc creates a real sdjournal reader for the given
// configuration.
//
// The error is checked before the reader is converted to the journalReader
// interface. Forwarding the (*sdjournal.JournalReader, error) pair directly
// would, on failure, wrap a nil pointer in a non-nil interface value, which
// any caller comparing the reader against nil would misread.
var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) {
	r, err := sdjournal.NewJournalReader(c)
	if err != nil {
		return nil, err
	}
	return r, nil
}

// defaultJournalEntryFunc looks up the journal entry identified by cursor.
//
// The journal is opened from c.Path when it is set, and from the default
// journal otherwise. Any error from opening, seeking, advancing or reading
// the entry is returned unchanged with a nil entry.
var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {
	var (
		journal *sdjournal.Journal
		err     error
	)

	if c.Path != "" {
		journal, err = sdjournal.NewJournalFromDir(c.Path)
	} else {
		journal, err = sdjournal.NewJournal()
	}
	if err != nil {
		return nil, err
	}

	// The handle is only needed for this single lookup, so release it on
	// every return path. A Close error is not actionable here and is
	// deliberately ignored.
	defer journal.Close()

	if err := journal.SeekCursor(cursor); err != nil {
		return nil, err
	}

	// Seeking only sets the position; it does not make any entry current.
	// Advance once so that GetEntry reads the entry at the cursor.
	if _, err := journal.Next(); err != nil {
		return nil, err
	}

	return journal.GetEntry()
}

// JournalTarget tails systemd journal entries.
type JournalTarget struct {
logger log.Logger
Expand Down Expand Up @@ -76,6 +105,7 @@ func NewJournalTarget(
relabelConfig,
targetConfig,
defaultJournalReaderFunc,
defaultJournalEntryFunc,
)
}

Expand All @@ -87,6 +117,7 @@ func journalTargetWithReader(
relabelConfig []*relabel.Config,
targetConfig *scrape.JournalTargetConfig,
readerFunc journalReaderFunc,
entryFunc journalEntryFunc,
) (*JournalTarget, error) {

positionPath := fmt.Sprintf("journal-%s", jobName)
Expand All @@ -95,6 +126,9 @@ func journalTargetWithReader(
if readerFunc == nil {
readerFunc = defaultJournalReaderFunc
}
if entryFunc == nil {
entryFunc = defaultJournalEntryFunc
}

until := make(chan time.Time)
t := &JournalTarget{
Expand All @@ -109,6 +143,17 @@ func journalTargetWithReader(
until: until,
}

var maxAge time.Duration
var err error
if targetConfig.MaxAge == "" {
maxAge = journalDefaultMaxAgeTime
} else {
maxAge, err = time.ParseDuration(targetConfig.MaxAge)
}
if err != nil {
return nil, errors.Wrap(err, "parsing journal reader 'max_age' config value")
}

// Default to system path if not defined. Passing an empty string to
// sdjournal is valid but forces reads from the journal to be from
// the local machine id only, which contradicts the default behavior
Expand All @@ -119,12 +164,13 @@ func journalTargetWithReader(
journalPath = "/var/log/journal"
}

var err error
t.r, err = readerFunc(sdjournal.JournalReaderConfig{
Path: journalPath,
Cursor: position,
Formatter: t.formatter,
cfg := t.generateJournalConfig(journalConfigBuilder{
JournalPath: journalPath,
Position: position,
MaxAge: maxAge,
EntryFunc: entryFunc,
})
t.r, err = readerFunc(cfg)
if err != nil {
return nil, errors.Wrap(err, "creating journal reader")
}
Expand All @@ -139,6 +185,59 @@ func journalTargetWithReader(
return t, nil
}

// journalConfigBuilder bundles the inputs generateJournalConfig needs to
// decide where the journal reader should start.
type journalConfigBuilder struct {
	// JournalPath is copied into the Path field of the reader config.
	JournalPath string

	// Position is the saved journal cursor; an empty string means there is
	// no saved position.
	Position string

	// MaxAge is the age beyond which a saved position is ignored, and the
	// distance back in time to start from when no cursor is used.
	MaxAge time.Duration

	// EntryFunc fetches the journal entry that Position refers to.
	EntryFunc journalEntryFunc
}

// generateJournalConfig generates a journal config by trying to intelligently
// determine if a time offset or the cursor should be used for the starting
// position in the reader.
//
// The returned config always carries cb.JournalPath and the target's
// formatter. Exactly one of Cursor or Since is set:
//
//   - Cursor is set to cb.Position when the saved position resolves to an
//     entry that is no older than cb.MaxAge.
//   - Since is set to -cb.MaxAge in every other case: no saved position, the
//     entry lookup failed or returned no entry, or the entry is too old.
func (t *JournalTarget) generateJournalConfig(
	cb journalConfigBuilder,
) sdjournal.JournalReaderConfig {

	cfg := sdjournal.JournalReaderConfig{
		Path:      cb.JournalPath,
		Formatter: t.formatter,
	}

	// When generating the JournalReaderConfig, we want to preferably
	// use the Cursor, since it's guaranteed unique to a given journal
	// entry. When we don't know the cursor position (or want to set
	// a start time), we'll fall back to the less-precise Since, which
	// takes a negative duration back from the current system time.
	//
	// The presence of Since takes precedence over Cursor, so we only
	// ever set one and not both here.

	if cb.Position == "" {
		cfg.Since = -1 * cb.MaxAge
		return cfg
	}

	// We have a saved position and need to get that entry to see if it's
	// older than cb.MaxAge. If it _is_ older, then we need to use cfg.Since
	// rather than cfg.Cursor.
	entry, err := cb.EntryFunc(cfg, cb.Position)
	if err != nil {
		level.Error(t.logger).Log("msg", "received error reading saved journal position", "err", err.Error())
		cfg.Since = -1 * cb.MaxAge
		return cfg
	}

	// An entry function may report success without producing an entry.
	// Treat that like an unreadable position instead of dereferencing nil.
	if entry == nil {
		level.Error(t.logger).Log("msg", "no journal entry found for saved journal position")
		cfg.Since = -1 * cb.MaxAge
		return cfg
	}

	// RealtimeTimestamp is converted from microseconds to nanoseconds here.
	ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
	if time.Since(ts) > cb.MaxAge {
		cfg.Since = -1 * cb.MaxAge
		return cfg
	}

	cfg.Cursor = cb.Position
	return cfg
}

func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {
ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))

Expand Down
129 changes: 128 additions & 1 deletion pkg/promtail/targets/journaltarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func (r *mockJournalReader) Follow(until <-chan time.Time, writer io.Writer) err
return nil
}

// newMockJournalEntry builds a journalEntryFunc for tests. The returned
// function ignores both of its arguments and always yields the supplied
// entry together with a nil error.
func newMockJournalEntry(entry *sdjournal.JournalEntry) journalEntryFunc {
	lookup := func(_ sdjournal.JournalReaderConfig, _ string) (*sdjournal.JournalEntry, error) {
		return entry, nil
	}
	return lookup
}

func (r *mockJournalReader) Write(msg string, fields map[string]string) {
allFields := make(map[string]string, len(fields))
for k, v := range fields {
Expand Down Expand Up @@ -94,7 +100,7 @@ func TestJournalTarget(t *testing.T) {
require.NoError(t, err)

jt, err := journalTargetWithReader(logger, client, ps, "test", relabels,
&scrape.JournalTargetConfig{}, newMockJournalReader)
&scrape.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
Expand All @@ -110,3 +116,124 @@ func TestJournalTarget(t *testing.T) {
assert.Len(t, client.messages, 10)
require.NoError(t, jt.Stop())
}

// TestJournalTarget_Since checks that, when no position has been saved, the
// journal reader is configured to start max_age (4h here) in the past.
func TestJournalTarget_Since(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	initRandom()
	dirName := "/tmp/" + randName()
	positionsFileName := dirName + "/positions.yml"

	// The sync period is much longer than this test takes to run, so the
	// periodic sync timer is not expected to fire while it executes.
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: positionsFileName,
	})
	require.NoError(t, err)

	client := &TestClient{
		log:      logger,
		messages: make([]string, 0),
	}

	cfg := scrape.JournalTargetConfig{
		MaxAge: "4h",
	}

	jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
		&cfg, newMockJournalReader, newMockJournalEntry(nil))
	require.NoError(t, err)

	// The expected value goes first so failure output labels it correctly.
	r := jt.r.(*mockJournalReader)
	require.Equal(t, -4*time.Hour, r.config.Since)
}

// TestJournalTarget_Cursor_TooOld checks that a saved cursor is ignored when
// the entry it points to is older than the default max_age of 7h, and that
// the reader falls back to starting 7h in the past.
func TestJournalTarget_Cursor_TooOld(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	initRandom()
	dirName := "/tmp/" + randName()
	positionsFileName := dirName + "/positions.yml"

	// The sync period is much longer than this test takes to run, so the
	// periodic sync timer is not expected to fire while it executes.
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: positionsFileName,
	})
	require.NoError(t, err)

	// Save a position under the key the target derives from job name "test".
	ps.PutString("journal-test", "foobar")

	client := &TestClient{
		log:      logger,
		messages: make([]string, 0),
	}

	cfg := scrape.JournalTargetConfig{}

	// RealtimeTimestamp is read as microseconds by the code under test, so
	// convert from nanoseconds. A raw nanosecond value overflows int64 when
	// it is scaled back up, which made this test pass only by accident.
	entryTs := time.Date(1980, time.July, 3, 12, 0, 0, 0, time.UTC)
	journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
		Cursor:            "foobar",
		Fields:            nil,
		RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),
	})

	jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
		&cfg, newMockJournalReader, journalEntry)
	require.NoError(t, err)

	// The expected value goes first so failure output labels it correctly.
	r := jt.r.(*mockJournalReader)
	require.Equal(t, -7*time.Hour, r.config.Since)
	require.Equal(t, "", r.config.Cursor)
}

// TestJournalTarget_Cursor_NotTooOld checks that a saved cursor is used, and
// Since is left unset, when the entry it points to is newer than the default
// max_age of 7h.
func TestJournalTarget_Cursor_NotTooOld(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	initRandom()
	dirName := "/tmp/" + randName()
	positionsFileName := dirName + "/positions.yml"

	// The sync period is much longer than this test takes to run, so the
	// periodic sync timer is not expected to fire while it executes.
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: positionsFileName,
	})
	require.NoError(t, err)

	// Save a position under the key the target derives from job name "test".
	ps.PutString("journal-test", "foobar")

	client := &TestClient{
		log:      logger,
		messages: make([]string, 0),
	}

	cfg := scrape.JournalTargetConfig{}

	// One hour old: well inside the 7h default. RealtimeTimestamp is read as
	// microseconds by the code under test, hence the conversion.
	entryTs := time.Now().Add(-time.Hour)
	journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
		Cursor:            "foobar",
		Fields:            nil,
		RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),
	})

	jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
		&cfg, newMockJournalReader, journalEntry)
	require.NoError(t, err)

	// The expected value goes first so failure output labels it correctly.
	r := jt.r.(*mockJournalReader)
	require.Equal(t, time.Duration(0), r.config.Since)
	require.Equal(t, "foobar", r.config.Cursor)
}

0 comments on commit 4c41843

Please sign in to comment.