KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset (apache#8089)

Previously, the checkpointed offset for a log was updated only when that log was chosen for cleaning, once the cleaning job completed. As a result, a log with an invalid checkpointed offset would emit the same warning repeatedly for as long as it was not chosen for cleaning.

The proposed fix is to update the checkpointed offset for any log with an invalid checkpoint, regardless of whether that log is chosen for cleaning.
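In outline, the fix behaves like the following self-contained sketch (hypothetical, simplified signatures for illustration only; the actual change, inside LogCleanerManager, is in the diff below):

    // Simplified model of the fix (hypothetical names, not the actual Kafka API).
    object CheckpointResetSketch {
      case class OffsetsToClean(firstDirtyOffset: Long,
                                firstUncleanableDirtyOffset: Long,
                                forceUpdateCheckpoint: Boolean = false)

      // An out-of-range checkpoint is detected and flagged for an eager reset.
      def cleanableOffsets(logStartOffset: Long, logEndOffset: Long,
                           lastCleanOffset: Option[Long]): OffsetsToClean = {
        val checkpoint = lastCleanOffset.getOrElse(logStartOffset)
        if (checkpoint < logStartOffset || checkpoint > logEndOffset)
          OffsetsToClean(logStartOffset, logEndOffset, forceUpdateCheckpoint = true)
        else
          OffsetsToClean(checkpoint, logEndOffset)
      }

      def main(args: Array[String]): Unit = {
        var checkpoints = Map("foo-0" -> 5L) // stale: below the log start offset of 10
        val offsets = cleanableOffsets(logStartOffset = 10L, logEndOffset = 100L,
                                       lastCleanOffset = checkpoints.get("foo-0"))
        // The fix: persist the corrected offset even when the log is not chosen for
        // cleaning, so the warning fires once instead of on every cleaning round.
        if (offsets.forceUpdateCheckpoint)
          checkpoints += ("foo-0" -> offsets.firstDirtyOffset)
        println(checkpoints) // Map(foo-0 -> 10)
      }
    }

The real change threads a forceUpdateCheckpoint flag out of LogCleanerManager.cleanableOffsets and has grabFilthiestCompactedLog call updateCheckpoints eagerly, so the corrected offset is persisted even for logs that are skipped.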

Reviewers: Anna Povzner <[email protected]>, Jun Rao <[email protected]>
splett2 authored Feb 18, 2020
1 parent ebcdcd9 commit 72122fc
Showing 2 changed files with 104 additions and 24 deletions.
41 changes: 29 additions & 12 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -105,8 +105,8 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
partitions.iterator.map { tp =>
val log = logs.get(tp)
val lastCleanOffset = lastClean.get(tp)
-val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
-val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
+val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
+val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
uncleanableBytes
}.sum
case None => 0
@@ -179,11 +179,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case (topicPartition, log) => // create a LogToClean instance for each
try {
val lastCleanOffset = lastClean.get(topicPartition)
-val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now)
-val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now)
+val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
+// update checkpoint for logs with invalid checkpointed offsets
+if (offsetsToClean.forceUpdateCheckpoint)
+updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset))
+val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)

-LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs > 0)
+LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0)
} catch {
case e: Throwable => throw new LogCleaningException(log,
s"Failed to calculate log cleaning stats for partition $topicPartition", e)
@@ -480,6 +483,20 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

+/**
+ * Helper class for the range of cleanable dirty offsets of a log and whether to update the checkpoint associated with
+ * the log
+ *
+ * @param firstDirtyOffset the lower (inclusive) offset to begin cleaning from
+ * @param firstUncleanableDirtyOffset the upper (exclusive) offset to clean to
+ * @param forceUpdateCheckpoint whether to update the checkpoint associated with this log. If true, the checkpoint should be
+ * reset to firstDirtyOffset
+ */
+private case class OffsetsToClean(firstDirtyOffset: Long,
+firstUncleanableDirtyOffset: Long,
+forceUpdateCheckpoint: Boolean = false) {
+}
+
private[log] object LogCleanerManager extends Logging {

def isCompactAndDelete(log: Log): Boolean = {
@@ -515,12 +532,12 @@ private[log] object LogCleanerManager extends Logging {
* @param log the log
* @param lastCleanOffset the last checkpointed offset
* @param now the current time in milliseconds of the cleaning operation
-* @return the lower (inclusive) and upper (exclusive) offsets
+* @return OffsetsToClean containing offsets for the cleanable portion of the log and whether the log checkpoint needs updating
*/
-def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): (Long, Long) = {
+def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid,
// reset to the log starting offset and log the error
-val firstDirtyOffset = {
+val (firstDirtyOffset, forceUpdateCheckpoint) = {
val logStartOffset = log.logStartOffset
val checkpointDirtyOffset = lastCleanOffset.getOrElse(logStartOffset)

@@ -529,15 +546,15 @@ private[log] object LogCleanerManager extends Logging {
if (!isCompactAndDelete(log))
warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset " +
s"since the checkpointed offset $checkpointDirtyOffset is invalid.")
-logStartOffset
+(logStartOffset, true)
} else if (checkpointDirtyOffset > log.logEndOffset) {
// The dirty offset has gotten ahead of the log end offset. This could happen if there was data
// corruption at the end of the log. We conservatively assume that the full log needs cleaning.
warn(s"The last checkpoint dirty offset for partition ${log.name} is $checkpointDirtyOffset, " +
s"which is larger than the log end offset ${log.logEndOffset}. Resetting to the log start offset $logStartOffset.")
-logStartOffset
+(logStartOffset, true)
} else {
-checkpointDirtyOffset
+(checkpointDirtyOffset, false)
}
}

@@ -572,7 +589,7 @@ private[log] object LogCleanerManager extends Logging {
s"now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset " +
s"activeSegment.baseOffset=${log.activeSegment.baseOffset}")

-(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset))
+OffsetsToClean(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableDirtyOffset), forceUpdateCheckpoint)
}

/**
87 changes: 75 additions & 12 deletions core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -54,6 +54,11 @@ class LogCleanerManagerTest extends Logging {
override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
cleanerCheckpoints.toMap
}

+override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+val (tp, offset) = update.getOrElse(throw new IllegalArgumentException("update=None argument not yet handled"))
+cleanerCheckpoints.put(tp, offset)
+}
}

@After
@@ -423,8 +428,8 @@ class LogCleanerManagerTest extends Logging {

val lastCleanOffset = Some(0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset)
}

/**
@@ -453,8 +458,8 @@ class LogCleanerManagerTest extends Logging {

val lastCleanOffset = Some(0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset)
}

/**
@@ -478,8 +483,27 @@ class LogCleanerManagerTest extends Logging {

val lastCleanOffset = Some(0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets.firstDirtyOffset)
assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset)
}

+@Test
+def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
+logs.get(tp).maybeIncrementLogStartOffset(10L)
+
+var lastCleanOffset = Some(15L)
+var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+assertFalse("Checkpoint offset should not be reset if valid", cleanableOffsets.forceUpdateCheckpoint)
+
+logs.get(tp).maybeIncrementLogStartOffset(20L)
+cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+assertTrue("Checkpoint offset needs to be reset if less than log start offset", cleanableOffsets.forceUpdateCheckpoint)
+
+lastCleanOffset = Some(25L)
+cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
+assertTrue("Checkpoint offset needs to be reset if greater than log end offset", cleanableOffsets.forceUpdateCheckpoint)
+}
+
@Test
@@ -505,8 +529,8 @@ class LogCleanerManagerTest extends Logging {
time.sleep(compactionLag + 1)
// although the compaction lag has been exceeded, the undecided data should not be cleaned
var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-assertEquals(0L, cleanableOffsets._1)
-assertEquals(0L, cleanableOffsets._2)
+assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset)

log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0,
@@ -516,15 +540,15 @@ class LogCleanerManagerTest extends Logging {

// the first segment should now become cleanable immediately
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-assertEquals(0L, cleanableOffsets._1)
-assertEquals(3L, cleanableOffsets._2)
+assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset)

time.sleep(compactionLag + 1)

// the second segment becomes cleanable after the compaction lag
cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Some(0L), time.milliseconds())
-assertEquals(0L, cleanableOffsets._1)
-assertEquals(4L, cleanableOffsets._2)
+assertEquals(0L, cleanableOffsets.firstDirtyOffset)
+assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset)
}

@Test
@@ -574,6 +598,45 @@ class LogCleanerManagerTest extends Logging {
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
}

+/**
+ * Logs with invalid checkpoint offsets should update their checkpoint offset even if the log doesn't need cleaning
+ */
+@Test
+def testCheckpointUpdatedForInvalidOffsetNoCleaning(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
+
+logs.get(tp).maybeIncrementLogStartOffset(20L)
+val cleanerManager = createCleanerManagerMock(logs)
+cleanerCheckpoints.put(tp, 15L)
+
+val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
+assertEquals("Log should not be selected for cleaning", None, filthiestLog)
+assertEquals("Unselected log should have checkpoint offset updated", 20L, cleanerCheckpoints.get(tp).get)
+}
+
+/**
+ * Logs with invalid checkpoint offsets should update their checkpoint offset even if they aren't selected
+ * for immediate cleaning
+ */
+@Test
+def testCheckpointUpdatedForInvalidOffsetNotSelected(): Unit = {
+val tp0 = new TopicPartition("foo", 0)
+val tp1 = new TopicPartition("foo", 1)
+val partitions = Seq(tp0, tp1)
+
+// create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
+val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
+logs.get(tp0).maybeIncrementLogStartOffset(15L)
+val cleanerManager = createCleanerManagerMock(logs)
+cleanerCheckpoints.put(tp0, 10L)
+cleanerCheckpoints.put(tp1, 5L)
+
+val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
+assertEquals("Dirtier log should be selected", tp1, filthiestLog.topicPartition)
+assertEquals("Unselected log should have checkpoint offset updated", 15L, cleanerCheckpoints.get(tp0).get)
+}
+
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(topicPartition, log)
