Skip to content

Commit

Permalink
KAFKA-8564; Fix NPE on deleted partition dir when no segments remain (a…
Browse files Browse the repository at this point in the history
…pache#6968)

Kafka should not NPE while loading a deleted partition dir with no log segments. This patch ensures that there will always be at least one segment after initialization.

Co-authored-by: Edoardo Comar <[email protected]>
Co-authored-by: Mickael Maison <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
mimaison authored and hachikuji committed Jun 19, 2019
1 parent a7e771c commit 635c213
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,15 @@ class Log(@volatile var dir: File,
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = false))
}
0
}
}
Expand Down
12 changes: 11 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3741,7 +3741,17 @@ class LogTest {
assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head)
}

private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
@Test
def testLoadPartitionDirWithNoSegmentsShouldNotThrow() {
val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
val logDir = new File(tmpDir, dirName)
logDir.mkdirs()
val logConfig = LogTest.createLogConfig()
val log = createLog(logDir, logConfig)
assertEquals(1, log.numberOfSegments)
}

private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)

private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {
var sequence = 0
Expand Down

0 comments on commit 635c213

Please sign in to comment.