[SPARK-2808] [STREAMING] [KAFKA] cleanup tests from
see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests

Author: cody koeninger <[email protected]>

Closes apache#5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits:

1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class
af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt
1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests
koeninger authored and srowen committed Jun 7, 2015
1 parent e84815d commit b127ff8
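
The fix hinges on the Kafka 0.8 producer setting request.required.acks: a value of -1 makes the leader hold each acknowledgment until every in-sync replica has the message, so a synchronous send() only returns once the data is fully committed and the leader offset already covers it. That guarantee is what lets every waitUntilLeaderOffset poll below be deleted. A minimal sketch of a producer configured this way, using the old kafka.producer API; the broker address, topic, and message are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")  // placeholder broker
props.put("serializer.class", classOf[StringEncoder].getName)
// -1: the leader acks only after all in-sync replicas have the message
props.put("request.required.acks", "-1")

val producer = new Producer[String, String](new ProducerConfig(props))
// send() blocks until the full ISR ack, so the message is readable
// at the leader as soon as this call returns
producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
producer.close()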
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 27 deletions.
@@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
     val props = new Properties()
     props.put("metadata.broker.list", brokerAddress)
     props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
     props
   }

@@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  /** Wait until the leader offset for the given topic/partition equals the specified offset */
-  def waitUntilLeaderOffset(
-      topic: String,
-      partition: Int,
-      offset: Long): Unit = {
-    eventually(Time(10000), Time(100)) {
-      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
-      val tp = TopicAndPartition(topic, partition)
-      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
-      assert(
-        llo == offset,
-        s"$topic $partition $offset not reached after timeout")
-    }
-  }
-
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
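With acks required, the Scala, Java, and Python suites below collapse to a plain create-send-read sequence. A hedged sketch of the pattern, reusing the KafkaTestUtils method names from the diff above with a placeholder topic and messages:

kafkaTestUtils.createTopic("test-topic")
// sendMessages now blocks until all in-sync replicas ack each message
kafkaTestUtils.sendMessages("test-topic", Array("a", "b", "c"))
// previously required here: kafkaTestUtils.waitUntilLeaderOffset("test-topic", 0, 3)
// with acks = -1 the leader offset is already 3 at this point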
@@ -72,9 +72,6 @@ public void testKafkaRDD() throws InterruptedException {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
-    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
-
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)
@@ -61,8 +61,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt}")
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
-
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
     val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
@@ -86,7 +84,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
     val sentCount = sent.values.sum
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
 
     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))
@@ -113,7 +110,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val sentOnlyOne = Map("d" -> 1)
 
     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
 
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
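Since every send is fully acknowledged before sendMessages returns, the suite can use messages.size as the untilOffset of an OffsetRange immediately. A condensed sketch of that pattern, under the assumption that sc, topic, messages, and kafkaTestUtils stand in for the suite's fixtures:

import kafka.serializer.StringDecoder
import scala.util.Random
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map(
  "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
  "group.id" -> s"test-consumer-${Random.nextInt}")
// messages.size is a safe untilOffset: all sends were acked by the full ISR
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
assert(rdd.count == messages.size)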
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -47,6 +47,9 @@ object MimaExcludes {
       // Mima false positive (was a private[spark] class)
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.util.collection.PairIterator"),
+      // Removing a testing method from a private class
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
       // SQL execution is considered private.
       excludePackage("org.apache.spark.sql.execution")
     )
5 changes: 0 additions & 5 deletions python/pyspark/streaming/tests.py
@@ -615,7 +615,6 @@ def test_kafka_stream(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
@@ -631,7 +630,6 @@ def test_kafka_direct_stream(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -646,7 +644,6 @@ def test_kafka_direct_stream_from_offset(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
@@ -661,7 +658,6 @@ def test_kafka_rdd(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)

@@ -677,7 +673,6 @@ def test_kafka_rdd_with_leaders(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)

