KAFKA-15605: Fix topic deletion handling during ZK migration (apache#14545)

This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplica requests.

During the hybrid mode of the ZK migration, the KRaft controller asynchronously sends UpdateMetadataRequest (UMR) and LeaderAndIsrRequest (LISR) RPCs to the ZK brokers to propagate metadata. Since this process is essentially "best effort", it is possible for a broker to miss a StopReplica request. The new logic lets the ZK broker compare its local logs against the full set of replicas in a "Full" LISR. Any local logs not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
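Conceptually, the reconciliation reduces to a set difference between what the broker has on disk and what the full request says it should host. A minimal sketch of the idea in Scala (the object and method names are illustrative, not the patch's actual API):

import org.apache.kafka.common.TopicPartition

object StrayLogReconciliation {
  // Given every partition with a log directory on this broker and the full
  // replica set carried by a "Full" LeaderAndIsrRequest, anything local that
  // the request does not mention is a stray: its StopReplica was missed.
  def findStrayLogs(localLogs: Set[TopicPartition],
                    partitionsInRequest: Set[TopicPartition]): Set[TopicPartition] =
    localLogs.diff(partitionsInRequest)
}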

To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. Instead, they rename the directories and log warning messages during log recovery. It is then up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.

This patch makes use of the previously unused Type field on LeaderAndIsrRequest, which was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 has been sent in all LISRs. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as the trigger for the ZK broker to perform the log reconciliation.
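On the receiving side, the request type gates the new behavior. A hedged sketch of the broker-side check (the real handling in the broker is more involved; this only shows the trigger):

import org.apache.kafka.common.requests.LeaderAndIsrRequest

object FullRequestTrigger {
  def maybeReconcileLogs(request: LeaderAndIsrRequest): Unit = {
    // Only a FULL request (wire value 2) enumerates every replica this broker
    // should host, so only then is it safe to treat unlisted local logs as stray.
    if (request.requestType() == LeaderAndIsrRequest.Type.FULL) {
      // ...diff local logs against the request's partition states and mark strays...
    }
  }
}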

Reviewers: Colin P. McCabe <[email protected]>
mumrah authored Oct 26, 2023
1 parent 986c1b1 commit 339d255
Showing 15 changed files with 567 additions and 30 deletions.
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -42,26 +42,52 @@

public class LeaderAndIsrRequest extends AbstractControlRequest {

    public enum Type {
        UNKNOWN(0),
        INCREMENTAL(1),
        FULL(2);

        private final byte type;

        private Type(int type) {
            this.type = (byte) type;
        }

        public byte toByte() {
            return type;
        }

        public static Type fromByte(byte type) {
            for (Type t : Type.values()) {
                if (t.type == type) {
                    return t;
                }
            }
            return UNKNOWN;
        }
    }

public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

private final List<LeaderAndIsrPartitionState> partitionStates;
private final Map<String, Uuid> topicIds;
private final Collection<Node> liveLeaders;
private final Type updateType;

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders) {
this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
liveLeaders, false);
liveLeaders, false, Type.UNKNOWN);
}

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
Collection<Node> liveLeaders, boolean kraftController) {
Collection<Node> liveLeaders, boolean kraftController, Type updateType) {
super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.topicIds = topicIds;
this.liveLeaders = liveLeaders;
this.updateType = updateType;
}

@Override
@@ -82,6 +108,10 @@ public LeaderAndIsrRequest build(short version) {
data.setIsKRaftController(kraftController);
}

if (version >= 5) {
data.setType(updateType.toByte());
}

if (version >= 2) {
Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
@@ -210,6 +240,10 @@ public List<LeaderAndIsrLiveLeader> liveLeaders() {
return Collections.unmodifiableList(data.liveLeaders());
}

public Type requestType() {
return Type.fromByte(data.type());
}

@Override
public LeaderAndIsrRequestData data() {
return data;
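For reference, this is roughly how a caller would construct a full request with the extended Builder; all values below are placeholders, and only the parameter shape follows the diff above:

import java.util.Collections
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.LeaderAndIsrRequest

object BuildFullLisrExample {
  def main(args: Array[String]): Unit = {
    val builder = new LeaderAndIsrRequest.Builder(
      ApiKeys.LEADER_AND_ISR.latestVersion(),              // Type is only encoded for version >= 5
      3000,                                                // controllerId (placeholder)
      1,                                                   // controllerEpoch (placeholder)
      42L,                                                 // brokerEpoch (placeholder)
      Collections.emptyList[LeaderAndIsrPartitionState](), // partitionStates
      Collections.emptyMap[String, Uuid](),                // topicIds
      Collections.emptyList[Node](),                       // liveLeaders
      true,                                                // kraftController
      LeaderAndIsrRequest.Type.FULL)                       // marks this as a full snapshot
    println(builder.build().requestType())                 // prints FULL
  }
}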
core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
private var metadataInstance: ControllerChannelContext = _

def sendRequest(brokerId: Int,
@@ -398,12 +399,17 @@
metadataInstance = metadataProvider()
}

def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
this.updateType = updateType
}

def clear(): Unit = {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestBrokerSet.clear()
updateMetadataRequestPartitionInfoMap.clear()
metadataInstance = null
updateType = LeaderAndIsrRequest.Type.UNKNOWN
}

def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -543,15 +549,25 @@
.toSet[String]
.map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID)))
.toMap
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController)
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
leaderAndIsrRequestVersion,
controllerId,
controllerEpoch,
brokerEpoch,
leaderAndIsrPartitionStates.values.toBuffer.asJava,
topicIds.asJava,
leaders.asJava,
kraftController,
updateType
)
sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
handleLeaderAndIsrResponse(leaderAndIsrResponse, broker)
})
}
}
leaderAndIsrRequestMap.clear()
updateType = LeaderAndIsrRequest.Type.UNKNOWN
}

def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
37 changes: 31 additions & 6 deletions core/src/main/scala/kafka/log/LocalLog.scala
@@ -610,8 +610,12 @@ object LocalLog extends Logging {
/** a directory that is used for future partition */
private[log] val FutureDirSuffix = "-future"

/** a directory that is used for stray partition */
private[log] val StrayDirSuffix = "-stray"

private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")

private[log] val UnknownOffset = -1L

@@ -622,10 +626,17 @@
* from exceeding 255 characters.
*/
private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
}

/**
* Return a directory name to rename the log directory to for stray partition deletion.
* The name will be in the following format: "topic-partitionId.uniqueId-stray".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
}

/**
@@ -636,6 +647,18 @@
logDirNameWithSuffix(topicPartition, FutureDirSuffix)
}

/**
* Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
* If the topic name is too long, it will be truncated to prevent the total name
* from exceeding 255 characters.
*/
private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size)
s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
}

private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
s"${logDirName(topicPartition)}.$uniqueId$suffix"
@@ -666,11 +689,13 @@
if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
throw exception(dir)
if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
throw exception(dir)

val name: String =
if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
dirName.substring(0, dirName.lastIndexOf('.'))
else dirName

val index = name.lastIndexOf('-')
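To make the naming rule concrete, here is a small self-contained re-implementation of the capped-length scheme for the stray suffix (illustration only; the real helper is logDirNameWithSuffixCappedLength above):

import java.util.UUID
import org.apache.kafka.common.TopicPartition

object StrayDirNameExample {
  def strayDirName(tp: TopicPartition): String = {
    val uniqueId = UUID.randomUUID.toString.replace("-", "")
    val suffix = s"-${tp.partition}.$uniqueId-stray"
    // Cap the topic prefix so the whole name stays within 255 characters.
    val prefixLength = math.min(tp.topic.length, 255 - suffix.length)
    s"${tp.topic.substring(0, prefixLength)}$suffix"
  }

  def main(args: Array[String]): Unit =
    // Prints e.g. "payments-7.3fa85f6457174562b3fc2c963f66afa6-stray"
    println(strayDirName(new TopicPartition("payments", 7)))
}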
31 changes: 25 additions & 6 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File],
// Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()

// Map of stray partition to stray log. This holds all stray logs detected on the broker.
// Visible for testing
private val strayLogs = new Pool[TopicPartition, UnifiedLog]()

private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
@@ -302,6 +306,10 @@
this.logsToBeDeleted.add((log, time.milliseconds()))
}

def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = {
this.strayLogs.put(strayPartition, strayLog)
}

// Only for testing
private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty

@@ -337,6 +345,9 @@

if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
addLogToBeDeleted(log)
} else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else {
val previous = {
if (log.isFuture)
@@ -1203,7 +1214,8 @@
*/
def asyncDelete(topicPartition: TopicPartition,
isFuture: Boolean = false,
checkpoint: Boolean = true): Option[UnifiedLog] = {
checkpoint: Boolean = true,
isStray: Boolean = false): Option[UnifiedLog] = {
val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
}
@@ -1216,15 +1228,21 @@
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
}
}
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
if (isStray) {
// Move aside stray partitions, don't delete them
removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false)
warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
} else {
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
}
if (checkpoint) {
val logDir = removedLog.parentDirFile
val logsToCheckpoint = logsInDir(logDir)
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
}
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")

case None =>
if (offlineLogDirs.nonEmpty) {
@@ -1244,18 +1262,19 @@
* topic-partition is raised
*/
def asyncDelete(topicPartitions: Set[TopicPartition],
isStray: Boolean,
errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
val logDirs = mutable.Set.empty[File]

topicPartitions.foreach { topicPartition =>
try {
getLog(topicPartition).foreach { log =>
logDirs += log.parentDirFile
asyncDelete(topicPartition, checkpoint = false)
asyncDelete(topicPartition, checkpoint = false, isStray = isStray)
}
getLog(topicPartition, isFuture = true).foreach { log =>
logDirs += log.parentDirFile
asyncDelete(topicPartition, isFuture = true, checkpoint = false)
asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray)
}
} catch {
case e: Throwable => errorHandler(topicPartition, e)
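A hedged usage sketch of the extended asyncDelete; the logManager value, the partition set, and the logger are assumed to be in scope:

// Assumes logManager: kafka.log.LogManager and strayPartitions: Set[TopicPartition].
logManager.asyncDelete(
  strayPartitions,
  isStray = true, // rename to "<topic>-<partition>.<uuid>-stray" instead of scheduling deletion
  (tp, e) => logger.error(s"Failed to move aside stray log for $tp", e)
)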
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1870,6 +1870,8 @@ object UnifiedLog extends Logging {

val DeleteDirSuffix = LocalLog.DeleteDirSuffix

val StrayDirSuffix = LocalLog.StrayDirSuffix

val FutureDirSuffix = LocalLog.FutureDirSuffix

private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
@@ -1952,6 +1954,8 @@

def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)

def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)

def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)

def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.transactionIndexFile(dir, offset, suffix)
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
import kafka.server.KafkaConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
@@ -225,6 +226,7 @@
requestBatch.sendRequestsToBrokers(zkControllerEpoch)

requestBatch.newBatch()
requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
// When we need to send RPCs from the image, we're sending 'full' requests meaning we let
// every broker know about all the metadata and all the LISR requests it needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't have any state
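Putting the pieces together, the full-state send in hybrid mode orders its calls roughly as follows (names are from the diffs above; the wiring between calls is omitted):

requestBatch.newBatch()
requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
// ...addLeaderAndIsrRequestForBrokers / addUpdateMetadataRequestForBrokers
// for everything in the metadata image...
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
// Both sendLeaderAndIsrRequests() and clear() reset updateType to UNKNOWN,
// so later incremental batches are not mistaken for full snapshots.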