From d982ed9a45cf78a083144fba847b67f5c212783d Mon Sep 17 00:00:00 2001 From: Sebastian Spaink <3441183+sspaink@users.noreply.github.com> Date: Mon, 26 Sep 2022 14:28:32 -0500 Subject: [PATCH] feat(inputs.amqp_consumer): Determine content encoding automatically (#11860) --- internal/content_coding.go | 35 +++++++++++++ plugins/inputs/amqp_consumer/README.md | 15 +++++- plugins/inputs/amqp_consumer/amqp_consumer.go | 1 + .../amqp_consumer/amqp_consumer_test.go | 51 +++++++++++++++++++ plugins/inputs/amqp_consumer/sample.conf | 7 ++- 5 files changed, 105 insertions(+), 4 deletions(-) create mode 100644 plugins/inputs/amqp_consumer/amqp_consumer_test.go diff --git a/internal/content_coding.go b/internal/content_coding.go index df572ecb0fd2e..cd7984f2290ea 100644 --- a/internal/content_coding.go +++ b/internal/content_coding.go @@ -82,6 +82,32 @@ func NewContentEncoder(encoding string) (ContentEncoder, error) { } } +type AutoDecoder struct { + encoding string + gzip *GzipDecoder + identity *IdentityDecoder +} + +func (a *AutoDecoder) SetEnconding(encoding string) { + a.encoding = encoding +} + +func (a *AutoDecoder) Decode(data []byte) ([]byte, error) { + if a.encoding == "gzip" { + return a.gzip.Decode(data) + } + return a.identity.Decode(data) +} + +func NewAutoContentDecoder() (*AutoDecoder, error) { + var a AutoDecoder + var err error + + a.identity = NewIdentityDecoder() + a.gzip, err = NewGzipDecoder() + return &a, err +} + // NewContentDecoder returns a ContentDecoder for the encoding type. func NewContentDecoder(encoding string) (ContentDecoder, error) { switch encoding { @@ -91,6 +117,8 @@ func NewContentDecoder(encoding string) (ContentDecoder, error) { return NewZlibDecoder() case "identity", "": return NewIdentityDecoder(), nil + case "auto": + return NewAutoContentDecoder() default: return nil, errors.New("invalid value for content_encoding") } @@ -171,6 +199,7 @@ func (*IdentityEncoder) Encode(data []byte) ([]byte, error) { // ContentDecoder removes a wrapper encoding from byte buffers. type ContentDecoder interface { + SetEnconding(string) Decode([]byte) ([]byte, error) } @@ -187,6 +216,8 @@ func NewGzipDecoder() (*GzipDecoder, error) { }, nil } +func (*GzipDecoder) SetEnconding(string) {} + func (d *GzipDecoder) Decode(data []byte) ([]byte, error) { d.reader.Reset(bytes.NewBuffer(data)) d.buf.Reset() @@ -212,6 +243,8 @@ func NewZlibDecoder() (*ZlibDecoder, error) { }, nil } +func (*ZlibDecoder) SetEnconding(string) {} + func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) { d.buf.Reset() @@ -238,6 +271,8 @@ func NewIdentityDecoder() *IdentityDecoder { return &IdentityDecoder{} } +func (*IdentityDecoder) SetEnconding(string) {} + func (*IdentityDecoder) Decode(data []byte) ([]byte, error) { return data, nil } diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index f64426bcfd2ed..e52f3061aa277 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -82,8 +82,11 @@ For an introduction to AMQP see: ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Content encoding for message payloads, can be set to "gzip" to or - ## "identity" to apply no encoding. + ## Content encoding for message payloads, can be set to + ## "gzip", "identity" or "auto" + ## - Use "gzip" to decode gzip + ## - Use "identity" to apply no encoding + ## - Use "auto" determine the encoding using the ContentEncoding header # content_encoding = "identity" ## Data format to consume. @@ -92,3 +95,11 @@ For an introduction to AMQP see: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ``` + +## Metrics + +TODO + +## Example Output + +TODO diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 2075508e2b88e..b0e902da76cf7 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -395,6 +395,7 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive } } + a.decoder.SetEnconding(d.ContentEncoding) body, err := a.decoder.Decode(d.Body) if err != nil { onError() diff --git a/plugins/inputs/amqp_consumer/amqp_consumer_test.go b/plugins/inputs/amqp_consumer/amqp_consumer_test.go new file mode 100644 index 0000000000000..818c644958987 --- /dev/null +++ b/plugins/inputs/amqp_consumer/amqp_consumer_test.go @@ -0,0 +1,51 @@ +package amqp_consumer + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" + "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/require" +) + +func TestAutoEncoding(t *testing.T) { + enc, err := internal.NewGzipEncoder() + require.NoError(t, err) + payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`)) + require.NoError(t, err) + + var a AMQPConsumer + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery) + a.parser = parser + a.decoder, err = internal.NewContentDecoder("auto") + require.NoError(t, err) + + acc := &testutil.Accumulator{} + + d := amqp091.Delivery{ + ContentEncoding: "gzip", + Body: payload, + } + err = a.onMessage(acc, d) + require.NoError(t, err) + acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"}) + + encIdentity := internal.NewIdentityEncoder() + require.NoError(t, err) + payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`)) + require.NoError(t, err) + + d = amqp091.Delivery{ + ContentEncoding: "not_gzip", + Body: payload, + } + + err = a.onMessage(acc, d) + require.NoError(t, err) + acc.AssertContainsFields(t, "measurementName2", map[string]interface{}{"fieldKey": "identity"}) +} diff --git a/plugins/inputs/amqp_consumer/sample.conf b/plugins/inputs/amqp_consumer/sample.conf index 87992e35c4443..ddb6e1b37a47e 100644 --- a/plugins/inputs/amqp_consumer/sample.conf +++ b/plugins/inputs/amqp_consumer/sample.conf @@ -63,8 +63,11 @@ ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Content encoding for message payloads, can be set to "gzip" to or - ## "identity" to apply no encoding. + ## Content encoding for message payloads, can be set to + ## "gzip", "identity" or "auto" + ## - Use "gzip" to decode gzip + ## - Use "identity" to apply no encoding + ## - Use "auto" determine the encoding using the ContentEncoding header # content_encoding = "identity" ## Data format to consume.