From 3d39f2d01b87874fb1fd7dad05af302badecef8b Mon Sep 17 00:00:00 2001
From: Marc Guasch <marc-gr@users.noreply.github.com>
Date: Tue, 30 Mar 2021 10:19:44 +0200
Subject: [PATCH] [httpjson] Add fail_on_template_error option (#24784)

* Update cursor value after every event is published

* Fix documentation typos

* Add fail_on_template_error option
---
 CHANGELOG.next.asciidoc                       |   1 +
 .../docs/inputs/input-httpjson.asciidoc       |  18 +-
 .../input/httpjson/internal/v2/cursor.go      |   2 +-
 .../input/httpjson/internal/v2/pagination.go  |   6 +-
 .../httpjson/internal/v2/rate_limiter.go      |   4 +-
 .../input/httpjson/internal/v2/request.go     |   3 +-
 .../httpjson/internal/v2/transform_append.go  |  30 ++-
 .../httpjson/internal/v2/transform_set.go     |  35 +--
 .../input/httpjson/internal/v2/value_tpl.go   |  24 +-
 .../httpjson/internal/v2/value_tpl_test.go    | 221 ++++++++++--------
 10 files changed, 194 insertions(+), 150 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 3d48e670c10c..f1ab7158121c 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -808,6 +808,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Updating field mappings for Cisco AMP module, fixing certain fields. {pull}24661[24661]
 - Added NTP fileset to Zeek module {pull}24224[24224]
 - Add `proxy_url` config for httpjson v2 input. {issue}24615[24615] {pull}24662[24662]
+- Add `fail_on_template_error` option for httpjson input. {pull}24784[24784]
 
 *Heartbeat*
 
diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
index a0ac58f72474..c698c99e4ccd 100644
--- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -54,10 +54,10 @@ filebeat.inputs:
         target: url.value
         value: http://localhost:9200/_search/scroll
     - set:
-        target: .url.params.scroll_id
-        value: '[[.last_request.body._scroll_id]]'
+        target: url.params.scroll_id
+        value: '[[.last_response.body._scroll_id]]'
     - set:
-        target: .body.scroll
+        target: body.scroll
         value: 5m
 ----
 
@@ -97,7 +97,7 @@ The `httpjson` input keeps a runtime state between requests. This state can be a
 The state has the following elements:
 
 - `last_response.url.value`: The full URL with params and fragments from the last request with a successful response.
-- `last_request.url.params`: A map containing the params from the URL in `last_response.url.value`.
+- `last_response.url.params`: A map containing the params from the URL in `last_response.url.value`.
 - `last_response.header`: A map containing the headers from the last successful response.
 - `last_response.body`: A map containing the parsed JSON body from the last successful response. This is the response as it comes from the remote server.
 - `last_response.page`: A number indicating the page number of the last response.
@@ -134,6 +134,7 @@ Appends a value to a list. If the field does not exist, the first entry will be
 - `target` defines the destination field where the value is stored.
 - `value` defines the value that will be stored and it is a <<value-templates,value template>>.
 - `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
+- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.
 
 [float]
 ==== `delete`
@@ -164,6 +165,7 @@ Sets a value.
 - `target` defines the destination field where the value is stored.
 - `value` defines the value that will be stored and it is a <<value-templates,value template>>.
 - `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
+- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.
 
 [[value-templates]]
 ==== Value templates
@@ -499,16 +501,16 @@ filebeat.inputs:
     - delete:
         target: body.very_confidential
   response.split:
-    target: .body.hits.hits
+    target: body.hits.hits
   response.pagination:
     - set:
         target: url.value
         value: http://localhost:9200/_search/scroll
     - set:
-        target: .url.params.scroll_id
-        value: '[[.last_request.body._scroll_id]]'
+        target: url.params.scroll_id
+        value: '[[.last_response.body._scroll_id]]'
     - set:
-        target: .body.scroll
+        target: body.scroll
         value: 5m
 ----
 
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/cursor.go b/x-pack/filebeat/input/httpjson/internal/v2/cursor.go
index 053cdd87bd4b..a324ac15dd50 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/cursor.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/cursor.go
@@ -50,7 +50,7 @@ func (c *cursor) update(trCtx *transformContext) {
 	}
 
 	for k, cfg := range c.cfg {
-		v := cfg.Value.Execute(trCtx, transformable{}, cfg.Default, c.log)
+		v, _ := cfg.Value.Execute(trCtx, transformable{}, cfg.Default, c.log)
 		_, _ = c.state.Put(k, v)
 		c.log.Debugf("cursor.%s stored with %s", k, v)
 	}
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/pagination.go b/x-pack/filebeat/input/httpjson/internal/v2/pagination.go
index 4796742d9bcd..6b28fa0f206b 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/pagination.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/pagination.go
@@ -124,8 +124,10 @@ func (iter *pageIterator) next() (*response, bool, error) {
 
 	httpReq, err := iter.pagination.requestFactory.newHTTPRequest(iter.stdCtx, iter.trCtx)
 	if err != nil {
-		if err == errNewURLValueNotSet {
-			// if this error happens here it means the transform used to pick the new url.value
+		if err == errNewURLValueNotSet ||
+			err == errEmptyTemplateResult ||
+			err == errExecutingTemplate {
+			// if this error happens here it means a transform
 			// did not find any new value and we can stop paginating without error
 			iter.done = true
 			return nil, false, nil
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/rate_limiter.go b/x-pack/filebeat/input/httpjson/internal/v2/rate_limiter.go
index 5c7e2c16a985..5d457dea7ae2 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/rate_limiter.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/rate_limiter.go
@@ -104,7 +104,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
 	tr := transformable{}
 	tr.setHeader(resp.Header)
 
-	remaining := r.remaining.Execute(emptyTransformContext(), tr, nil, r.log)
+	remaining, _ := r.remaining.Execute(emptyTransformContext(), tr, nil, r.log)
 	if remaining == "" {
 		return 0, errors.New("remaining value is empty")
 	}
@@ -122,7 +122,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
 		return 0, nil
 	}
 
-	reset := r.reset.Execute(emptyTransformContext(), tr, nil, r.log)
+	reset, _ := r.reset.Execute(emptyTransformContext(), tr, nil, r.log)
 	if reset == "" {
 		return 0, errors.New("reset value is empty")
 	}
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/request.go b/x-pack/filebeat/input/httpjson/internal/v2/request.go
index 2ef921754945..c02cab5be8be 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/request.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/request.go
@@ -201,11 +201,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			trCtx.updateFirstEvent(maybeMsg.msg)
 		}
 		trCtx.updateLastEvent(maybeMsg.msg)
+		trCtx.updateCursor()
 		n++
 	}
 
-	trCtx.updateCursor()
-
 	r.log.Infof("request finished: %d events published", n)
 
 	return nil
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/transform_append.go b/x-pack/filebeat/input/httpjson/internal/v2/transform_append.go
index 6a5867e5bbb9..f2561ecd55b4 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/transform_append.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/transform_append.go
@@ -16,16 +16,18 @@ import (
 const appendName = "append"
 
 type appendConfig struct {
-	Target  string    `config:"target"`
-	Value   *valueTpl `config:"value"`
-	Default *valueTpl `config:"default"`
+	Target              string    `config:"target"`
+	Value               *valueTpl `config:"value"`
+	Default             *valueTpl `config:"default"`
+	FailOnTemplateError bool      `config:"fail_on_template_error"`
 }
 
 type appendt struct {
-	log          *logp.Logger
-	targetInfo   targetInfo
-	value        *valueTpl
-	defaultValue *valueTpl
+	log                 *logp.Logger
+	targetInfo          targetInfo
+	value               *valueTpl
+	defaultValue        *valueTpl
+	failOnTemplateError bool
 
 	runFunc func(ctx *transformContext, transformable transformable, key, val string) error
 }
@@ -100,15 +102,19 @@ func newAppend(cfg *common.Config, log *logp.Logger) (appendt, error) {
 	}
 
 	return appendt{
-		log:          log,
-		targetInfo:   ti,
-		value:        c.Value,
-		defaultValue: c.Default,
+		log:                 log,
+		targetInfo:          ti,
+		value:               c.Value,
+		defaultValue:        c.Default,
+		failOnTemplateError: c.FailOnTemplateError,
 	}, nil
 }
 
 func (append *appendt) run(ctx *transformContext, tr transformable) (transformable, error) {
-	value := append.value.Execute(ctx, tr, append.defaultValue, append.log)
+	value, err := append.value.Execute(ctx, tr, append.defaultValue, append.log)
+	if err != nil && append.failOnTemplateError {
+		return transformable{}, err
+	}
 	if err := append.runFunc(ctx, tr, append.targetInfo.Name, value); err != nil {
 		return transformable{}, err
 	}
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/transform_set.go b/x-pack/filebeat/input/httpjson/internal/v2/transform_set.go
index fcdb1fbbb391..4f975ba7db43 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/transform_set.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/transform_set.go
@@ -19,16 +19,18 @@ var errNewURLValueNotSet = errors.New("the new url.value was not set")
 const setName = "set"
 
 type setConfig struct {
-	Target  string    `config:"target"`
-	Value   *valueTpl `config:"value"`
-	Default *valueTpl `config:"default"`
+	Target              string    `config:"target"`
+	Value               *valueTpl `config:"value"`
+	Default             *valueTpl `config:"default"`
+	FailOnTemplateError bool      `config:"fail_on_template_error"`
 }
 
 type set struct {
-	log          *logp.Logger
-	targetInfo   targetInfo
-	value        *valueTpl
-	defaultValue *valueTpl
+	log                 *logp.Logger
+	targetInfo          targetInfo
+	value               *valueTpl
+	defaultValue        *valueTpl
+	failOnTemplateError bool
 
 	runFunc func(ctx *transformContext, transformable transformable, key, val string) error
 }
@@ -105,15 +107,19 @@ func newSet(cfg *common.Config, log *logp.Logger) (set, error) {
 	}
 
 	return set{
-		log:          log,
-		targetInfo:   ti,
-		value:        c.Value,
-		defaultValue: c.Default,
+		log:                 log,
+		targetInfo:          ti,
+		value:               c.Value,
+		defaultValue:        c.Default,
+		failOnTemplateError: c.FailOnTemplateError,
 	}, nil
 }
 
 func (set *set) run(ctx *transformContext, tr transformable) (transformable, error) {
-	value := set.value.Execute(ctx, tr, set.defaultValue, set.log)
+	value, err := set.value.Execute(ctx, tr, set.defaultValue, set.log)
+	if err != nil && set.failOnTemplateError {
+		return transformable{}, err
+	}
 	if err := set.runFunc(ctx, tr, set.targetInfo.Name, value); err != nil {
 		return transformable{}, err
 	}
@@ -155,11 +161,6 @@ func setURLParams(ctx *transformContext, transformable transformable, key, value
 }
 
 func setURLValue(ctx *transformContext, transformable transformable, _, value string) error {
-	// if the template processing did not find any value
-	// we fail without parsing
-	if value == "<no value>" || value == "" {
-		return errNewURLValueNotSet
-	}
 	url, err := url.Parse(value)
 	if err != nil {
 		return errNewURLValueNotSet
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go b/x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go
index 9db90ce7ae11..6238bf06273d 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/value_tpl.go
@@ -6,6 +6,7 @@ package v2
 
 import (
 	"bytes"
+	"errors"
 	"regexp"
 	"strconv"
 	"strings"
@@ -21,6 +22,11 @@ const (
 	rightDelim = "]]"
 )
 
+var (
+	errEmptyTemplateResult = errors.New("the template result is empty")
+	errExecutingTemplate   = errors.New("the template execution failed")
+)
+
 type valueTpl struct {
 	*template.Template
 }
@@ -51,21 +57,21 @@ func (t *valueTpl) Unpack(in string) error {
 	return nil
 }
 
-func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal *valueTpl, log *logp.Logger) (val string) {
-	fallback := func(err error) string {
-		if err != nil {
-			log.Debugf("template execution failed: %v", err)
-		}
+func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal *valueTpl, log *logp.Logger) (val string, err error) {
+	fallback := func(err error) (string, error) {
 		if defaultVal != nil {
 			log.Debugf("template execution: falling back to default value")
 			return defaultVal.Execute(emptyTransformContext(), transformable{}, nil, log)
 		}
-		return ""
+		return "", err
 	}
 
 	defer func() {
 		if r := recover(); r != nil {
-			val = fallback(r.(error))
+			val, err = fallback(errExecutingTemplate)
+		}
+		if err != nil {
+			log.Debugf("template execution failed: %v", err)
 		}
 		log.Debugf("template execution: evaluated template %q", val)
 	}()
@@ -83,9 +89,9 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal
 
 	val = buf.String()
 	if val == "" || strings.Contains(val, "<no value>") {
-		return fallback(nil)
+		return fallback(errEmptyTemplateResult)
 	}
-	return val
+	return val, nil
 }
 
 var (
diff --git a/x-pack/filebeat/input/httpjson/internal/v2/value_tpl_test.go b/x-pack/filebeat/input/httpjson/internal/v2/value_tpl_test.go
index b9be148d1f05..0fd8e9964871 100644
--- a/x-pack/filebeat/input/httpjson/internal/v2/value_tpl_test.go
+++ b/x-pack/filebeat/input/httpjson/internal/v2/value_tpl_test.go
@@ -17,14 +17,15 @@ import (
 
 func TestValueTpl(t *testing.T) {
 	cases := []struct {
-		name        string
-		value       string
-		paramCtx    *transformContext
-		paramTr     transformable
-		paramDefVal string
-		expected    string
-		setup       func()
-		teardown    func()
+		name          string
+		value         string
+		paramCtx      *transformContext
+		paramTr       transformable
+		paramDefVal   string
+		expectedVal   string
+		expectedError string
+		setup         func()
+		teardown      func()
 	}{
 		{
 			name:  "can render values from ctx",
@@ -36,7 +37,7 @@ func TestValueTpl(t *testing.T) {
 			},
 			paramTr:     transformable{},
 			paramDefVal: "",
-			expected:    "25",
+			expectedVal: "25",
 		},
 		{
 			name:  "can render default value if execute fails",
@@ -46,7 +47,7 @@ func TestValueTpl(t *testing.T) {
 			},
 			paramTr:     transformable{},
 			paramDefVal: "25",
-			expected:    "25",
+			expectedVal: "25",
 		},
 		{
 			name:        "can render default value if template is empty",
@@ -54,107 +55,123 @@ func TestValueTpl(t *testing.T) {
 			paramCtx:    emptyTransformContext(),
 			paramTr:     transformable{},
 			paramDefVal: "25",
-			expected:    "25",
+			expectedVal: "25",
+		},
+		{
+			name:          "returns error if result is empty and no default is set",
+			value:         "",
+			paramCtx:      emptyTransformContext(),
+			paramTr:       transformable{},
+			paramDefVal:   "",
+			expectedVal:   "",
+			expectedError: errEmptyTemplateResult.Error(),
 		},
 		{
 			name:        "can render default value if execute panics",
 			value:       "[[.last_response.panic]]",
 			paramDefVal: "25",
-			expected:    "25",
+			expectedVal: "25",
 		},
 		{
-			name:     "func parseDuration",
-			value:    `[[ parseDuration "-1h" ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "-1h0m0s",
+			name:          "returns error if panics and no default is set",
+			value:         "[[.last_response.panic]]",
+			paramDefVal:   "",
+			expectedVal:   "",
+			expectedError: errExecutingTemplate.Error(),
 		},
 		{
-			name:     "func now",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ now ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 13:25:32 +0000 UTC",
+			name:        "func parseDuration",
+			value:       `[[ parseDuration "-1h" ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "-1h0m0s",
 		},
 		{
-			name:     "func now with duration",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ now (parseDuration "-1h") ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 12:25:32 +0000 UTC",
+			name:        "func now",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ now ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 13:25:32 +0000 UTC",
 		},
 		{
-			name:     "func parseDate",
-			value:    `[[ parseDate "2020-11-05T12:25:32.1234567Z" "RFC3339Nano" ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 12:25:32.1234567 +0000 UTC",
+			name:        "func now with duration",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ now (parseDuration "-1h") ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 12:25:32 +0000 UTC",
 		},
 		{
-			name:     "func parseDate defaults to RFC3339",
-			value:    `[[ parseDate "2020-11-05T12:25:32Z" ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 12:25:32 +0000 UTC",
+			name:        "func parseDate",
+			value:       `[[ parseDate "2020-11-05T12:25:32.1234567Z" "RFC3339Nano" ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 12:25:32.1234567 +0000 UTC",
 		},
 		{
-			name:     "func parseDate with custom layout",
-			value:    `[[ (parseDate "Thu Nov  5 12:25:32 +0000 2020" "Mon Jan _2 15:04:05 -0700 2006") ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 12:25:32 +0000 UTC",
+			name:        "func parseDate defaults to RFC3339",
+			value:       `[[ parseDate "2020-11-05T12:25:32Z" ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 12:25:32 +0000 UTC",
 		},
 		{
-			name:     "func formatDate",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ formatDate (now) "UnixDate" "America/New_York" ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "Thu Nov  5 08:25:32 EST 2020",
+			name:        "func parseDate with custom layout",
+			value:       `[[ (parseDate "Thu Nov  5 12:25:32 +0000 2020" "Mon Jan _2 15:04:05 -0700 2006") ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 12:25:32 +0000 UTC",
 		},
 		{
-			name:     "func formatDate defaults to UTC",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ formatDate (now) "UnixDate" ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "Thu Nov  5 13:25:32 UTC 2020",
+			name:        "func formatDate",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ formatDate (now) "UnixDate" "America/New_York" ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "Thu Nov  5 08:25:32 EST 2020",
 		},
 		{
-			name:     "func formatDate falls back to UTC",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ formatDate (now) "UnixDate" "wrong/tz"]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "Thu Nov  5 13:25:32 UTC 2020",
+			name:        "func formatDate defaults to UTC",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ formatDate (now) "UnixDate" ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "Thu Nov  5 13:25:32 UTC 2020",
 		},
 		{
-			name:     "func parseTimestamp",
-			value:    `[[ (parseTimestamp 1604582732) ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 13:25:32 +0000 UTC",
+			name:        "func formatDate falls back to UTC",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ formatDate (now) "UnixDate" "wrong/tz"]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "Thu Nov  5 13:25:32 UTC 2020",
 		},
 		{
-			name:     "func parseTimestampMilli",
-			value:    `[[ (parseTimestampMilli 1604582732000) ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 13:25:32 +0000 UTC",
+			name:        "func parseTimestamp",
+			value:       `[[ (parseTimestamp 1604582732) ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 13:25:32 +0000 UTC",
 		},
 		{
-			name:     "func parseTimestampNano",
-			value:    `[[ (parseTimestampNano 1604582732000000000) ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05 13:25:32 +0000 UTC",
+			name:        "func parseTimestampMilli",
+			value:       `[[ (parseTimestampMilli 1604582732000) ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 13:25:32 +0000 UTC",
+		},
+		{
+			name:        "func parseTimestampNano",
+			value:       `[[ (parseTimestampNano 1604582732000000000) ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05 13:25:32 +0000 UTC",
 		},
 		{
 			name:  "func getRFC5988Link",
@@ -171,8 +188,8 @@ func TestValueTpl(t *testing.T) {
 					"",
 				),
 			},
-			paramTr:  transformable{},
-			expected: "https://example.com/api/v1/users?before=00ubfjQEMYBLRUWIEDKK",
+			paramTr:     transformable{},
+			expectedVal: "https://example.com/api/v1/users?before=00ubfjQEMYBLRUWIEDKK",
 		},
 		{
 			name:  "func getRFC5988Link does not match",
@@ -188,7 +205,7 @@ func TestValueTpl(t *testing.T) {
 			},
 			paramTr:     transformable{},
 			paramDefVal: "https://example.com/default",
-			expected:    "https://example.com/default",
+			expectedVal: "https://example.com/default",
 		},
 		{
 			name:        "func getRFC5988Link empty header",
@@ -196,16 +213,16 @@ func TestValueTpl(t *testing.T) {
 			paramCtx:    emptyTransformContext(),
 			paramTr:     transformable{},
 			paramDefVal: "https://example.com/default",
-			expected:    "https://example.com/default",
+			expectedVal: "https://example.com/default",
 		},
 		{
-			name:     "can execute functions pipeline",
-			setup:    func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
-			teardown: func() { timeNow = time.Now },
-			value:    `[[ (parseDuration "-1h") | now | formatDate ]]`,
-			paramCtx: emptyTransformContext(),
-			paramTr:  transformable{},
-			expected: "2020-11-05T12:25:32Z",
+			name:        "can execute functions pipeline",
+			setup:       func() { timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } },
+			teardown:    func() { timeNow = time.Now },
+			value:       `[[ (parseDuration "-1h") | now | formatDate ]]`,
+			paramCtx:    emptyTransformContext(),
+			paramTr:     transformable{},
+			expectedVal: "2020-11-05T12:25:32Z",
 		},
 	}
 
@@ -220,10 +237,20 @@ func TestValueTpl(t *testing.T) {
 			}
 			tpl := &valueTpl{}
 			assert.NoError(t, tpl.Unpack(tc.value))
-			defTpl := &valueTpl{}
-			assert.NoError(t, defTpl.Unpack(tc.paramDefVal))
-			got := tpl.Execute(tc.paramCtx, tc.paramTr, defTpl, logp.NewLogger(""))
-			assert.Equal(t, tc.expected, got)
+
+			var defTpl *valueTpl
+			if tc.paramDefVal != "" {
+				defTpl = &valueTpl{}
+				assert.NoError(t, defTpl.Unpack(tc.paramDefVal))
+			}
+
+			got, err := tpl.Execute(tc.paramCtx, tc.paramTr, defTpl, logp.NewLogger(""))
+			assert.Equal(t, tc.expectedVal, got)
+			if tc.expectedError == "" {
+				assert.NoError(t, err)
+			} else {
+				assert.Equal(t, tc.expectedError, err.Error())
+			}
 		})
 	}
 }