Skip to content

Commit

Permalink
MINOR: Improved error handling in ZK migration (apache#13372)
Browse files Browse the repository at this point in the history
This patch fixes many small issues to improve error handling and logging during the ZK migration. A test was added
to simulate a ZK session expiration to ensure the correctness of the migration driver.

With this change, ZK errors thrown during the migration will not hit the fault handler registered with the
KRaftMigrationDriver, but they will be logged.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
mumrah authored Mar 16, 2023
1 parent d16b968 commit 5dcdf71
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 115 deletions.
160 changes: 103 additions & 57 deletions core/src/main/scala/kafka/zk/ZkMigrationClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
import org.apache.zookeeper.{CreateMode, KeeperException}

import java.util
Expand All @@ -43,31 +42,58 @@ import scala.jdk.CollectionConverters._

/**
* Migration client in KRaft controller responsible for handling communication to Zookeeper and
* the ZkBrokers present in the cluster.
* the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
* wrapper function in order to translate KeeperExceptions into something usable by the caller.
*/
class ZkMigrationClient(
zkClient: KafkaZkClient,
zkConfigEncoder: PasswordEncoder
) extends MigrationClient with Logging {

override def getOrCreateMigrationRecoveryState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
zkClient.createTopLevelPaths()
zkClient.getOrCreateMigrationState(initialState)
/**
 * Wrap a function such that any KeeperException is captured and converted to a MigrationClientException.
 * Authentication-related exceptions are converted to a MigrationClientAuthException, which may be
 * treated differently by the caller since such errors are not expected to be recoverable.
 * Exceptions that are already of the migration client types are re-thrown unchanged.
 */
@throws(classOf[MigrationClientException])
private def wrapZkException[T](fn: => T): T = {
try {
fn
} catch {
// Already translated (e.g. by a nested wrapZkException call) — pass through untouched.
case e @ (_: MigrationClientException | _: MigrationClientAuthException) => throw e
case e @ (_: AuthFailedException | _: NoAuthException | _: SessionClosedRequireAuthException) =>
// We don't expect authentication errors to be recoverable, so treat them differently
throw new MigrationClientAuthException(e)
// Any other ZooKeeper error becomes a generic, checked migration client failure.
case e: KeeperException => throw new MigrationClientException(e)
}
}

override def setMigrationRecoveryState(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
/**
 * Ensure the top-level ZK paths exist, then delegate to zkClient.getOrCreateMigrationState to
 * read (or initialize from `initialState`) the persisted migration state.
 * KeeperExceptions are translated by wrapZkException.
 */
override def getOrCreateMigrationRecoveryState(
initialState: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
zkClient.createTopLevelPaths()
zkClient.getOrCreateMigrationState(initialState)
}

/**
 * Persist the given migration state via zkClient.updateMigrationState.
 * KeeperExceptions are translated by wrapZkException.
 */
override def setMigrationRecoveryState(
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
zkClient.updateMigrationState(state)
}

override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
/**
 * Attempt to register this KRaft controller (id and epoch taken from `state`) as the active
 * controller in ZooKeeper. On success, the returned state carries the resulting ZK controller
 * epoch and epoch zkVersion; on failure, the ZK controller fields are marked unknown.
 * KeeperExceptions are translated by wrapZkException.
 */
override def claimControllerLeadership(
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(), state.kraftControllerEpoch()) match {
case SuccessfulRegistrationResult(controllerEpoch, controllerEpochZkVersion) =>
state.withZkController(controllerEpoch, controllerEpochZkVersion)
// Registration lost the race (or was otherwise rejected): leave ZK controller info unknown.
case FailedRegistrationResult() => state.withUnknownZkController()
}
}

override def releaseControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def releaseControllerLeadership(
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
try {
zkClient.deleteController(state.zkControllerEpochZkVersion())
state.withUnknownZkController()
Expand All @@ -76,14 +102,14 @@ class ZkMigrationClient(
// If the controller moved, no need to release
state.withUnknownZkController()
case t: Throwable =>
throw new RuntimeException("Could not release controller leadership due to underlying error", t)
throw new MigrationClientException("Could not release controller leadership due to underlying error", t)
}
}

def migrateTopics(
recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]
): Unit = {
): Unit = wrapZkException {
val topics = zkClient.getAllTopicsInCluster()
val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
Expand Down Expand Up @@ -137,7 +163,9 @@ class ZkMigrationClient(
}
}

def migrateBrokerConfigs(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
def migrateBrokerConfigs(
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
): Unit = wrapZkException {
val batch = new util.ArrayList[ApiMessageAndVersion]()

val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
Expand Down Expand Up @@ -165,7 +193,9 @@ class ZkMigrationClient(
}
}

def migrateClientQuotas(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
def migrateClientQuotas(
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
): Unit = wrapZkException {
val adminZkClient = new AdminZkClient(zkClient)

def migrateEntityType(entityType: String): Unit = {
Expand Down Expand Up @@ -207,7 +237,9 @@ class ZkMigrationClient(
migrateEntityType(ConfigType.Ip)
}

def migrateProducerId(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
def migrateProducerId(
recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
): Unit = wrapZkException {
val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
dataOpt match {
case Some(data) =>
Expand All @@ -220,19 +252,21 @@ class ZkMigrationClient(
}
}

override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]): Unit = {
override def readAllMetadata(
batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
brokerIdConsumer: Consumer[Integer]
): Unit = {
migrateTopics(batchConsumer, brokerIdConsumer)
migrateBrokerConfigs(batchConsumer)
migrateClientQuotas(batchConsumer)
migrateProducerId(batchConsumer)
}

override def readBrokerIds(): util.Set[Integer] = {
zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
/**
 * Read the ids of all brokers currently registered in ZooKeeper, returned as a mutable
 * java.util.HashSet. KeeperExceptions are translated by wrapZkException.
 */
override def readBrokerIds(): util.Set[Integer] = wrapZkException {
  val brokerIds = new util.HashSet[Integer]()
  // HashSet.add de-duplicates, matching the original toSet conversion.
  zkClient.getSortedBrokerList.foreach(id => brokerIds.add(Integer.valueOf(id)))
  brokerIds
}

override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = {
override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = wrapZkException {
val topics = zkClient.getAllTopicsInCluster()
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
val brokersWithAssignments = new util.HashSet[Integer]()
Expand All @@ -244,10 +278,12 @@ class ZkMigrationClient(
brokersWithAssignments
}

override def createTopic(topicName: String,
topicId: Uuid,
partitions: util.Map[Integer, PartitionRegistration],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def createTopic(
topicName: String,
topicId: Uuid,
partitions: util.Map[Integer, PartitionRegistration],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val assignments = partitions.asScala.map { case (partitionId, partition) =>
new TopicPartition(topicName, partitionId) ->
ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
Expand Down Expand Up @@ -289,18 +325,22 @@ class ZkMigrationClient(
state.withMigrationZkVersion(migrationZkVersion)
} else {
// not ok
throw new RuntimeException(s"Failed to create or update topic $topicName. ZK operation had results $resultCodes")
throw new MigrationClientException(s"Failed to create or update topic $topicName. ZK operation had results $resultCodes")
}
}

private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
/**
 * Build a persistent CreateRequest for the given partition's ZNode, with default ACLs and no
 * data payload (null). The request is constructed here; execution happens at the call site.
 */
private def createTopicPartition(
topicPartition: TopicPartition
): CreateRequest = wrapZkException {
val path = TopicPartitionZNode.path(topicPartition)
CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
}

private def partitionStatePathAndData(topicPartition: TopicPartition,
partitionRegistration: PartitionRegistration,
controllerEpoch: Int): (String, Array[Byte]) = {
private def partitionStatePathAndData(
topicPartition: TopicPartition,
partitionRegistration: PartitionRegistration,
controllerEpoch: Int
): (String, Array[Byte]) = {
val path = TopicPartitionStateZNode.path(topicPartition)
val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
partitionRegistration.leader,
Expand All @@ -311,22 +351,28 @@ class ZkMigrationClient(
(path, data)
}

private def createTopicPartitionState(topicPartition: TopicPartition,
partitionRegistration: PartitionRegistration,
controllerEpoch: Int): CreateRequest = {
/**
 * Build a persistent CreateRequest for a partition's state ZNode, using the path and encoded
 * payload produced by partitionStatePathAndData for the given registration and controller epoch.
 */
private def createTopicPartitionState(
  topicPartition: TopicPartition,
  partitionRegistration: PartitionRegistration,
  controllerEpoch: Int
): CreateRequest = {
  val pathAndData = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
  val znodePath = pathAndData._1
  CreateRequest(znodePath, pathAndData._2, zkClient.defaultAcls(znodePath), CreateMode.PERSISTENT, Some(topicPartition))
}

private def updateTopicPartitionState(topicPartition: TopicPartition,
partitionRegistration: PartitionRegistration,
controllerEpoch: Int): SetDataRequest = {
/**
 * Build a SetDataRequest that overwrites a partition's state ZNode (matching any zkVersion)
 * with the payload produced by partitionStatePathAndData for the given registration and
 * controller epoch.
 */
private def updateTopicPartitionState(
  topicPartition: TopicPartition,
  partitionRegistration: PartitionRegistration,
  controllerEpoch: Int
): SetDataRequest = {
  val pathAndData = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
  SetDataRequest(pathAndData._1, pathAndData._2, ZkVersion.MatchAnyVersion, Some(topicPartition))
}

override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def updateTopicPartitions(
topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
val topicPartition = new TopicPartition(topicName, partitionId)
Expand All @@ -341,18 +387,20 @@ class ZkMigrationClient(
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
throw new RuntimeException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
throw new MigrationClientException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
}
}
}

// Try to update an entity config and the migration state. If NoNode is encountered, it probably means we
// need to recursively create the parent ZNode. In this case, return None.
def tryWriteEntityConfig(entityType: String,
path: String,
props: Properties,
create: Boolean,
state: ZkMigrationLeadershipState): Option[ZkMigrationLeadershipState] = {
def tryWriteEntityConfig(
entityType: String,
path: String,
props: Properties,
create: Boolean,
state: ZkMigrationLeadershipState
): Option[ZkMigrationLeadershipState] = wrapZkException {
val configData = ConfigEntityZNode.encode(props)

val requests = if (create) {
Expand All @@ -375,8 +423,7 @@ class ZkMigrationClient(
entity: util.Map[String, String],
quotas: util.Map[String, java.lang.Double],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = {

): ZkMigrationLeadershipState = wrapZkException {
val entityMap = entity.asScala
val hasUser = entityMap.contains(ClientQuotaEntity.USER)
val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
Expand Down Expand Up @@ -418,13 +465,16 @@ class ZkMigrationClient(

tryWriteEntityConfig(configType.get, path.get, props, create=true, state) match {
case Some(newStateSecondTry) => newStateSecondTry
case None => throw new RuntimeException(
case None => throw new MigrationClientException(
s"Could not write client quotas for $entity on second attempt when using Create instead of SetData")
}
}
}

override def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def writeProducerId(
nextProducerId: Long,
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))

Expand All @@ -433,9 +483,11 @@ class ZkMigrationClient(
state.withMigrationZkVersion(migrationZkVersion)
}

override def writeConfigs(resource: ConfigResource,
configs: util.Map[String, String],
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
override def writeConfigs(
resource: ConfigResource,
configs: util.Map[String, String],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val configType = resource.`type`() match {
case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
Expand All @@ -456,7 +508,7 @@ class ZkMigrationClient(

tryWriteEntityConfig(configType.get, configName, props, create=true, state) match {
case Some(newStateSecondTry) => newStateSecondTry
case None => throw new RuntimeException(
case None => throw new MigrationClientException(
s"Could not write ${configType.get} configs on second attempt when using Create instead of SetData.")
}
}
Expand All @@ -465,10 +517,4 @@ class ZkMigrationClient(
state
}
}

override def writeMetadataDeltaToZookeeper(delta: MetadataDelta,
image: MetadataImage,
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
state
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory

Expand All @@ -43,7 +44,9 @@ import java.util.concurrent.TimeUnit
import scala.collection.Seq
import scala.jdk.CollectionConverters._


@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(120)
class ZkMigrationIntegrationTest {

val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
Expand Down Expand Up @@ -85,7 +88,7 @@ class ZkMigrationIntegrationTest {
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
admin.alterClientQuotas(quotas)
admin.alterClientQuotas(quotas).all().get(60, TimeUnit.SECONDS)

val zkClient = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
val kafkaConfig = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying.servers.head.config
Expand Down
Loading

0 comments on commit 5dcdf71

Please sign in to comment.