diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 8c2c6a5d03d3..b33b6cf9efef 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -37,7 +37,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, fmt.Errorf("Error reading config file: %v", err) } - moduleRegistry, err := fileset.NewModuleRegistry(config.Modules) + moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Version) if err != nil { return nil, err } diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index e723dd3ed769..0938399aeb60 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -51,7 +51,7 @@ func New( } // Read reads the manifest file and evaluates the variables. -func (fs *Fileset) Read() error { +func (fs *Fileset) Read(beatVersion string) error { var err error fs.manifest, err = fs.readManifest() if err != nil { @@ -63,7 +63,7 @@ func (fs *Fileset) Read() error { return err } - fs.pipelineID, err = fs.getPipelineID() + fs.pipelineID, err = fs.getPipelineID(beatVersion) if err != nil { return err } @@ -241,13 +241,13 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) { } // getPipelineID returns the Ingest Node pipeline ID -func (fs *Fileset) getPipelineID() (string, error) { +func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline) if err != nil { return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) } - return formatPipelineID(fs.mcfg.Module, fs.name, path), nil + return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil } func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) { @@ -266,12 +266,12 @@ func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interfac if err != nil { return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) } - return formatPipelineID(fs.mcfg.Module, fs.name, path), content, nil + return fs.pipelineID, content, nil } // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch -func formatPipelineID(module, fileset, path string) string { - return fmt.Sprintf("%s-%s-%s", module, fileset, removeExt(filepath.Base(path))) +func formatPipelineID(module, fileset, path, beatVersion string) string { + return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path))) } // removeExt returns the file name without the extension. If no dot is found, diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 6c8bfbf57016..8ee770502641 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -149,7 +149,7 @@ func TestResolveVariable(t *testing.T) { func TestGetProspectorConfigNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) cfg, err := fs.getProspectorConfig() assert.NoError(t, err) @@ -159,7 +159,7 @@ func TestGetProspectorConfigNginx(t *testing.T) { assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) } func TestGetProspectorConfigNginxOverrides(t *testing.T) { @@ -172,7 +172,7 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { }) assert.NoError(t, err) - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) cfg, err := fs.getProspectorConfig() assert.NoError(t, err) @@ -183,17 +183,17 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) } func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) pipelineID, content, err := fs.GetPipeline() assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) assert.Contains(t, content, "description") assert.Contains(t, content, "processors") } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 6829d0e64fec..7845ee99f969 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -18,7 +18,8 @@ type ModuleRegistry struct { // newModuleRegistry reads and loads the configured module into the registry. func newModuleRegistry(modulesPath string, moduleConfigs []ModuleConfig, - overrides *ModuleOverrides) (*ModuleRegistry, error) { + overrides *ModuleOverrides, + beatVersion string) (*ModuleRegistry, error) { var reg ModuleRegistry reg.registry = map[string]map[string]*Fileset{} @@ -53,7 +54,7 @@ func newModuleRegistry(modulesPath string, if err != nil { return nil, err } - err = fileset.Read() + err = fileset.Read(beatVersion) if err != nil { return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) } @@ -81,7 +82,7 @@ func newModuleRegistry(modulesPath string, } // NewModuleRegistry reads and loads the configured module into the registry. -func NewModuleRegistry(moduleConfigs []*common.Config) (*ModuleRegistry, error) { +func NewModuleRegistry(moduleConfigs []*common.Config, beatVersion string) (*ModuleRegistry, error) { modulesPath := paths.Resolve(paths.Home, "module") stat, err := os.Stat(modulesPath) @@ -106,7 +107,7 @@ func NewModuleRegistry(moduleConfigs []*common.Config) (*ModuleRegistry, error) if err != nil { return nil, err } - return newModuleRegistry(modulesPath, mcfgs, modulesOverrides) + return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatVersion) } func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) { diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index b9400844fae9..8e98a2c848dc 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -51,8 +51,8 @@ func TestLoadPipeline(t *testing.T) { func TestSetupNginx(t *testing.T) { client := elasticsearch.GetTestingElasticsearch() - client.Request("DELETE", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) - client.Request("DELETE", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-access-with_plugins", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) modulesPath, err := filepath.Abs("../module") assert.NoError(t, err) @@ -61,14 +61,14 @@ func TestSetupNginx(t *testing.T) { {Module: "nginx"}, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) err = reg.LoadPipelines(client) assert.NoError(t, err) - status, _, _ := client.Request("GET", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-5.2.0-nginx-access-with_plugins", "", nil, nil) assert.Equal(t, 200, status) - status, _, _ = client.Request("GET", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) assert.Equal(t, 200, status) } diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index a6f5a612ea19..db9169e5bc70 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -30,7 +30,7 @@ func TestNewModuleRegistry(t *testing.T) { {Module: "system"}, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg) @@ -86,7 +86,7 @@ func TestNewModuleRegistryConfig(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg) @@ -335,7 +335,7 @@ func TestMissingModuleFolder(t *testing.T) { load(t, map[string]interface{}{"module": "nginx"}), } - reg, err := NewModuleRegistry(configs) + reg, err := NewModuleRegistry(configs, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg)