Skip to content

Commit

Permalink
Store broker offset history (linkedin#338)
Browse files Browse the repository at this point in the history
* Add a ring of broker offsets for each partition to keep a short history of partition LEO

* Clean up repetitive casting in tests

* Add asserts to check for broker offset history

* Delay alerting for stopped partition

In order to account for the race condition described in linkedin#303 (second part), we need to delay alerting for a stopped partition for a short period of time. We do that by only marking the partition stopped if the timestamps show it is stopped AND if the partition did not have zero lag against any recent broker LEO. In the case where the intervals config for storage is 10 and the cluster offset-refresh is 30 seconds, this would give the consumer 5 minutes to consume from a slow partition and commit an offset before it gets marked as stopped.

* Update current tests for the evaluator to handle the new recent lag check

* Reformat test objects so they're readable

* Add a test to cover a slow data partition as described in linkedin#303

* gofmt fixes
  • Loading branch information
Todd Palino authored Jan 30, 2018
1 parent 1a60efe commit 389ef47
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 108 deletions.
24 changes: 20 additions & 4 deletions core/internal/evaluator/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,18 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.Pa
status.Start = offsets[0]
status.End = offsets[len(offsets)-1]

status.Status = calculatePartitionStatus(offsets, partition.CurrentLag, time.Now().Unix())
status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix())
return status
}

func calculatePartitionStatus(offsets []*protocol.ConsumerOffset, currentLag uint64, timeNow int64) protocol.StatusConstant {
func calculatePartitionStatus(offsets []*protocol.ConsumerOffset, brokerOffsets []int64, currentLag uint64, timeNow int64) protocol.StatusConstant {
// If the current lag is zero, the partition is never in error
if currentLag > 0 {
// Check if the partition is stopped first, as this is a problem even if the consumer had zero lag at some point
if checkIfOffsetsStopped(offsets, timeNow) {
// Check if the partition is stopped first, as this is a problem even if the consumer had zero lag at some
// point in its commit history (as the commit history could be very old). However, if the recent broker offsets
// for this partition show that the consumer had zero lag recently ("intervals * offset-refresh" should be on
// the order of minutes), don't consider it stopped yet.
if checkIfOffsetsStopped(offsets, timeNow) && (!checkIfRecentLagZero(offsets, brokerOffsets)) {
return protocol.StatusStop
}

Expand Down Expand Up @@ -372,3 +375,16 @@ func checkIfLagNotDecreasing(offsets []*protocol.ConsumerOffset) bool {
}
return true
}

// Using the most recent committed offset, return true if there was zero lag at some point in the stored broker
// LEO offsets. This has the effect of returning true if the consumer was up to date on this partition in recent
// (minutes) history, so it can be used to delay alerting for a short period of time.
func checkIfRecentLagZero(offsets []*protocol.ConsumerOffset, brokerOffsets []int64) bool {
lastOffset := offsets[len(offsets)-1].Offset
for i := 0; i < len(brokerOffsets); i++ {
if brokerOffsets[i] <= lastOffset {
return true
}
}
return false
}
Loading

0 comments on commit 389ef47

Please sign in to comment.