Skip to content

Commit

Permalink
[libbeat] Fix Kafka output "circuit breaker is open" errors (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Jan 15, 2021
1 parent 6986c84 commit c11d12c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183]
- Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154]
- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419]
- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
Expand Down
96 changes: 95 additions & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
"github.com/eapache/go-resiliency/breaker"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/transport"
Expand All @@ -47,6 +49,7 @@ type client struct {
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}

producer sarama.AsyncProducer

Expand Down Expand Up @@ -85,6 +88,7 @@ func newKafkaClient(
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
}
return c, nil
}
Expand Down Expand Up @@ -121,6 +125,7 @@ func (c *client) Close() error {
return nil
}

close(c.done)
c.producer.AsyncClose()
c.wg.Wait()
c.producer = nil
Expand Down Expand Up @@ -237,12 +242,92 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
}

func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
breakerOpen := false
defer c.wg.Done()
defer c.log.Debug("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg.Metadata.(*message)
msg.ref.fail(msg, errMsg.Err)

if errMsg.Err == breaker.ErrBreakerOpen {
// ErrBreakerOpen is a very special case in Sarama. It happens only when
// there have been repeated critical (broker / topic-level) errors, and it
// puts Sarama into a state where it immediately rejects all input
// for 10 seconds, ignoring retry / backoff settings.
// With this output's current design (in which Publish passes through to
// Sarama's input channel with no further synchronization), retrying
// these failed values causes an infinite retry loop that degrades
// the entire system.
// "Nice" approaches and why we haven't used them:
// - Use exposed API to navigate this state and its effect on retries.
// * Unfortunately, Sarama's circuit breaker and its errors are
// hard-coded and undocumented. We'd like to address this in the
// future.
// - If a batch fails with a circuit breaker error, delay before
// retrying it.
// * This would fix the most urgent performance issues, but requires
// extra bookkeeping because the Kafka output handles each batch
// independently. It results in potentially many batches / 10s of
// thousands of events being loaded and attempted, even though we
// know there's a fatal error early in the first batch. It also
// makes it hard to know when each batch should be retried.
// - In the Kafka Publish method, add a blocking first-pass intake step
// that can gate on error conditions, rather than handing off data
// to Sarama immediately.
// * This would fix the issue but would require a lot of work and
// testing, and we need a fix for the release now. It's also a
// fairly elaborate workaround for something that might be
// easier to fix in the library itself.
//
// Instead, we have applied the following fix, which is not very "nice"
// but satisfies all other important constraints:
// - When we receive a circuit breaker error, sleep for 10 seconds
// (Sarama's hard-coded timeout) on the _error worker thread_.
//
// This works because connection-level errors that can trigger the
// circuit breaker are on the critical path for input processing, and
// thus blocking on the error channel applies back-pressure to the
// input channel. This means that if there are any more errors while the
// error worker is asleep, any call to Publish will block until we
// start reading again.
//
// Reasons this solution is preferred:
// - It responds immediately to Sarama's global error state, rather than
// trying to detect it independently in each batch or adding more
// cumbersome synchronization to the output
// - It gives the minimal delay that is consistent with Sarama's
// internal behavior
// - It requires only a few lines of code and no design changes
//
// That said, this is still relying on undocumented library internals
// for correct behavior, which isn't ideal, but the error itself is an
// undocumented library internal, so this is de facto necessary for now.
// We'd like to have a more official / permanent fix merged into Sarama
// itself in the future.

// The "breakerOpen" flag keeps us from sleeping the first time we see
// a circuit breaker error, because it might be an old error still
// sitting in the channel from 10 seconds ago. So we only end up
// sleeping every _other_ reported breaker error.
if breakerOpen {
// Immediately log the error that presumably caused this state,
// since the error reporting on this batch will be delayed.
if msg.ref.err != nil {
c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
select {
case <-time.After(10 * time.Second):
// Sarama's circuit breaker is hard-coded to reject all inputs
// for 10sec.
case <-msg.ref.client.done:
// Allow early bailout if the output itself is closing.
}
breakerOpen = false
} else {
breakerOpen = true
}
}
}
}

Expand All @@ -262,9 +347,18 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.Dropped(1)

case breaker.ErrBreakerOpen:
// Add this message to the failed list, but don't overwrite r.err since
// all the breaker error means is "there were a lot of other errors".
r.failed = append(r.failed, msg.data)

default:
r.failed = append(r.failed, msg.data)
r.err = err
if r.err == nil {
// Don't overwrite an existing error. This way at tne end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
}
r.dec()
}
Expand Down

0 comments on commit c11d12c

Please sign in to comment.