Skip to content

Commit

Permalink
[httpjson] Add fail_on_template_error option (elastic#24784)
Browse files Browse the repository at this point in the history
* Update cursor value after every event is published

* Fix documentation typos

* Add fail_on_template_error option
  • Loading branch information
marc-gr authored Mar 30, 2021
1 parent 2f4358f commit 3d39f2d
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 150 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Updating field mappings for Cisco AMP module, fixing certain fields. {pull}24661[24661]
- Added NTP fileset to Zeek module {pull}24224[24224]
- Add `proxy_url` config for httpjson v2 input. {issue}24615[24615] {pull}24662[24662]
- Add `fail_on_template_error` option for httpjson input. {pull}24784[24784]

*Heartbeat*

Expand Down
18 changes: 10 additions & 8 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ filebeat.inputs:
target: url.value
value: http://localhost:9200/_search/scroll
- set:
target: .url.params.scroll_id
value: '[[.last_request.body._scroll_id]]'
target: url.params.scroll_id
value: '[[.last_response.body._scroll_id]]'
- set:
target: .body.scroll
target: body.scroll
value: 5m
----

Expand Down Expand Up @@ -97,7 +97,7 @@ The `httpjson` input keeps a runtime state between requests. This state can be a
The state has the following elements:

- `last_response.url.value`: The full URL with params and fragments from the last request with a successful response.
- `last_request.url.params`: A map containing the params from the URL in `last_response.url.value`.
- `last_response.url.params`: A map containing the params from the URL in `last_response.url.value`.
- `last_response.header`: A map containing the headers from the last successful response.
- `last_response.body`: A map containing the parsed JSON body from the last successful response. This is the response as it comes from the remote server.
- `last_response.page`: A number indicating the page number of the last response.
Expand Down Expand Up @@ -134,6 +134,7 @@ Appends a value to a list. If the field does not exist, the first entry will be
- `target` defines the destination field where the value is stored.
- `value` defines the value that will be stored and it is a <<value-templates,value template>>.
- `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.

[float]
==== `delete`
Expand Down Expand Up @@ -164,6 +165,7 @@ Sets a value.
- `target` defines the destination field where the value is stored.
- `value` defines the value that will be stored and it is a <<value-templates,value template>>.
- `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.

[[value-templates]]
==== Value templates
Expand Down Expand Up @@ -499,16 +501,16 @@ filebeat.inputs:
- delete:
target: body.very_confidential
response.split:
target: .body.hits.hits
target: body.hits.hits
response.pagination:
- set:
target: url.value
value: http://localhost:9200/_search/scroll
- set:
target: .url.params.scroll_id
value: '[[.last_request.body._scroll_id]]'
target: url.params.scroll_id
value: '[[.last_response.body._scroll_id]]'
- set:
target: .body.scroll
target: body.scroll
value: 5m
----

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/internal/v2/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *cursor) update(trCtx *transformContext) {
}

for k, cfg := range c.cfg {
v := cfg.Value.Execute(trCtx, transformable{}, cfg.Default, c.log)
v, _ := cfg.Value.Execute(trCtx, transformable{}, cfg.Default, c.log)
_, _ = c.state.Put(k, v)
c.log.Debugf("cursor.%s stored with %s", k, v)
}
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ func (iter *pageIterator) next() (*response, bool, error) {

httpReq, err := iter.pagination.requestFactory.newHTTPRequest(iter.stdCtx, iter.trCtx)
if err != nil {
if err == errNewURLValueNotSet {
// if this error happens here it means the transform used to pick the new url.value
if err == errNewURLValueNotSet ||
err == errEmptyTemplateResult ||
err == errExecutingTemplate {
// if this error happens here it means a transform
// did not find any new value and we can stop paginating without error
iter.done = true
return nil, false, nil
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
tr := transformable{}
tr.setHeader(resp.Header)

remaining := r.remaining.Execute(emptyTransformContext(), tr, nil, r.log)
remaining, _ := r.remaining.Execute(emptyTransformContext(), tr, nil, r.log)
if remaining == "" {
return 0, errors.New("remaining value is empty")
}
Expand All @@ -122,7 +122,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
return 0, nil
}

reset := r.reset.Execute(emptyTransformContext(), tr, nil, r.log)
reset, _ := r.reset.Execute(emptyTransformContext(), tr, nil, r.log)
if reset == "" {
return 0, errors.New("reset value is empty")
}
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
trCtx.updateFirstEvent(maybeMsg.msg)
}
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()
n++
}

trCtx.updateCursor()

r.log.Infof("request finished: %d events published", n)

return nil
Expand Down
30 changes: 18 additions & 12 deletions x-pack/filebeat/input/httpjson/internal/v2/transform_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
const appendName = "append"

type appendConfig struct {
Target string `config:"target"`
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
Target string `config:"target"`
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
FailOnTemplateError bool `config:"fail_on_template_error"`
}

type appendt struct {
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool

runFunc func(ctx *transformContext, transformable transformable, key, val string) error
}
Expand Down Expand Up @@ -100,15 +102,19 @@ func newAppend(cfg *common.Config, log *logp.Logger) (appendt, error) {
}

return appendt{
log: log,
targetInfo: ti,
value: c.Value,
defaultValue: c.Default,
log: log,
targetInfo: ti,
value: c.Value,
defaultValue: c.Default,
failOnTemplateError: c.FailOnTemplateError,
}, nil
}

func (append *appendt) run(ctx *transformContext, tr transformable) (transformable, error) {
value := append.value.Execute(ctx, tr, append.defaultValue, append.log)
value, err := append.value.Execute(ctx, tr, append.defaultValue, append.log)
if err != nil && append.failOnTemplateError {
return transformable{}, err
}
if err := append.runFunc(ctx, tr, append.targetInfo.Name, value); err != nil {
return transformable{}, err
}
Expand Down
35 changes: 18 additions & 17 deletions x-pack/filebeat/input/httpjson/internal/v2/transform_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ var errNewURLValueNotSet = errors.New("the new url.value was not set")
const setName = "set"

type setConfig struct {
Target string `config:"target"`
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
Target string `config:"target"`
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
FailOnTemplateError bool `config:"fail_on_template_error"`
}

type set struct {
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool

runFunc func(ctx *transformContext, transformable transformable, key, val string) error
}
Expand Down Expand Up @@ -105,15 +107,19 @@ func newSet(cfg *common.Config, log *logp.Logger) (set, error) {
}

return set{
log: log,
targetInfo: ti,
value: c.Value,
defaultValue: c.Default,
log: log,
targetInfo: ti,
value: c.Value,
defaultValue: c.Default,
failOnTemplateError: c.FailOnTemplateError,
}, nil
}

func (set *set) run(ctx *transformContext, tr transformable) (transformable, error) {
value := set.value.Execute(ctx, tr, set.defaultValue, set.log)
value, err := set.value.Execute(ctx, tr, set.defaultValue, set.log)
if err != nil && set.failOnTemplateError {
return transformable{}, err
}
if err := set.runFunc(ctx, tr, set.targetInfo.Name, value); err != nil {
return transformable{}, err
}
Expand Down Expand Up @@ -155,11 +161,6 @@ func setURLParams(ctx *transformContext, transformable transformable, key, value
}

func setURLValue(ctx *transformContext, transformable transformable, _, value string) error {
// if the template processing did not find any value
// we fail without parsing
if value == "<no value>" || value == "" {
return errNewURLValueNotSet
}
url, err := url.Parse(value)
if err != nil {
return errNewURLValueNotSet
Expand Down
24 changes: 15 additions & 9 deletions x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package v2

import (
"bytes"
"errors"
"regexp"
"strconv"
"strings"
Expand All @@ -21,6 +22,11 @@ const (
rightDelim = "]]"
)

var (
errEmptyTemplateResult = errors.New("the template result is empty")
errExecutingTemplate = errors.New("the template execution failed")
)

type valueTpl struct {
*template.Template
}
Expand Down Expand Up @@ -51,21 +57,21 @@ func (t *valueTpl) Unpack(in string) error {
return nil
}

func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal *valueTpl, log *logp.Logger) (val string) {
fallback := func(err error) string {
if err != nil {
log.Debugf("template execution failed: %v", err)
}
func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal *valueTpl, log *logp.Logger) (val string, err error) {
fallback := func(err error) (string, error) {
if defaultVal != nil {
log.Debugf("template execution: falling back to default value")
return defaultVal.Execute(emptyTransformContext(), transformable{}, nil, log)
}
return ""
return "", err
}

defer func() {
if r := recover(); r != nil {
val = fallback(r.(error))
val, err = fallback(errExecutingTemplate)
}
if err != nil {
log.Debugf("template execution failed: %v", err)
}
log.Debugf("template execution: evaluated template %q", val)
}()
Expand All @@ -83,9 +89,9 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal

val = buf.String()
if val == "" || strings.Contains(val, "<no value>") {
return fallback(nil)
return fallback(errEmptyTemplateResult)
}
return val
return val, nil
}

var (
Expand Down
Loading

0 comments on commit 3d39f2d

Please sign in to comment.