From c558984e29dc04340d1dc129f6020189d5d26466 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 19 Jul 2018 09:21:50 +0200 Subject: [PATCH] Fix filebeat registry meta being nil vs empty (#7632) Filebeat introduces a meta field to registry entries in 6.3.1. The meta field is used to distuingish different log streams in docker files. For other input types the meta field must be null. Unfortunately the input loader did initialize the meta field with an empty dictionary. This leads to failing matches of old and new registry entries. Due to the match failing, old entries will not be removed, and filebeat will handle all files as new files on startup (old logs are send again). Users will observe duplicate entries in the reigstry file. One entry with "meta": null and one entry with "meta": {}. The entry with "meta": {} will be used by filebeat. The null-entry will not be used by filebeat, but is kept in the registry file, cause it has now active owner (yet). Improvements provided by this PR: * when matching states consider an empty map and a null-map to be equivalent * update input loader to create a null map for old state -> registry entries will be compatible on upgrade * Add checks in critical places replacing an empty map with a null-map * Add support to fix registry entries on load. states from corrupted 6.3.1 files will be merged into one single state on load * introduce unit tests for loading different registry formats * introduce system tests validating output and registry when upgrading filebeat from an older version Closes: #7634 --- CHANGELOG.asciidoc | 1 + filebeat/input/docker/input.go | 3 + filebeat/input/file/state.go | 7 +- filebeat/input/input.go | 2 +- filebeat/input/log/input.go | 11 +- filebeat/registrar/registrar.go | 104 +++++++++- filebeat/registrar/registrar_test.go | 189 ++++++++++++++++++ .../files/registry/test-2lines-registry-6.3.0 | 1 + .../files/registry/test-2lines-registry-6.3.1 | 1 + .../test-2lines-registry-6.3.1-faulty | 4 + .../registry/test-2lines-registry-latest | 1 + filebeat/tests/system/test_registrar.py | 1 + .../tests/system/test_registrar_upgrade.py | 86 ++++++++ 13 files changed, 401 insertions(+), 10 deletions(-) create mode 100644 filebeat/registrar/registrar_test.go create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.0 create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.1 create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty create mode 100644 filebeat/tests/files/registry/test-2lines-registry-latest create mode 100644 filebeat/tests/system/test_registrar_upgrade.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a533f1fbbf0a..519737a6ee74 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Fix offset field pointing at end of a line. {issue}6514[6514] - Fix an issue when parsing ISO8601 dates with timezone definition {issue}7367[7367] - Fix Grok pattern of MongoDB module. {pull}7568[7568] +- Fix registry duplicates and log resending on upgrade. {issue}7634[7634] *Heartbeat* - Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616] diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index e1b3eb49af7b..ce34276e5479 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -73,6 +73,9 @@ func NewInput( // Add stream to meta to ensure different state per stream if config.Containers.Stream != "all" { + if context.Meta == nil { + context.Meta = map[string]string{} + } context.Meta["stream"] = config.Containers.Stream } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 1f93a259f799..96c5abf2071f 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -44,6 +44,9 @@ type State struct { // NewState creates a new file state func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State { + if len(meta) == 0 { + meta = nil + } return State{ Fileinfo: fileInfo, Source: path, @@ -60,7 +63,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin func (s *State) ID() string { // Generate id on first request. This is needed as id is not set when converting back from json if s.Id == "" { - if s.Meta == nil { + if len(s.Meta) == 0 { s.Id = s.FileStateOS.String() } else { hashValue, _ := hashstructure.Hash(s.Meta, nil) @@ -91,6 +94,6 @@ func (s *State) IsEqual(c *State) bool { func (s *State) IsEmpty() bool { return s.FileStateOS == file.StateOS{} && s.Source == "" && - s.Meta == nil && + len(s.Meta) == 0 && s.Timestamp.IsZero() } diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 414d20963fe6..98eea51db8a5 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -96,7 +96,7 @@ func New( Done: input.done, BeatDone: input.beatDone, DynamicFields: dynFields, - Meta: map[string]string{}, + Meta: nil, } var ipt Input ipt, err = f(conf, outlet, context) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 6964b513c893..afba5fb42202 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -93,6 +93,11 @@ func NewInput( // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + meta := context.Meta + if len(meta) == 0 { + meta = nil + } + p := &Input{ config: defaultConfig, cfg: cfg, @@ -101,7 +106,7 @@ func NewInput( stateOutlet: stateOut, states: file.NewStates(), done: context.Done, - meta: context.Meta, + meta: meta, } if err := cfg.Unpack(&p.config); err != nil { @@ -687,6 +692,10 @@ func (p *Input) updateState(state file.State) error { state.TTL = p.config.CleanInactive } + if len(state.Meta) == 0 { + state.Meta = nil + } + // Update first internal state p.states.Update(state) diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 804ef5692034..5a2a6d0c1353 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -20,6 +20,7 @@ package registrar import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sync" @@ -132,20 +133,111 @@ func (r *Registrar) loadStates() error { logp.Info("Loading registrar data from %s", r.registryFile) - decoder := json.NewDecoder(f) - states := []file.State{} - err = decoder.Decode(&states) + states, err := readStatesFrom(f) if err != nil { - return fmt.Errorf("Error decoding states: %s", err) + return err } - - states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) return nil } +func readStatesFrom(in io.Reader) ([]file.State, error) { + states := []file.State{} + decoder := json.NewDecoder(in) + if err := decoder.Decode(&states); err != nil { + return nil, fmt.Errorf("Error decoding states: %s", err) + } + + states = fixStates(states) + states = resetStates(states) + return states, nil +} + +// fixStates cleans up the regsitry states when updating from an older version +// of filebeat potentially writing invalid entries. +func fixStates(states []file.State) []file.State { + if len(states) == 0 { + return states + } + + // we use a map of states here, so to identify and merge duplicate entries. + idx := map[string]*file.State{} + for i := range states { + state := &states[i] + fixState(state) + + id := state.ID() + old, exists := idx[id] + if !exists { + idx[id] = state + } else { + mergeStates(old, state) // overwrite the entry in 'old' + } + } + + if len(idx) == len(states) { + return states + } + + i := 0 + newStates := make([]file.State, len(idx)) + for _, state := range idx { + newStates[i] = *state + i++ + } + return newStates +} + +// fixState updates a read state to fullfil required invariantes: +// - "Meta" must be nil if len(Meta) == 0 +func fixState(st *file.State) { + if len(st.Meta) == 0 { + st.Meta = nil + } +} + +// mergeStates merges 2 states by trying to determine the 'newer' state. +// The st state is overwritten with the updated fields. +func mergeStates(st, other *file.State) { + st.Finished = st.Finished || other.Finished + if st.Offset < other.Offset { // always select the higher offset + st.Offset = other.Offset + } + + // update file meta-data. As these are updated concurrently by the + // prospectors, select the newer state based on the update timestamp. + var meta, metaOld, metaNew map[string]string + if st.Timestamp.Before(other.Timestamp) { + st.Source = other.Source + st.Timestamp = other.Timestamp + st.TTL = other.TTL + st.FileStateOS = other.FileStateOS + + metaOld, metaNew = st.Meta, other.Meta + } else { + metaOld, metaNew = other.Meta, st.Meta + } + + if len(metaOld) == 0 || len(metaNew) == 0 { + meta = metaNew + } else { + meta = map[string]string{} + for k, v := range metaOld { + meta[k] = v + } + for k, v := range metaNew { + meta[k] = v + } + } + + if len(meta) == 0 { + meta = nil + } + st.Meta = meta +} + // resetStates sets all states to finished and disable TTL on restart // For all states covered by an input, TTL will be overwritten with the input value func resetStates(states []file.State) []file.State { diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go new file mode 100644 index 000000000000..102b073dffad --- /dev/null +++ b/filebeat/registrar/registrar_test.go @@ -0,0 +1,189 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package registrar + +import ( + "reflect" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/input/file" +) + +func TestRegistrarRead(t *testing.T) { + type testCase struct { + input string + expected []file.State + } + + zone := time.FixedZone("+0000", 0) + + cases := map[string]testCase{ + "ok registry with one entry": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config without meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1 + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config with empty meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "requires merge without meta-data": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 100, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + }, + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:10+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 10, 0, zone), + Offset: 100, + TTL: -2, // loader always resets states + Meta: nil, + }, + }, + }, + } + + matchState := func(t *testing.T, i int, expected, actual file.State) { + check := func(name string, a, b interface{}) { + if !reflect.DeepEqual(a, b) { + t.Errorf("State %v: %v mismatch (expected=%v, actual=%v)", i, name, a, b) + } + } + + check("id", expected.ID(), actual.ID()) + check("source", expected.Source, actual.Source) + check("offset", expected.Offset, actual.Offset) + check("ttl", expected.TTL, actual.TTL) + check("meta", expected.Meta, actual.Meta) + check("type", expected.Type, actual.Type) + + if t1, t2 := expected.Timestamp, actual.Timestamp; !t1.Equal(t2) { + t.Errorf("State %v: timestamp mismatch (expected=%v, actual=%v)", i, t1, t2) + } + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + in := strings.NewReader(test.input) + + states, err := readStatesFrom(in) + if !assert.NoError(t, err) { + return + } + + actual := sortedStates(states) + expected := sortedStates(test.expected) + if len(actual) != len(expected) { + t.Errorf("expected %v state, but registrar did load %v states", + len(expected), len(actual)) + return + } + + for i := range expected { + matchState(t, i, expected[i], actual[i]) + } + }) + } +} + +func sortedStates(states []file.State) []file.State { + tmp := make([]file.State, len(states)) + copy(tmp, states) + sort.Slice(tmp, func(i, j int) bool { + return tmp[i].ID() < tmp[j].ID() + }) + return tmp +} diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.0 b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 new file mode 100644 index 000000000000..5f7414b9cf3a --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.1 b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 new file mode 100644 index 000000000000..a4c2ccf126c6 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty new file mode 100644 index 000000000000..2606e69bbbc3 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty @@ -0,0 +1,4 @@ +[ + {"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}, + {"source":"test.log","offset":0,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}} +] diff --git a/filebeat/tests/files/registry/test-2lines-registry-latest b/filebeat/tests/files/registry/test-2lines-registry-latest new file mode 100644 index 000000000000..110dc1613d1b --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-latest @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 73d303245232..dd74266fa815 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -70,6 +70,7 @@ def test_registrar_file_content(self): "offset": iterations * line_len, }, record) self.assertTrue("FileStateOS" in record) + self.assertIsNone(record["meta"]) file_state_os = record["FileStateOS"] if os.name == "nt": diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py new file mode 100644 index 000000000000..21569e9c384a --- /dev/null +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +"""Test the registrar with old registry file formats""" + +import os +import json + +from nose.plugins.skip import Skip, SkipTest + +from filebeat import BaseTest + + +class Test(BaseTest): + def test_upgrade_from_6_3_0(self): + template = "test-2lines-registry-6.3.0" + self.run_with_single_registry_format(template) + + def test_upgrade_from_6_3_1(self): + template = "test-2lines-registry-6.3.1" + self.run_with_single_registry_format(template) + + def test_upgrade_from_faulty_6_3_1(self): + template = "test-2lines-registry-6.3.1-faulty" + self.run_with_single_registry_format(template) + + def test_upgrade_from_latest(self): + template = "test-2lines-registry-latest" + self.run_with_single_registry_format(template) + + def run_with_single_registry_format(self, template): + # prepare log file + testfile, file_state = self.prepare_log() + + # prepare registry file + self.apply_registry_template(template, testfile, file_state) + + self.run_and_validate() + + def apply_registry_template(self, template, testfile, file_state): + source = self.beat_path + "/tests/files/registry/" + template + with open(source) as f: + registry = json.loads(f.read()) + + for state in registry: + state["source"] = testfile + state["FileStateOS"] = file_state + with open(self.working_dir + "/registry", 'w') as f: + f.write(json.dumps(registry)) + + def prepare_log(self): + # test is current skipped on windows, due to FileStateOS must match the + # current OS format. + if os.name == "nt": + raise SkipTest + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*" + ) + + os.mkdir(self.working_dir + "/log/") + + testfile_path = self.working_dir + "/log/test.log" + with open(testfile_path, 'w') as f: + f.write("123456789\n") + f.write("abcdefghi\n") + + st = os.stat(testfile_path) + file_state = {"inode": st.st_ino, "device": st.st_dev} + return testfile_path, file_state + + def run_and_validate(self): + filebeat = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=15) + + # stop filebeat and enforce one last registry update + filebeat.check_kill_and_wait() + + data = self.get_registry() + assert len(data) == 1 + assert data[0]["offset"] == 20 + + # check only second line has been written + output = self.read_output() + assert len(output) == 1 + assert output[0]["message"] == "abcdefghi"