[SPARK-17782][STREAMING][KAFKA] alternative eliminate race condition of poll twice

## What changes were proposed in this pull request?

Alternative approach to apache#15387

Author: cody koeninger <[email protected]>

Closes apache#15401 from koeninger/SPARK-17782-alt.
koeninger authored and zsxwing committed Oct 12, 2016
1 parent 9ce7d3e commit f9a56a1
Showing 3 changed files with 32 additions and 7 deletions.
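For context, the race this commit eliminates: the driver-side consumer calls `poll(0)` once to force partition assignment and then seeks to the desired starting offsets, but any later `poll(0)` may actually fetch records and silently advance the position past those offsets. A minimal sketch of the problematic sequence against the Kafka 0.10 consumer API (the broker address, group id, and topic `"t"` are placeholders, not from this commit):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("group.id", "example-group")            // placeholder group
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("t"))  // placeholder topic
consumer.poll(0)                                // first poll: forces assignment
consumer.seek(new TopicPartition("t", 0), 42L)  // position is now 42
consumer.poll(0)                                // second poll: may fetch records
                                                // and move the position past 42
```

The fix below has two halves: pause the consumer right after seeking, and compensate in `paranoidPoll` if a poll returns records anyway.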
```diff
@@ -104,6 +104,8 @@ private case class Subscribe[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
+      // we've called poll, we must pause or next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
```
```diff
@@ -154,6 +156,8 @@ private case class SubscribePattern[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
+      // we've called poll, we must pause or next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
```
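The same two added lines appear in both `Subscribe` and `SubscribePattern`: once the initial seek is done, every assigned partition is paused, so later `poll(0)` calls on this driver-side consumer should fetch nothing and leave the position alone. Continuing the sketch above (same placeholder names):

```scala
consumer.poll(0)                       // forces assignment, as before
consumer.seek(new TopicPartition("t", 0), 42L)
consumer.pause(consumer.assignment())  // the added call: pause everything
consumer.poll(0)                       // paused partitions are not fetched,
                                       // so this should return no records
assert(consumer.position(new TopicPartition("t", 0)) == 42L)
```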
```diff
@@ -161,12 +161,31 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }
   }
 
+  /**
+   * The concern here is that poll might consume messages despite being paused,
+   * which would throw off consumer position. Fix position if this happens.
+   */
+  private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    val msgs = c.poll(0)
+    if (!msgs.isEmpty) {
+      // position should be minimum offset per topicpartition
+      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
+        val tp = new TopicPartition(m.topic, m.partition)
+        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
+        acc + (tp -> off)
+      }.foreach { case (tp, off) =>
+        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
+        c.seek(tp, off)
+      }
+    }
+  }
+
   /**
    * Returns the latest (highest) available offsets, taking new partitions into account.
    */
   protected def latestOffsets(): Map[TopicPartition, Long] = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     val parts = c.assignment().asScala
 
     // make sure new partitions are reflected in currentOffsets
```
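`paranoidPoll` is the defensive half: if a `poll(0)` returns records despite the pause, the position has already moved, so each affected partition is seeked back to the smallest offset that came back, and those records get read again later. The fold is easy to check in isolation; a self-contained sketch of the same minimum-offset-per-partition logic, with `(topic, partition, offset)` tuples standing in for `ConsumerRecord`s:

```scala
import org.apache.kafka.common.TopicPartition

// Stand-ins for the records an unexpected poll(0) returned.
val msgs = Seq(("t", 0, 7L), ("t", 0, 5L), ("t", 1, 9L))

// Same fold as paranoidPoll: minimum offset seen per TopicPartition.
val toSeek = msgs.foldLeft(Map[TopicPartition, Long]()) { case (acc, (t, p, o)) =>
  val tp = new TopicPartition(t, p)
  val off = acc.get(tp).map(Math.min(_, o)).getOrElse(o)
  acc + (tp -> off)
}
// toSeek == Map(t-0 -> 5, t-1 -> 9); seeking each partition back to that
// offset undoes whatever the stray poll consumed.
```

Both driver-side call sites, `latestOffsets()` above and `start()` below, swap `c.poll(0)` for `paranoidPoll(c)`.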
```diff
@@ -223,7 +242,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
         tp -> c.position(tp)
```
```diff
@@ -159,17 +159,19 @@ class DirectKafkaStreamSuite
   }
 
   test("pattern based subscription") {
-    val topics = List("pat1", "pat2", "advanced3")
-    // Should match 2 out of 3 topics
+    val topics = List("pat1", "pat2", "pat3", "advanced3")
+    // Should match 3 out of 4 topics
     val pat = """pat\d""".r.pattern
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       kafkaTestUtils.createTopic(t)
       kafkaTestUtils.sendMessages(t, data)
     }
-    val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
-    // 2 matching topics, one of which starts 3 messages later
-    val expectedTotal = (data.values.sum * 2) - 3
+    val offsets = Map(
+      new TopicPartition("pat2", 0) -> 3L,
+      new TopicPartition("pat3", 0) -> 4L)
+    // 3 matching topics, two of which start a total of 7 messages later
+    val expectedTotal = (data.values.sum * 3) - 7
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
 
     ssc = new StreamingContext(sparkConf, Milliseconds(1000))
```
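The updated expected count works out as follows: each topic receives `data.values.sum` = 7 + 9 = 16 messages, three of the four topics match `pat\d`, and the starting offsets skip 3 messages on `pat2` plus 4 on `pat3`:

```scala
val data = Map("a" -> 7, "b" -> 9)
val perTopic = data.values.sum  // 16 messages sent to each topic
val matching = 3                // pat1, pat2, pat3 match the pattern
val skipped = 3 + 4             // pat2 starts at offset 3, pat3 at offset 4
assert(perTopic * matching - skipped == 41)  // expectedTotal
```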
