From c11d12cf6e80eee6561e931292c7048864106d13 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 14 Jan 2021 20:04:31 -0500 Subject: [PATCH] [libbeat] Fix Kafka output "circuit breaker is open" errors (#23484) --- CHANGELOG.next.asciidoc | 1 + go.mod | 1 + libbeat/outputs/kafka/client.go | 96 ++++++++++++++++++++++++++++++++- 3 files changed, 97 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a0ae199be4ed..ed6474c4efd8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -230,6 +230,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183] - Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154] - Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419] +- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484] *Auditbeat* diff --git a/go.mod b/go.mod index a6405a3414a7..8e00d8d35174 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1 github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/dustin/go-humanize v1.0.0 + github.com/eapache/go-resiliency v1.2.0 github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 github.com/elastic/ecs v1.6.0 github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index c785f9e729f2..c505e7d1824f 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -24,8 +24,10 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/Shopify/sarama" + "github.com/eapache/go-resiliency/breaker" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/common/transport" @@ 
-47,6 +49,7 @@ type client struct { codec codec.Codec config sarama.Config mux sync.Mutex + done chan struct{} producer sarama.AsyncProducer @@ -85,6 +88,7 @@ func newKafkaClient( index: strings.ToLower(index), codec: writer, config: *cfg, + done: make(chan struct{}), } return c, nil } @@ -121,6 +125,7 @@ func (c *client) Close() error { return nil } + close(c.done) c.producer.AsyncClose() c.wg.Wait() c.producer = nil @@ -237,12 +242,92 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { } func (c *client) errorWorker(ch <-chan *sarama.ProducerError) { + breakerOpen := false defer c.wg.Done() defer c.log.Debug("Stop kafka error handler") for errMsg := range ch { msg := errMsg.Msg.Metadata.(*message) msg.ref.fail(msg, errMsg.Err) + + if errMsg.Err == breaker.ErrBreakerOpen { + // ErrBreakerOpen is a very special case in Sarama. It happens only when + // there have been repeated critical (broker / topic-level) errors, and it + // puts Sarama into a state where it immediately rejects all input + // for 10 seconds, ignoring retry / backoff settings. + // With this output's current design (in which Publish passes through to + // Sarama's input channel with no further synchronization), retrying + // these failed values causes an infinite retry loop that degrades + // the entire system. + // "Nice" approaches and why we haven't used them: + // - Use exposed API to navigate this state and its effect on retries. + // * Unfortunately, Sarama's circuit breaker and its errors are + // hard-coded and undocumented. We'd like to address this in the + // future. + // - If a batch fails with a circuit breaker error, delay before + // retrying it. + // * This would fix the most urgent performance issues, but requires + // extra bookkeeping because the Kafka output handles each batch + // independently. 
It results in potentially many batches / 10s of + // thousands of events being loaded and attempted, even though we + // know there's a fatal error early in the first batch. It also + // makes it hard to know when each batch should be retried. + // - In the Kafka Publish method, add a blocking first-pass intake step + // that can gate on error conditions, rather than handing off data + // to Sarama immediately. + // * This would fix the issue but would require a lot of work and + // testing, and we need a fix for the release now. It's also a + // fairly elaborate workaround for something that might be + // easier to fix in the library itself. + // + // Instead, we have applied the following fix, which is not very "nice" + // but satisfies all other important constraints: + // - When we receive a circuit breaker error, sleep for 10 seconds + // (Sarama's hard-coded timeout) on the _error worker thread_. + // + // This works because connection-level errors that can trigger the + // circuit breaker are on the critical path for input processing, and + // thus blocking on the error channel applies back-pressure to the + // input channel. This means that if there are any more errors while the + // error worker is asleep, any call to Publish will block until we + // start reading again. + // + // Reasons this solution is preferred: + // - It responds immediately to Sarama's global error state, rather than + // trying to detect it independently in each batch or adding more + // cumbersome synchronization to the output + // - It gives the minimal delay that is consistent with Sarama's + // internal behavior + // - It requires only a few lines of code and no design changes + // + // That said, this is still relying on undocumented library internals + // for correct behavior, which isn't ideal, but the error itself is an + // undocumented library internal, so this is de facto necessary for now. 
+ // We'd like to have a more official / permanent fix merged into Sarama + // itself in the future. + + // The "breakerOpen" flag keeps us from sleeping the first time we see + // a circuit breaker error, because it might be an old error still + // sitting in the channel from 10 seconds ago. So we only end up + // sleeping every _other_ reported breaker error. + if breakerOpen { + // Immediately log the error that presumably caused this state, + // since the error reporting on this batch will be delayed. + if msg.ref.err != nil { + c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err) + } + select { + case <-time.After(10 * time.Second): + // Sarama's circuit breaker is hard-coded to reject all inputs + // for 10sec. + case <-msg.ref.client.done: + // Allow early bailout if the output itself is closing. + } + breakerOpen = false + } else { + breakerOpen = true + } + } } } @@ -262,9 +347,18 @@ func (r *msgRef) fail(msg *message, err error) { len(msg.key)+len(msg.value)) r.client.observer.Dropped(1) + case breaker.ErrBreakerOpen: + // Add this message to the failed list, but don't overwrite r.err since + // all the breaker error means is "there were a lot of other errors". + r.failed = append(r.failed, msg.data) + default: r.failed = append(r.failed, msg.data) - r.err = err + if r.err == nil { + // Don't overwrite an existing error. This way at the end of the batch + // we report the first error that we saw, rather than the last one. + r.err = err + } } r.dec() }