Skip to content

Commit

Permalink
KAFKA-5413; Log cleaner fails due to large offset in segment file
Browse files Browse the repository at this point in the history
the contribution is my original work and I license the work to the project under the project's open source license.

junrao, I had already made the code change before your last comment.  I've done pretty much what you said, except that I've not used the current segment because I wasn't sure if it will always be available.
I'm happy to change it if you prefer.
I've run all the unit and integration tests which all passed.

Author: Kelvin Rutt <[email protected]>
Author: Kelvin Rutt <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#3357 from kelvinrutt/kafka_5413_bugfix
  • Loading branch information
kelvinrutt authored and junrao committed Jun 21, 2017
1 parent de982ba commit 76f6e14
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 11 deletions.
29 changes: 26 additions & 3 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,10 @@ private[log] class Cleaner(val id: Int,
// this is the lower of the last active segment and the compaction lag
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)


// group the segments and clean the groups
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)

// record buffer utilization
Expand Down Expand Up @@ -616,7 +617,7 @@ private[log] class Cleaner(val id: Int,
*
* @return A list of grouped segments
*/
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = {
var grouped = List[List[LogSegment]]()
var segs = segments.toList
while(segs.nonEmpty) {
Expand All @@ -629,7 +630,7 @@ private[log] class Cleaner(val id: Int,
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.index.sizeInBytes
Expand All @@ -641,6 +642,28 @@ private[log] class Cleaner(val id: Int,
grouped.reverse
}

/**
 * Estimate the last offset of the first log segment in `segs`.
 *
 * `LogSegment.nextOffset()` would give the exact last offset, but it can be expensive
 * since it requires scanning the segment from the last index entry. Instead we bound the
 * last offset from above: when a following segment exists, its base offset is a safe
 * worst-case bound; otherwise the first uncleanable offset is used, since cleaning never
 * proceeds past it.
 *
 * @param segs remaining segments to group; expected to be non-empty
 * @param firstUncleanableOffset the first offset that will not be cleaned
 * @return the estimated (upper-bound) last offset for the first segment in segs
 */
private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = {
  segs match {
    /* if there is a next segment, use its base offset as the bounding offset to guarantee we know
     * the worst case offset. Matching on the cons cells is O(1), whereas the previous
     * segs.size > 1 check traversed the entire remaining list on every call. */
    case _ :: next :: _ => next.baseOffset - 1
    // for the last segment in the list, use the first uncleanable offset.
    case _ => firstUncleanableOffset - 1
  }
}

/**
* Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning.
* @param log The log to use
Expand Down
58 changes: 50 additions & 8 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -642,17 +642,17 @@ class LogCleanerTest extends JUnitSuite {
}

// grouping by very large values should result in a single group with all the segments in it
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(1, groups.size)
assertEquals(log.numberOfSegments, groups.head.size)
checkSegmentOrder(groups)

// grouping by very small values should result in all groups having one entry
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(log.numberOfSegments, groups.size)
assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
checkSegmentOrder(groups)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1, log.logEndOffset)
assertEquals(log.numberOfSegments, groups.size)
assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
checkSegmentOrder(groups)
Expand All @@ -661,13 +661,13 @@ class LogCleanerTest extends JUnitSuite {

// check grouping by log size
val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue, log.logEndOffset)
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))

// check grouping by index size
val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset)
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
}
Expand Down Expand Up @@ -699,28 +699,70 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)

// grouping should result in a single group with maximum relative offset of Int.MaxValue
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(1, groups.size)

// append another message, making last offset of second segment > Int.MaxValue
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)

// grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(2, groups.size)
checkSegmentOrder(groups)

// append more messages, creating new segments, further grouping should still occur
while (log.numberOfSegments < 4)
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)

groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(log.numberOfSegments - 1, groups.size)
for (group <- groups)
assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
checkSegmentOrder(groups)
}

/**
 * When a segment is loaded with a zero-sized index file, the index's last offset falls
 * back to the segment's base offset, even though the log file itself may contain records
 * with offsets far beyond it. Grouping based on index offsets alone could then combine
 * segments spanning an offset range greater than Int.MaxValue. This test reproduces that
 * scenario and verifies that such segments end up in separate groups.
 */
@Test
def testSegmentGroupingFollowingLoadOfZeroIndex(): Unit = {
  val cleaner = makeCleaner(Int.MaxValue)

  val logProps = new Properties()
  logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer)

  // Setting the index interval equal to the segment size means no index entries are
  // written for these small appends, mimicking the effect of loading an empty index file.
  logProps.put(LogConfig.IndexIntervalBytesProp, 400: java.lang.Integer)

  val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

  // First segment: records at offsets 0 and 1.
  log.appendAsFollower(messageWithOffset("hello".getBytes, "hello".getBytes, 0))
  log.appendAsFollower(messageWithOffset("hello".getBytes, "hello".getBytes, 1))

  // Second segment: starts at Int.MaxValue/2 and holds a record whose offset exceeds
  // Int.MaxValue, so this segment alone covers a relative offset range > Int.MaxValue.
  log.roll(Int.MaxValue / 2)
  log.appendAsFollower(messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue / 2))
  log.appendAsFollower(messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1))

  assertTrue("Actual offset range should be > Int.MaxValue", log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue)
  assertTrue("index.lastOffset is reporting the wrong last offset", log.logSegments.last.index.lastOffset - log.logStartOffset <= Int.MaxValue)

  // The second segment must not be merged with the first: grouping should yield two groups.
  val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
  assertEquals(2, groups.size)

  groups.foreach { group =>
    assertTrue("Relative offset greater than Int.MaxValue", group.last.nextOffset() - 1 - group.head.baseOffset <= Int.MaxValue)
  }
  checkSegmentOrder(groups)
}

private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
val offsets = groups.flatMap(_.map(_.baseOffset))
assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
Expand Down

0 comments on commit 76f6e14

Please sign in to comment.