Skip to content

Commit

Permalink
Simplify Beater interface (elastic#2003)
Browse files Browse the repository at this point in the history
pass Beater constructor to beat.Run + simplify Beater interface to only require
Run and Stop methods.

* Do not stop publisher pipeline yet
* Prevent races on beat and libbeat shutdown by not shutting publisher pipeline on stop (restore old behavior).
* Update beat package doc
* Use defaultBeatVersion if empty string
* winlogbeat: split setup into init+setup
  • Loading branch information
Steffen Siering authored and ruflin committed Jul 12, 2016
1 parent 5d6083d commit 01b8d64
Show file tree
Hide file tree
Showing 23 changed files with 355 additions and 456 deletions.
40 changes: 11 additions & 29 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

cfg "github.com/elastic/beats/filebeat/config"
Expand All @@ -21,37 +22,24 @@ type Filebeat struct {
}

// New creates a new Filebeat pointer instance.
func New() *Filebeat {
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := cfg.DefaultConfig
return &Filebeat{
config: &config,
if err := rawConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
}

// Config setups up the filebeat configuration by fetch all additional config files
func (fb *Filebeat) Config(b *beat.Beat) error {

// Load Base config
err := b.RawConfig.Unpack(&fb.config)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
if err := config.FetchConfigs(); err != nil {
return nil, err
}

// Check if optional config_dir is set to fetch additional prospector config files
fb.config.FetchConfigs()

return nil
}

// Setup applies the minimum required setup to a new Filebeat instance for use.
func (fb *Filebeat) Setup(b *beat.Beat) error {
fb.done = make(chan struct{})
return nil
fb := &Filebeat{
done: make(chan struct{}),
config: &config,
}
return fb, nil
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

var err error
config := fb.config.Filebeat

Expand Down Expand Up @@ -117,14 +105,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return nil
}

// Cleanup removes any temporary files, data, or other items that were created by the Beat.
func (fb *Filebeat) Cleanup(b *beat.Beat) error {
return nil
}

// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {

logp.Info("Stopping filebeat")

// Stop Filebeat
Expand Down
13 changes: 10 additions & 3 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"errors"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -98,13 +99,13 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
}

// Fetches and merges all config files given by configDir. All are put into one config object
func (config *Config) FetchConfigs() {
func (config *Config) FetchConfigs() error {

configDir := config.Filebeat.ConfigDir

// If option not set, do nothing
if configDir == "" {
return
return nil
}

// If configDir is relative, consider it relative to the config path
Expand All @@ -117,14 +118,20 @@ func (config *Config) FetchConfigs() {

if err != nil {
log.Fatal("Could not use config_dir of: ", configDir, err)
return err
}

err = mergeConfigFiles(configFiles, config)
if err != nil {
log.Fatal("Error merging config files: ", err)
return err
}

if len(config.Filebeat.Prospectors) == 0 {
log.Fatalf("No paths given. What files do you want me to watch?")
err := errors.New("No paths given. What files do you want me to watch?")
log.Fatalf("%v", err)
return err
}

return nil
}
2 changes: 1 addition & 1 deletion filebeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var Name = "filebeat"
// determine where in each file to restart a harvester.

func main() {
if err := beat.Run(Name, "", beater.New()); err != nil {
if err := beat.Run(Name, "", beater.New); err != nil {
os.Exit(1)
}
}
4 changes: 2 additions & 2 deletions filebeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (p *syncLogPublisher) Start() {
}

func (p *syncLogPublisher) Stop() {
close(p.done)
p.client.Close()
close(p.done)
p.wg.Wait()
}

Expand Down Expand Up @@ -182,8 +182,8 @@ func (p *asyncLogPublisher) Start() {
}

func (p *asyncLogPublisher) Stop() {
close(p.done)
p.client.Close()
close(p.done)
p.wg.Wait()
}

Expand Down
3 changes: 1 addition & 2 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ def test_shutdown_no_prospectors(self):
max_timeout=10)

self.wait_until(
lambda: self.log_contains(
"Exiting"),
lambda: self.log_contains("No prospectors defined"),
max_timeout=10)

filebeat.check_wait(exit_code=1)
Expand Down
42 changes: 13 additions & 29 deletions generate/beat/{{cookiecutter.beat}}/beater/{{cookiecutter.beat}}.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,29 @@ import (
)

type {{cookiecutter.beat|capitalize}} struct {
config config.Config
done chan struct{}
client publisher.Client
done chan struct{}
config config.Config
client publisher.Client
}

// Creates beater
func New() *{{cookiecutter.beat|capitalize}} {
return &{{cookiecutter.beat|capitalize}}{
done: make(chan struct{}),
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
}

/// *** Beater interface methods ***///

func (bt *{{cookiecutter.beat|capitalize}}) Config(b *beat.Beat) error {

bt.config = config.DefaultConfig

// Load beater config
err := b.RawConfig.Unpack(&bt.config)
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
bt := &{{cookiecutter.beat|capitalize}}{
done: make(chan struct{}),
config: config,
}

return nil
}

func (bt *{{cookiecutter.beat|capitalize}}) Setup(b *beat.Beat) error {

bt.client = b.Publisher.Connect()
return nil
return bt, nil
}

func (bt *{{cookiecutter.beat|capitalize}}) Run(b *beat.Beat) error {
logp.Info("{{cookiecutter.beat}} is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.{{cookiecutter.beat|capitalize}}.Period)
counter := 1
for {
Expand All @@ -69,10 +56,7 @@ func (bt *{{cookiecutter.beat|capitalize}}) Run(b *beat.Beat) error {
}
}

func (bt *{{cookiecutter.beat|capitalize}}) Cleanup(b *beat.Beat) error {
return nil
}

func (bt *{{cookiecutter.beat|capitalize}}) Stop() {
bt.client.Close()
close(bt.done)
}
2 changes: 1 addition & 1 deletion generate/beat/{{cookiecutter.beat}}/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func main() {
err := beat.Run("{{cookiecutter.beat}}", "", beater.New())
err := beat.Run("{{cookiecutter.beat}}", "", beater.New)
if err != nil {
os.Exit(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var Name = "{{cookiecutter.beat}}"

func main() {
if err := beat.Run(Name, "", beater.New()); err != nil {
if err := beat.Run(Name, "", beater.New); err != nil {
os.Exit(1)
}
}
Loading

0 comments on commit 01b8d64

Please sign in to comment.