Skip to content

Commit

Permalink
MINOR: Send latest LeaderAndIsr version (apache#7351)
Browse files Browse the repository at this point in the history
KIP-455 (18d4e57) bumped the LeaderAndIsr version to 3 but did not change the Controller code to actually send the new version. The ControllerChannelManagerTest had a bug which made it assert wrongly, hence why it did not catch it. This patch fixes said test.
Because the new fields in LeaderAndIsr are not used yet, the gap was not caught by integration tests either.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
stanislavkozlovski authored and Jason Gustafson committed Sep 19, 2019
1 parent 73c6bd8 commit 3825ff3
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 35 deletions.
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/api/ApiVersion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ object ApiVersion {
// Introduced static membership.
KAFKA_2_3_IV0,
// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest
KAFKA_2_3_IV1
KAFKA_2_3_IV1,
// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
KAFKA_2_4_IV0
)

// Map keys are the union of the short and full versions
Expand Down Expand Up @@ -316,6 +318,13 @@ case object KAFKA_2_3_IV1 extends DefaultApiVersion {
val id: Int = 23
}

case object KAFKA_2_4_IV0 extends DefaultApiVersion {
val shortVersion: String = "2.4"
val subVersion = "IV0"
val recordVersion = RecordVersion.V2
val id: Int = 24
}

object ApiVersionValidator extends Validator {

override def ensureValid(name: String, value: Any): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,

private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
val leaderAndIsrRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
else 0

Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ class ApiVersionTest {
assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2"))
assertEquals(KAFKA_2_2_IV0, ApiVersion("2.2-IV0"))
assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2-IV1"))

assertEquals(KAFKA_2_3_IV1, ApiVersion("2.3"))
assertEquals(KAFKA_2_3_IV0, ApiVersion("2.3-IV0"))
assertEquals(KAFKA_2_3_IV1, ApiVersion("2.3-IV1"))

assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4"))
assertEquals(KAFKA_2_4_IV0, ApiVersion("2.4-IV0"))
}

@Test
Expand Down Expand Up @@ -116,7 +123,22 @@ class ApiVersionTest {
def testShortVersion(): Unit = {
assertEquals("0.8.0", KAFKA_0_8_0.shortVersion)
assertEquals("0.10.0", KAFKA_0_10_0_IV0.shortVersion)
assertEquals("0.10.0", KAFKA_0_10_0_IV1.shortVersion)
assertEquals("0.11.0", KAFKA_0_11_0_IV0.shortVersion)
assertEquals("0.11.0", KAFKA_0_11_0_IV1.shortVersion)
assertEquals("0.11.0", KAFKA_0_11_0_IV2.shortVersion)
assertEquals("1.0", KAFKA_1_0_IV0.shortVersion)
assertEquals("1.1", KAFKA_1_1_IV0.shortVersion)
assertEquals("2.0", KAFKA_2_0_IV0.shortVersion)
assertEquals("2.0", KAFKA_2_0_IV1.shortVersion)
assertEquals("2.1", KAFKA_2_1_IV0.shortVersion)
assertEquals("2.1", KAFKA_2_1_IV1.shortVersion)
assertEquals("2.1", KAFKA_2_1_IV2.shortVersion)
assertEquals("2.2", KAFKA_2_2_IV0.shortVersion)
assertEquals("2.2", KAFKA_2_2_IV1.shortVersion)
assertEquals("2.3", KAFKA_2_3_IV0.shortVersion)
assertEquals("2.3", KAFKA_2_3_IV1.shortVersion)
assertEquals("2.4", KAFKA_2_4_IV0.shortVersion)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kafka.controller

import java.util.Properties

import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, LeaderAndIsr}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, KAFKA_2_4_IV0, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
Expand Down Expand Up @@ -149,8 +149,9 @@ class ControllerChannelManagerTest {

for (apiVersion <- ApiVersion.allVersions) {
val leaderAndIsrRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
if (apiVersion >= KAFKA_2_4_IV0) 3
else if (apiVersion >= KAFKA_2_2_IV0) 2
else if (apiVersion >= KAFKA_1_0_IV0) 1
else 0

testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(apiVersion, leaderAndIsrRequestVersion)
Expand All @@ -173,9 +174,10 @@ class ControllerChannelManagerTest {
batch.addLeaderAndIsrRequestForBrokers(Seq(2), partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(1, 2, 3)), isNew = false)
batch.sendRequestsToBrokers(controllerEpoch)

val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(2, expectedLeaderAndIsrVersion)
val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(2)
assertEquals(1, leaderAndIsrRequests.size)
assertEquals(expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
assertEquals(s"IBP $interBrokerProtocolVersion should use version $expectedLeaderAndIsrVersion",
expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
}

@Test
Expand Down Expand Up @@ -316,11 +318,11 @@ class ControllerChannelManagerTest {

for (apiVersion <- ApiVersion.allVersions) {
val updateMetadataRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
else if (config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
else if (config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
if (apiVersion >= KAFKA_2_2_IV0) 5
else if (apiVersion >= KAFKA_1_0_IV0) 4
else if (apiVersion >= KAFKA_0_10_2_IV0) 3
else if (apiVersion >= KAFKA_0_10_0_IV1) 2
else if (apiVersion >= KAFKA_0_9_0) 1
else 0

testUpdateMetadataFollowsInterBrokerProtocolVersion(apiVersion, updateMetadataRequestVersion)
Expand All @@ -341,8 +343,11 @@ class ControllerChannelManagerTest {
assertEquals(1, batch.sentRequests.size)
assertTrue(batch.sentRequests.contains(2))

val requests = batch.collectUpdateMetadataRequestsFor(2, expectedUpdateMetadataVersion)
assertTrue(requests.forall(_.version == expectedUpdateMetadataVersion))
val requests = batch.collectUpdateMetadataRequestsFor(2)
val allVersions = requests.map(_.version)
assertTrue(s"IBP $interBrokerProtocolVersion should use version $expectedUpdateMetadataVersion, " +
s"but found versions $allVersions",
allVersions.forall(_ == expectedUpdateMetadataVersion))
}

@Test
Expand All @@ -369,7 +374,7 @@ class ControllerChannelManagerTest {
val sentRequests = batch.sentRequests(2)
assertEquals(1, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(2)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
Expand Down Expand Up @@ -407,7 +412,7 @@ class ControllerChannelManagerTest {
val sentRequests = batch.sentRequests(2)
assertEquals(1, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(2)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(1, sentStopReplicaRequests.size)
assertEquals(partitions, sentStopReplicaRequests.flatMap(_.partitions.asScala).toSet)
assertTrue(sentStopReplicaRequests.forall(_.deletePartitions()))
Expand Down Expand Up @@ -444,7 +449,7 @@ class ControllerChannelManagerTest {
val sentRequests = batch.sentRequests(2)
assertEquals(1, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(2)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(1, sentStopReplicaRequests.size)
assertEquals(partitions, sentStopReplicaRequests.flatMap(_.partitions.asScala).toSet)
assertTrue(sentStopReplicaRequests.forall(_.deletePartitions()))
Expand Down Expand Up @@ -493,7 +498,7 @@ class ControllerChannelManagerTest {
val sentRequests = batch.sentRequests(2)
assertEquals(2, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(2)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(2, sentStopReplicaRequests.size)

val (deleteRequests, nonDeleteRequests) = sentStopReplicaRequests.partition(_.deletePartitions())
Expand Down Expand Up @@ -529,7 +534,7 @@ class ControllerChannelManagerTest {
assertEquals(1, sentRequests.size)

for (brokerId <- Set(2, 3)) {
val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(brokerId)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(brokerId)
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
Expand Down Expand Up @@ -569,7 +574,7 @@ class ControllerChannelManagerTest {
val sentRequests = batch.sentRequests(2)
assertEquals(1, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicRequestsFor(2)
val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
Expand All @@ -583,14 +588,14 @@ class ControllerChannelManagerTest {

for (apiVersion <- ApiVersion.allVersions) {
if (apiVersion < KAFKA_2_2_IV0)
testStopReplicaFollowsInterBrokerProtocolVersion(ApiVersion.latestVersion, 0.toShort)
testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 0.toShort)
else
testStopReplicaFollowsInterBrokerProtocolVersion(ApiVersion.latestVersion, 1.toShort)
testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 1.toShort)
}
}

private def testStopReplicaFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
expectedStopReplicaRequestVersion: Short): Unit = {
expectedStopReplicaRequestVersion: Short): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)
Expand All @@ -605,8 +610,11 @@ class ControllerChannelManagerTest {
assertEquals(1, batch.sentRequests.size)
assertTrue(batch.sentRequests.contains(2))

val requests = batch.collectStopReplicRequestsFor(2, expectedStopReplicaRequestVersion)
assertTrue(requests.forall(_.version() == expectedStopReplicaRequestVersion))
val requests = batch.collectStopReplicaRequestsFor(2)
val allVersions = requests.map(_.version)
assertTrue(s"IBP $interBrokerProtocolVersion should use version $expectedStopReplicaRequestVersion, " +
s"but found versions $allVersions",
allVersions.forall(_ == expectedStopReplicaRequestVersion))
}

private def applyStopReplicaResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
Expand All @@ -631,7 +639,8 @@ class ControllerChannelManagerTest {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, controllerId.toString)
props.put(KafkaConfig.ZkConnectProp, "zkConnect")
props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.version)
props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerVersion.version)
props.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerVersion.version)
KafkaConfig.fromProps(props)
}

Expand Down Expand Up @@ -680,32 +689,29 @@ class ControllerChannelManagerTest {
sentRequests(brokerId).append(SentRequest(request, callback))
}

def collectStopReplicRequestsFor(brokerId: Int,
version: Short = ApiKeys.STOP_REPLICA.latestVersion): List[StopReplicaRequest] = {
def collectStopReplicaRequestsFor(brokerId: Int): List[StopReplicaRequest] = {
sentRequests.get(brokerId) match {
case Some(requests) => requests
.filter(_.request.apiKey == ApiKeys.STOP_REPLICA)
.map(_.request.build(version).asInstanceOf[StopReplicaRequest]).toList
.map(_.request.build().asInstanceOf[StopReplicaRequest]).toList
case None => List.empty[StopReplicaRequest]
}
}

def collectUpdateMetadataRequestsFor(brokerId: Int,
version: Short = ApiKeys.UPDATE_METADATA.latestVersion): List[UpdateMetadataRequest] = {
def collectUpdateMetadataRequestsFor(brokerId: Int): List[UpdateMetadataRequest] = {
sentRequests.get(brokerId) match {
case Some(requests) => requests
.filter(_.request.apiKey == ApiKeys.UPDATE_METADATA)
.map(_.request.build(version).asInstanceOf[UpdateMetadataRequest]).toList
.map(_.request.build().asInstanceOf[UpdateMetadataRequest]).toList
case None => List.empty[UpdateMetadataRequest]
}
}

def collectLeaderAndIsrRequestsFor(brokerId: Int,
version: Short = ApiKeys.LEADER_AND_ISR.latestVersion): List[LeaderAndIsrRequest] = {
def collectLeaderAndIsrRequestsFor(brokerId: Int): List[LeaderAndIsrRequest] = {
sentRequests.get(brokerId) match {
case Some(requests) => requests
.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR)
.map(_.request.build(version).asInstanceOf[LeaderAndIsrRequest]).toList
.map(_.request.build().asInstanceOf[LeaderAndIsrRequest]).toList
case None => List.empty[LeaderAndIsrRequest]
}
}
Expand Down

0 comments on commit 3825ff3

Please sign in to comment.