diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index d35174045f70..97fb1a8fe2df 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -219,6 +219,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add support for basic ECS logging. {pull}17974[17974]
 - Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
 - When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
+- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
 
 *Auditbeat*
 
diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index 552398937c1f..95bdb13eb7ce 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -734,6 +734,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 84c5eec57604..b793c73cfa42 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -1447,6 +1447,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index b71f30bedfd7..9ceb79b1e9fd 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -884,6 +884,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
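The reference-config comments above describe a doubling-with-cap retry schedule. As a quick illustration (a standalone sketch, not part of the patch), the base delays implied by the default backoff.init of 1s and backoff.max of 60s look like this, before the jitter applied in libbeat/outputs/kafka/config.go further below:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed defaults from the reference config above.
	initial, max := 1*time.Second, 60*time.Second

	dur := initial
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, dur)
		// Double the wait after each failed attempt, capped at backoff.max;
		// a successful publish would reset it to backoff.init.
		dur *= 2
		if dur > max {
			dur = max
		}
	}
	// Prints 1s, 2s, 4s, 8s, 16s, 32s, 1m0s, 1m0s.
}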
diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml
index e2b236a60999..7a9b0a62e2a1 100644
--- a/journalbeat/journalbeat.reference.yml
+++ b/journalbeat/journalbeat.reference.yml
@@ -671,6 +671,17 @@ output.elasticsearch:
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
diff --git a/libbeat/_meta/config/output-kafka.reference.yml.tmpl b/libbeat/_meta/config/output-kafka.reference.yml.tmpl
index 50efb87fbb15..87b24c8d72eb 100644
--- a/libbeat/_meta/config/output-kafka.reference.yml.tmpl
+++ b/libbeat/_meta/config/output-kafka.reference.yml.tmpl
@@ -70,6 +70,17 @@
   # until all events are published. The default is 3.
   #max_retries: 3
 
+  # The number of seconds to wait before trying to republish to Kafka
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to republish. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful publish, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to republish to
+  # Kafka after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # The maximum number of events to bulk in a single Kafka request. The default
   # is 2048.
   #bulk_max_size: 2048
diff --git a/libbeat/common/backoff/backoff.go b/libbeat/common/backoff/backoff.go
index a027fb51a461..5439e38cfbb2 100644
--- a/libbeat/common/backoff/backoff.go
+++ b/libbeat/common/backoff/backoff.go
@@ -19,7 +19,10 @@ package backoff
 
 // Backoff defines the interface for backoff strategies.
 type Backoff interface {
+	// Wait blocks for a duration of time governed by the backoff strategy.
 	Wait() bool
+
+	// Reset resets the backoff duration to an initial value governed by the backoff strategy.
 	Reset()
 }
 
diff --git a/libbeat/internal/testutil/util.go b/libbeat/internal/testutil/util.go
new file mode 100644
index 000000000000..51a811587e58
--- /dev/null
+++ b/libbeat/internal/testutil/util.go
@@ -0,0 +1,41 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains commonly-used utility functions for testing.
+
+package testutil
+
+import (
+	"flag"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+var (
+	SeedFlag = flag.Int64("seed", 0, "Randomization seed")
+)
+
+func SeedPRNG(t *testing.T) {
+	seed := *SeedFlag
+	if seed == 0 {
+		seed = time.Now().UnixNano()
+	}
+
+	t.Logf("reproduce test with `go test ... -seed %v`", seed)
+	rand.Seed(seed)
+}
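A hypothetical usage sketch for the new helper (the package and test name are invented for illustration): a randomized test calls testutil.SeedPRNG before drawing from math/rand, so a failure can be replayed by passing the logged seed back with `go test ... -seed`:

package kafka

import (
	"math/rand"
	"testing"

	"github.com/elastic/beats/v7/libbeat/internal/testutil"
)

func TestSomethingRandomized(t *testing.T) {
	// Logs the seed (or honors -seed) so any failure is reproducible.
	testutil.SeedPRNG(t)

	// All math/rand draws below are now deterministic for a given seed.
	n := 1 + rand.Intn(100)
	_ = n
	// ... exercise the code under test with the randomized input ...
}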
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 3174646da4af..fd3151b92c7d 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -20,6 +20,8 @@ package kafka
 import (
 	"errors"
 	"fmt"
+	"math"
+	"math/rand"
 	"strings"
 	"time"
 
@@ -37,6 +39,11 @@ import (
 	"github.com/elastic/beats/v7/libbeat/outputs/codec"
 )
 
+type backoffConfig struct {
+	Init time.Duration `config:"init"`
+	Max  time.Duration `config:"max"`
+}
+
 type kafkaConfig struct {
 	Hosts              []string          `config:"hosts"               validate:"required"`
 	TLS                *tlscommon.Config `config:"ssl"`
@@ -55,6 +62,7 @@ type kafkaConfig struct {
 	BulkMaxSize        int               `config:"bulk_max_size"`
 	BulkFlushFrequency time.Duration     `config:"bulk_flush_frequency"`
 	MaxRetries         int               `config:"max_retries"         validate:"min=-1,nonzero"`
+	Backoff            backoffConfig     `config:"backoff"`
 	ClientID           string            `config:"client_id"`
 	ChanBufferSize     int               `config:"channel_buffer_size" validate:"min=1"`
 	Username           string            `config:"username"`
@@ -122,10 +130,14 @@ func defaultConfig() kafkaConfig {
 		CompressionLevel: 4,
 		Version:          kafka.Version("1.0.0"),
 		MaxRetries:       3,
-		ClientID:         "beats",
-		ChanBufferSize:   256,
-		Username:         "",
-		Password:         "",
+		Backoff: backoffConfig{
+			Init: 1 * time.Second,
+			Max:  60 * time.Second,
+		},
+		ClientID:       "beats",
+		ChanBufferSize: 256,
+		Username:       "",
+		Password:       "",
 	}
 }
 
@@ -272,7 +284,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
 		retryMax = 1000
 	}
 	k.Producer.Retry.Max = retryMax
-	// TODO: k.Producer.Retry.Backoff = ?
+	k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)
 
 	// configure per broker go channel buffering
 	k.ChannelBufferSize = config.ChanBufferSize
@@ -307,3 +319,23 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
 	}
 	return k, nil
 }
+
+// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
+// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
+func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
+	maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))
+
+	return func(retries, _ int) time.Duration {
+		// compute 'base' duration for exponential backoff
+		dur := cfg.Max
+		if retries < maxBackoffRetries {
+			dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
+		}
+
+		// apply roughly equally distributed jitter in the second half of the
+		// interval, so that the wait time falls into the interval [dur/2, dur]
+		limit := int64(dur / 2)
+		jitter := rand.Int63n(limit + 1)
+		return time.Duration(limit + jitter)
+	}
+}
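For the defaults (init 1s, max 60s), maxBackoffRetries is ceil(log2(60)) = 6, so retries 0 through 5 use base delays of 1s, 2s, 4s, 8s, 16s, and 32s, and every later retry uses the 60s cap; jitter then places each wait in the upper half of its base interval. Because the function is stateless in the retry count, that bound is easy to check. A sketch of such a property test (hypothetical, assumed to live in the kafka package beside makeBackoffFunc; the patch's own tests are not included in the excerpt above):

package kafka

import (
	"testing"
	"time"

	"github.com/elastic/beats/v7/libbeat/internal/testutil"
)

func TestMakeBackoffFuncBounds(t *testing.T) {
	// The jitter uses math/rand, so seed it reproducibly.
	testutil.SeedPRNG(t)

	cfg := backoffConfig{Init: 1 * time.Second, Max: 60 * time.Second}
	backoff := makeBackoffFunc(cfg)

	for retries := 0; retries < 50; retries++ {
		d := backoff(retries, 1000) // second argument (maxRetries) is ignored
		// The base delay doubles from Init and is capped at Max; jitter keeps
		// the result in [base/2, base], hence always within [Init/2, Max].
		if d < cfg.Init/2 || d > cfg.Max {
			t.Fatalf("retries=%d: %v outside [%v, %v]", retries, d, cfg.Init/2, cfg.Max)
		}
	}
}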