[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array

Author: cody koeninger <[email protected]>

Closes apache#4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:

f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
koeninger authored and tdas committed Feb 11, 2015
1 parent c2131c0 commit 658687b
Showing 4 changed files with 287 additions and 66 deletions.
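
For orientation, here is a rough sketch (not part of the diff below) of how the reworked Scala createRDD is meant to be called after this change; the broker address, topic names, offsets, and the existing SparkContext `sc` are made-up placeholders:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker-host:9092")

    val offsetRanges = Array(
      OffsetRange("topic1", 0, 0, 100),
      OffsetRange("topic2", 0, 0, 100))

    // One leader per TopicAndPartition -- the invariant this commit enforces via the map type.
    val leaders = Map(
      TopicAndPartition("topic1", 0) -> Broker("broker-host", 9092),
      TopicAndPartition("topic2", 0) -> Broker("broker-host", 9092))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, leaders,
      (mmd: MessageAndMetadata[String, String]) => mmd.message())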

Broker.scala (org.apache.spark.streaming.kafka; renamed from Leader.scala)
@@ -17,41 +17,52 @@

 package org.apache.spark.streaming.kafka

-import kafka.common.TopicAndPartition
-
 import org.apache.spark.annotation.Experimental

 /**
  * :: Experimental ::
- * Represent the host info for the leader of a Kafka partition.
+ * Represent the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
  */
 @Experimental
-final class Leader private(
-    /** Kafka topic name */
-    val topic: String,
-    /** Kafka partition id */
-    val partition: Int,
-    /** Leader's hostname */
+final class Broker private(
+    /** Broker's hostname */
     val host: String,
-    /** Leader's port */
-    val port: Int) extends Serializable
+    /** Broker's port */
+    val port: Int) extends Serializable {
+  override def equals(obj: Any): Boolean = obj match {
+    case that: Broker =>
+      this.host == that.host &&
+        this.port == that.port
+    case _ => false
+  }
+
+  override def hashCode: Int = {
+    41 * (41 + host.hashCode) + port
+  }
+
+  override def toString(): String = {
+    s"Broker($host, $port)"
+  }
+}

 /**
  * :: Experimental ::
- * Companion object the provides methods to create instances of [[Leader]].
+ * Companion object that provides methods to create instances of [[Broker]].
  */
 @Experimental
-object Leader {
-  def create(topic: String, partition: Int, host: String, port: Int): Leader =
-    new Leader(topic, partition, host, port)
-
-  def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
-    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
-
-  def apply(topic: String, partition: Int, host: String, port: Int): Leader =
-    new Leader(topic, partition, host, port)
+object Broker {
+  def create(host: String, port: Int): Broker =
+    new Broker(host, port)

-  def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
-    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+  def apply(host: String, port: Int): Broker =
+    new Broker(host, port)
+
+  def unapply(broker: Broker): Option[(String, Int)] = {
+    if (broker == null) {
+      None
+    } else {
+      Some((broker.host, broker.port))
+    }
+  }
 }
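
As an illustration (not part of the commit), the Broker companion supports both construction styles and destructuring; the host and port below are made up:

    val b = Broker("broker-host", 9092)              // Scala apply
    val sameB = Broker.create("broker-host", 9092)   // Java-friendly factory
    val Broker(host, port) = b                       // pattern match through unapply
    assert(b == sameB)                               // equals/hashCode compare host and port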

KafkaUtils.scala (org.apache.spark.streaming.kafka)
@@ -154,6 +154,19 @@ object KafkaUtils {
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }

+  /** get leaders for the given offset ranges, or throw an exception */
+  private def leadersForRanges(
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    leaders
+  }
+
   /**
    * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
@@ -176,12 +189,7 @@
       offsetRanges: Array[OffsetRange]
     ): RDD[(K, V)] = {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-    val leaders = kc.findLeaders(topics).fold(
-      errs => throw new SparkException(errs.mkString("\n")),
-      ok => ok
-    )
+    val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }

@@ -198,7 +206,8 @@
    *   host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka leaders for each offset range in batch
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+   *   in which case leaders will be looked up on the driver.
    * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
@@ -211,12 +220,17 @@
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
-      leaders: Array[Leader],
+      leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
     ): RDD[R] = {
-    val leaderMap = leaders
-      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
-      .toMap
+    val leaderMap = if (leaders.isEmpty) {
+      leadersForRanges(kafkaParams, offsetRanges)
+    } else {
+      // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
+      leaders.map {
+        case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+      }.toMap
+    }
     new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }

@@ -263,7 +277,8 @@
    *   host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka leaders for each offset range in batch
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
+   *   in which case leaders will be looked up on the driver.
    * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
@@ -276,16 +291,17 @@
       recordClass: Class[R],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange],
-      leaders: Array[Leader],
+      leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
     ): JavaRDD[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
     implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    val leaderMap = Map(leaders.toSeq: _*)
     createRDD[K, V, KD, VD, R](
-      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
   }

   /**
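
The new overloads also accept an empty leader map; per the updated doc, leaders are then looked up on the driver through the new leadersForRanges helper. A hedged sketch of that call style, reusing the made-up kafkaParams and offsetRanges from the sketch near the top:

    // Empty map: leaders are resolved on the driver before the KafkaRDD is constructed.
    val rdd2 = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, Map[TopicAndPartition, Broker](),
      (mmd: MessageAndMetadata[String, String]) => mmd.message())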

JavaKafkaRDDSuite.java (org.apache.spark.streaming.kafka; new file)
@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;

import org.apache.spark.SparkConf;

import scala.Tuple2;

import junit.framework.Assert;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.junit.Test;
import org.junit.After;
import org.junit.Before;

public class JavaKafkaRDDSuite implements Serializable {
  private transient JavaSparkContext sc = null;
  private transient KafkaStreamSuiteBase suiteBase = null;

  @Before
  public void setUp() {
    suiteBase = new KafkaStreamSuiteBase() { };
    suiteBase.setupKafka();
    System.clearProperty("spark.driver.port");
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    sc = new JavaSparkContext(sparkConf);
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
    System.clearProperty("spark.driver.port");
    suiteBase.tearDownKafka();
  }

  @Test
  public void testKafkaRDD() throws InterruptedException {
    String topic1 = "topic1";
    String topic2 = "topic2";

    String[] topic1data = createTopicAndSendData(topic1);
    String[] topic2data = createTopicAndSendData(topic2);

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());

    OffsetRange[] offsetRanges = {
      OffsetRange.create(topic1, 0, 0, 1),
      OffsetRange.create(topic2, 0, 0, 1)
    };

    HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap();
    HashMap<TopicAndPartition, Broker> leaders = new HashMap();
    String[] hostAndPort = suiteBase.brokerAddress().split(":");
    Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    leaders.put(new TopicAndPartition(topic1, 0), broker);
    leaders.put(new TopicAndPartition(topic2, 0), broker);

    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        offsetRanges
    ).map(
        new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(scala.Tuple2<String, String> kv) throws Exception {
            return kv._2();
          }
        }
    );

    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        offsetRanges,
        emptyLeaders,
        new Function<MessageAndMetadata<String, String>, String>() {
          @Override
          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
            return msgAndMd.message();
          }
        }
    );

    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        offsetRanges,
        leaders,
        new Function<MessageAndMetadata<String, String>, String>() {
          @Override
          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
            return msgAndMd.message();
          }
        }
    );

    // just making sure the java user apis work; the scala tests handle logic corner cases
    long count1 = rdd1.count();
    long count2 = rdd2.count();
    long count3 = rdd3.count();
    Assert.assertTrue(count1 > 0);
    Assert.assertEquals(count1, count2);
    Assert.assertEquals(count1, count3);
  }

  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
    suiteBase.createTopic(topic);
    suiteBase.sendMessages(topic, data);
    return data;
  }
}