Skip to content

Commit

Permalink
Allowing users to override pipeline ID in fileset input config (elast…
Browse files Browse the repository at this point in the history
…ic#16561)

* Allowing users to override pipeline ID in module input config

* Removing redundant assertion

* Adding CHANGELOG entry
  • Loading branch information
ycombinator authored Mar 4, 2020
1 parent 1fc1cb9 commit 6d6cac9
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `cloudfoundry` input to send events from Cloud Foundry. {pull}16586[16586]
- Improve ECS categorization field mappings in iis module. {issue}16165[16165] {pull}16618[16618]
- Improve ECS categorization field mapping in kafka module. {issue}16167[16167] {pull}16645[16645]
- Allow users to override pipeline ID in fileset input config. {issue}9531[9531] {pull}16561[16561]

*Heartbeat*

Expand Down
17 changes: 9 additions & 8 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,15 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
}
}

// force our pipeline ID
rootPipelineID := ""
if len(fs.pipelineIDs) > 0 {
rootPipelineID = fs.pipelineIDs[0]
}
err = cfg.SetString("pipeline", -1, rootPipelineID)
if err != nil {
return nil, fmt.Errorf("Error setting the pipeline ID in the input config: %v", err)
const pipelineField = "pipeline"
if !cfg.HasField(pipelineField) {
rootPipelineID := ""
if len(fs.pipelineIDs) > 0 {
rootPipelineID = fs.pipelineIDs[0]
}
if err := cfg.SetString(pipelineField, -1, rootPipelineID); err != nil {
return nil, errw.Wrap(err, "error setting the fileset pipeline ID in config")
}
}

// force our the module/fileset name
Expand Down
86 changes: 65 additions & 21 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"
"text/template"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -185,33 +187,75 @@ func TestGetInputConfigNginx(t *testing.T) {
func TestGetInputConfigNginxOverrides(t *testing.T) {
modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)
fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{
Input: map[string]interface{}{
"close_eof": true,

tests := map[string]struct {
input map[string]interface{}
expectedFn require.ValueAssertionFunc
}{
"close_eof": {
map[string]interface{}{
"close_eof": true,
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("close_eof"))
v, err := c.Bool("close_eof", -1)
require.NoError(t, err)
require.True(t, v)

pipelineID, err := c.String("pipeline", -1)
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
},
},
})
assert.NoError(t, err)
"pipeline": {
map[string]interface{}{
"pipeline": "foobar",
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

v, err := c.String("pipeline", -1)
require.NoError(t, err)
require.Equal(t, "foobar", v)
},
},
}

assert.NoError(t, fs.Read("5.2.0"))
for name, test := range tests {
t.Run(name, func(t *testing.T) {
fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{
Input: test.input,
})
assert.NoError(t, err)

cfg, err := fs.getInputConfig()
assert.NoError(t, err)
assert.NoError(t, fs.Read("5.2.0"))

assert.True(t, cfg.HasField("paths"))
assert.True(t, cfg.HasField("exclude_files"))
assert.True(t, cfg.HasField("close_eof"))
assert.True(t, cfg.HasField("pipeline"))
pipelineID, err := cfg.String("pipeline", -1)
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
cfg, err := fs.getInputConfig()
assert.NoError(t, err)

moduleName, err := cfg.String("_module_name", -1)
assert.NoError(t, err)
assert.Equal(t, "nginx", moduleName)
assert.True(t, cfg.HasField("paths"))
assert.True(t, cfg.HasField("exclude_files"))
assert.True(t, cfg.HasField("pipeline"))

filesetName, err := cfg.String("_fileset_name", -1)
assert.NoError(t, err)
assert.Equal(t, "access", filesetName)
test.expectedFn(t, cfg)

moduleName, err := cfg.String("_module_name", -1)
assert.NoError(t, err)
assert.Equal(t, "nginx", moduleName)

filesetName, err := cfg.String("_fileset_name", -1)
assert.NoError(t, err)
assert.Equal(t, "access", filesetName)
})
}
}

func TestGetPipelineNginx(t *testing.T) {
Expand Down

0 comments on commit 6d6cac9

Please sign in to comment.