Skip to content

Commit

Permalink
Fix aws-s3 config initialization plus cleanup (elastic#25763)
Browse files Browse the repository at this point in the history
The bug was that the FileSelectorCfg values were not being initialized to their defaults prior to config `Unpack`. This caused previously working configs to fail because the new options recently added were uninitialized. This fixes the issue by implementing the ucfg Initializer interface (https://pkg.go.dev/github.com/elastic/go-ucfg#Initializer) to ensure defaults are set.

Other changes:

- Refactored the config to have a `readerConfig` struct that contains all of the S3 object processing options, since they were previously defined in several structs.

- Replaced `int`s used to represent byte sizes with `cfgtype.ByteSize` so that users can write values like `10 MiB` for convenience.

- Changed the validation logic to check that api_timeout and visibility_timeout are > 0. Previously they accepted 0.

- Changed the config structs to use `*match.Matcher` instead of manually initializing and validating regexp values. match.Matcher implements the ucfg interface so there is less work to do here.

- Added tests to cover all of the config validation logic.
  • Loading branch information
andrewkroh authored May 18, 2021
1 parent 053b582 commit 5233fb0
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 123 deletions.
13 changes: 5 additions & 8 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,21 @@ The `aws-s3` input supports the following configuration options plus the

The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
call will be interrupted. The default AWS API call timeout for a message is 120
seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout
value.
seconds. The maximum is half of the visibility timeout value.

[id="input-{type}-buffer_size"]
[float]
==== `buffer_size`

The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs.
The default is 16384.
This only applies to non-JSON logs. The default is `16 KiB`.

[id="input-{type}-encoding"]
[float]
==== `encoding`

The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_5>>.
characters. This only applies to non-JSON logs. See <<_encoding_5>>.


[float]
Expand Down Expand Up @@ -133,7 +131,7 @@ connecting to the correct service endpoint. For example:
The maximum number of bytes that a single log message can have. All bytes after
`max_bytes` are discarded and not sent. This setting is especially useful for
multiline log messages, which can get large. This only applies to non-JSON logs.
The default is 10MB (10485760).
The default is `10 MiB`.

[float]
==== `max_number_of_messages`
Expand Down Expand Up @@ -164,8 +162,7 @@ The duration that the received messages are hidden from subsequent retrieve
requests after being retrieved by a ReceiveMessage request. This value needs to
be a lot bigger than {beatname_uc} collection frequency so if it took too long
to read the S3 log, this SQS message will not be reprocessed. The default
visibility timeout for a message is 300 seconds. The minimum is 0 seconds. The
maximum is 12 hours.
visibility timeout for a message is 300 seconds. The maximum is 12 hours.

[float]
==== `aws credentials`
Expand Down
91 changes: 32 additions & 59 deletions x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,11 @@ type s3Collector struct {
}

type s3Info struct {
name string
key string
region string
arn string
expandEventListFromField string
maxBytes int
multiline *multiline.Config
lineTerminator readfile.LineTerminator
encoding string
bufferSize int
name string
key string
region string
arn string
readerConfig
}

type bucket struct {
Expand Down Expand Up @@ -301,45 +296,23 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {

if len(c.config.FileSelectors) == 0 {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: c.config.ExpandEventListFromField,
maxBytes: c.config.MaxBytes,
multiline: c.config.Multiline,
lineTerminator: c.config.LineTerminator,
encoding: c.config.Encoding,
bufferSize: c.config.BufferSize,
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
readerConfig: c.config.ReaderConfig,
})
continue
}

for _, fs := range c.config.FileSelectors {
if fs.Regex == nil {
continue
}
if fs.Regex.MatchString(filename) {
if fs.Regex != nil && fs.Regex.MatchString(filename) {
info := s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
maxBytes: fs.MaxBytes,
multiline: fs.Multiline,
lineTerminator: fs.LineTerminator,
encoding: fs.Encoding,
bufferSize: fs.BufferSize,
}
if info.bufferSize == 0 {
info.bufferSize = c.config.BufferSize
}
if info.maxBytes == 0 {
info.maxBytes = c.config.MaxBytes
}
if info.lineTerminator == 0 {
info.lineTerminator = c.config.LineTerminator
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
readerConfig: fs.ReaderConfig,
}
s3Infos = append(s3Infos, info)
}
Expand Down Expand Up @@ -429,7 +402,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
decoder := json.NewDecoder(bodyReader)
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
Expand All @@ -439,9 +412,9 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
}

// handle s3 objects that are not json content-type
encodingFactory, ok := encoding.FindEncoding(info.encoding)
encodingFactory, ok := encoding.FindEncoding(info.Encoding)
if !ok || encodingFactory == nil {
return fmt.Errorf("unable to find '%v' encoding", info.encoding)
return fmt.Errorf("unable to find '%v' encoding", info.Encoding)
}
enc, err := encodingFactory(bodyReader)
if err != nil {
Expand All @@ -450,20 +423,20 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: info.bufferSize,
Terminator: info.lineTerminator,
MaxBytes: info.maxBytes * 4,
BufferSize: int(info.BufferSize),
Terminator: info.LineTerminator,
MaxBytes: int(info.MaxBytes) * 4,
})
r = readfile.NewStripNewline(r, info.lineTerminator)
r = readfile.NewStripNewline(r, info.LineTerminator)

if info.multiline != nil {
r, err = multiline.New(r, "\n", info.maxBytes, info.multiline)
if info.Multiline != nil {
r, err = multiline.New(r, "\n", int(info.MaxBytes), info.Multiline)
if err != nil {
return fmt.Errorf("error setting up multiline: %v", err)
}
}

r = readfile.NewLimitReader(r, info.maxBytes)
r = readfile.NewLimitReader(r, int(info.MaxBytes))

var offset int64
for {
Expand Down Expand Up @@ -516,10 +489,10 @@ func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Inf
func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if s3Info.ExpandEventListFromField != "" {
textValues, ok := f[s3Info.ExpandEventListFromField]
if !ok {
err := fmt.Errorf("key '%s' not found", s3Info.expandEventListFromField)
err := fmt.Errorf("key '%s' not found", s3Info.ExpandEventListFromField)
c.logger.Error(err)
return offset, err
}
Expand All @@ -534,10 +507,10 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objec
return offset, nil
}
case map[string]interface{}:
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if s3Info.ExpandEventListFromField != "" {
textValues, ok := f[s3Info.ExpandEventListFromField]
if !ok {
err := fmt.Errorf("key '%s' not found", s3Info.expandEventListFromField)
err := fmt.Errorf("key '%s' not found", s3Info.ExpandEventListFromField)
c.logger.Error(err)
return offset, err
}
Expand Down
103 changes: 57 additions & 46 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,77 +6,88 @@ package awss3

import (
"fmt"
"regexp"
"time"

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/libbeat/common/cfgtype"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

type config struct {
APITimeout time.Duration `config:"api_timeout"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
FipsEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
BufferSize int `config:"buffer_size"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
type FileSelectorCfg struct {
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
BufferSize int `config:"buffer_size"`
APITimeout time.Duration `config:"api_timeout"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"required"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
}

func defaultConfig() config {
return config{
c := config{
APITimeout: 120 * time.Second,
FipsEnabled: false,
MaxNumberOfMessages: 5,
VisibilityTimeout: 300 * time.Second,
LineTerminator: readfile.AutoLineTerminator,
MaxBytes: 10 * humanize.MiByte,
BufferSize: 16 * humanize.KiByte,
FIPSEnabled: false,
MaxNumberOfMessages: 5,
}
c.ReaderConfig.InitDefaults()
return c
}

func (c *config) Validate() error {
if c.VisibilityTimeout < 0 || c.VisibilityTimeout.Hours() > 12 {
return fmt.Errorf("visibility timeout %v is not within the "+
"required range 0s to 12h", c.VisibilityTimeout)
if c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12 {
return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+
"less than or equal to 12h", c.VisibilityTimeout)
}

if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
if c.APITimeout <= 0 || c.APITimeout > c.VisibilityTimeout/2 {
return fmt.Errorf("api_timeout <%v> must be greater than 0 and less "+
"than 1/2 of the visibility_timeout (%v)", c.APITimeout, c.VisibilityTimeout/2)
}

for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
if c.MaxNumberOfMessages <= 0 || c.MaxNumberOfMessages > 10 {
return fmt.Errorf("max_number_of_messages <%v> must be greater than "+
"0 and less than or equal to 10", c.MaxNumberOfMessages)
}
return nil
}

if c.MaxNumberOfMessages > 10 || c.MaxNumberOfMessages < 1 {
return fmt.Errorf(" max_number_of_messages %v needs to be between 1 and 10", c.MaxNumberOfMessages)
// fileSelectorConfig defines reader configuration that applies to a subset
// of S3 objects whose URL matches the given regex.
//
// The collector walks the configured selectors for each S3 object named in an
// SQS notification and builds an s3Info from every selector whose Regex
// matches the object's filename (which is also used as the S3 key).
type fileSelectorConfig struct {
// Regex is the pattern tested against the object filename. It is tagged
// as required; the collector additionally guards against a nil matcher.
Regex *match.Matcher `config:"regex" validate:"required"`
// ReaderConfig holds the reader options used for objects matched by Regex.
// The inline tag presumably places these options at the same level as
// `regex` in the selector's configuration — confirm against go-ucfg docs.
ReaderConfig readerConfig `config:",inline"`
}

// readerConfig defines the options for reading the content of an S3 object.
// It is embedded in s3Info and shared by the top-level input config and by
// each file selector.
type readerConfig struct {
// ExpandEventListFromField, when non-empty, causes the object to be decoded
// as JSON and names the key that is looked up in the decoded document.
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// BufferSize is handed to the line reader as its buffer size (non-JSON
// objects only). Must be greater than 0; InitDefaults sets 16 KiB.
BufferSize cfgtype.ByteSize `config:"buffer_size"`
// MaxBytes limits the size of a single log message for non-JSON objects.
// Must be greater than 0; InitDefaults sets 10 MiB.
MaxBytes cfgtype.ByteSize `config:"max_bytes"`
// Multiline is optional; when non-nil the line reader is wrapped in a
// multiline reader built from this configuration.
Multiline *multiline.Config `config:"multiline"`
// LineTerminator is passed to the line reader and the newline stripper.
// InitDefaults sets readfile.AutoLineTerminator.
LineTerminator readfile.LineTerminator `config:"line_terminator"`
// Encoding is the name looked up via encoding.FindEncoding when reading
// non-JSON objects; an unknown name makes the collector return an error.
Encoding string `config:"encoding"`
}

// Validate returns an error when buffer_size or max_bytes is not strictly
// positive, and nil otherwise. buffer_size is checked first, so it is the one
// reported when both values are invalid.
//
// NOTE(review): presumably invoked by go-ucfg after Unpack through its
// validation hook — confirm against the go-ucfg documentation.
func (f *readerConfig) Validate() error {
if f.BufferSize <= 0 {
return fmt.Errorf("buffer_size <%v> must be greater than 0", f.BufferSize)
}

if f.MaxBytes <= 0 {
return fmt.Errorf("max_bytes <%v> must be greater than 0", f.MaxBytes)
}

return nil
}

// InitDefaults sets the default reader options: a 16 KiB buffer, a 10 MiB
// max_bytes limit, and automatic line-terminator detection. The remaining
// fields are left untouched.
//
// Per the commit description, this implements the go-ucfg Initializer
// interface so that defaults are applied before Unpack. defaultConfig also
// calls it directly for the top-level reader options.
func (f *readerConfig) InitDefaults() {
f.BufferSize = 16 * humanize.KiByte
f.MaxBytes = 10 * humanize.MiByte
f.LineTerminator = readfile.AutoLineTerminator
}
Loading

0 comments on commit 5233fb0

Please sign in to comment.