Skip to content

Commit

Permalink
KAFKA-13316; Enable KRaft mode in CreateTopics tests (apache#11655)
Browse files Browse the repository at this point in the history
This PR follows apache#11629 to enable `CreateTopicsRequestWithForwardingTest` and `CreateTopicsRequestWithPolicyTest` in KRaft mode.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
dengziming authored Feb 10, 2022
1 parent 4da515d commit 590df2c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -122,8 +122,8 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
topic.replicationFactor

if (request.data.validateOnly) {
assertNotNull(metadataForTopic, s"Topic $topic should be created")
assertFalse(metadataForTopic.error == Errors.NONE, s"Error ${metadataForTopic.error} for topic $topic")
assertNotNull(metadataForTopic)
assertNotEquals(Errors.NONE, metadataForTopic.error, s"Topic $topic should not be created")
assertTrue(metadataForTopic.partitionMetadata.isEmpty, "The topic should have no partitions")
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package kafka.server

import org.apache.kafka.common.protocol.Errors
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._

class CreateTopicsRequestWithForwardingTest extends AbstractCreateTopicsRequestTest {

override def enableForwarding: Boolean = true

@Test
def testForwardToController(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testForwardToController(quorum: String): Unit = {
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
// With forwarding enabled, request could be forwarded to the active controller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package kafka.server

import java.util
import java.util.Properties

import kafka.log.LogConfig
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._

Expand All @@ -37,8 +38,15 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName)
}

@Test
def testValidCreateTopicsRequests(): Unit = {
override def kraftControllerConfigs(): Seq[Properties] = {
val properties = new Properties()
properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName)
Seq(properties)
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = {
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
numPartitions = 5))))

Expand All @@ -55,10 +63,11 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
}

@Test
def testErrorCreateTopicsRequests(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testErrorCreateTopicsRequests(quorum: String): Unit = {
val existingTopic = "existing-topic"
createTopic(existingTopic, 1, 1)
createTopic(existingTopic, 5, 1)

// Policy violations
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic1",
Expand Down Expand Up @@ -93,20 +102,35 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
Some("Topic 'existing-topic' already exists."))))

var errorMsg = if (isKRaftTest()) {
"Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered."
} else {
"Replication factor: 4 larger than available brokers: 3."
}
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor: 4 larger than available brokers: 3."))))
Some(errorMsg))))

errorMsg = if (isKRaftTest()) {
"Replication factor must be larger than 0, or -1 to use the default value."
} else {
"Replication factor must be larger than 0."
}
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
numPartitions = 10, replicationFactor = -2)), validateOnly = true),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor must be larger than 0."))))
Some(errorMsg))))

errorMsg = if (isKRaftTest()) {
"Number of partitions was set to an invalid non-positive value."
} else {
"Number of partitions must be larger than 0."
}
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
numPartitions = -2, replicationFactor = 1)), validateOnly = true),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
Some("Number of partitions must be larger than 0."))))
Some(errorMsg))))
}

}
Expand All @@ -123,6 +147,10 @@ object CreateTopicsRequestWithPolicyTest {
}

def validate(requestMetadata: RequestMetadata): Unit = {
if (Topic.isInternal(requestMetadata.topic())) {
// Do not verify internal topics
return
}
require(!closed, "Policy should not be closed")
require(!configs.isEmpty, "configure should have been called with non empty configs")

Expand Down

0 comments on commit 590df2c

Please sign in to comment.