Commit 441a99a
Initial clean-up: Scala 2.12 migration, Apex cleanup, and kafka010-and-above compatibility changes
schintap committed May 5, 2020
1 parent 64f3cfd commit 441a99a
Showing 4 changed files with 28 additions and 65 deletions.
9 changes: 4 additions & 5 deletions pom.xml
@@ -19,16 +19,15 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
     <kafka.version>2.4.1</kafka.version>
     <flink.version>1.10.0</flink.version>
     <storm.version>1.2.2</storm.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala.version>2.11.12</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>2.12.11</scala.version>
     <json.version>20180813</json.version>
     <jedis.version>2.9.0</jedis.version>
     <sedis.version>1.2.2</sedis.version>
     <slf4j.version>1.7.25</slf4j.version>
     <commons-cli.version>1.4</commons-cli.version>
     <snakeyaml.version>1.23</snakeyaml.version>
     <shade.version>3.1.1</shade.version>
-    <!-- <apex.version>3.4.0</apex.version> -->
   </properties>
   <dependencyManagement>
     <dependencies>
@@ -109,12 +108,12 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
       </dependency>
       <dependency>
         <groupId>org.sedis</groupId>
-        <artifactId>sedis_${scala.binary.version}</artifactId>
+        <artifactId>sedis_2.11</artifactId>
         <version>${sedis.version}</version>
       </dependency>
       <dependency>
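Note on the two pom.xml hunks above: the build moves to Scala 2.12.11 and replaces the spark-streaming-kafka-0-8 connector (deprecated since Spark 2.3 and removed in Spark 3.0) with spark-streaming-kafka-0-10. The sedis artifact is pinned to the hard-coded _2.11 suffix instead of _${scala.binary.version}, presumably because sedis 1.2.2 was never published for 2.12; since Scala 2.11 and 2.12 artifacts are binary-incompatible, that pin is worth revisiting.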
4 changes: 2 additions & 2 deletions spark-benchmarks/pom.xml
@@ -47,11 +47,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>org.sedis</groupId>
-      <artifactId>sedis_${scala.binary.version}</artifactId>
+      <artifactId>sedis_2.11</artifactId>
     </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
23 changes: 17 additions & 6 deletions spark-benchmarks/src/main/scala/AdvertisingSpark.scala
@@ -9,11 +9,14 @@ package spark.benchmark
 
 import java.util
 
-import kafka.serializer.StringDecoder
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+
 import org.apache.spark.streaming
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.kafka010.KafkaUtils
 import org.apache.spark.streaming.dstream
 import org.apache.spark.SparkConf
 import org.json.JSONObject
@@ -61,19 +64,27 @@ object KafkaRedisAdvertisingStream {
     // Create direct kafka stream with brokers and topics
     val topicsSet = Set(topic)
     val brokers = joinHosts(kafkaHosts, kafkaPort)
-    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
+    val kafkaParams = Map[String, Object](
+      "metadata.broker.list" -> brokers,
+      "auto.offset.reset" -> "smallest",
+      "key.deserializer" -> classOf[StringDeserializer],
+      "value.deserializer" -> classOf[StringDeserializer]
+    )
     System.err.println(
       "Trying to connect to Kafka at " + brokers)
-    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topicsSet)
+    val messages = KafkaUtils.createDirectStream[String, String](
+      ssc,
+      PreferConsistent,
+      Subscribe[String, String](topicsSet, kafkaParams)
+    )
 
     //We can repartition to use more executors if desired
     // val messages_repartitioned = messages.repartition(10)
 
 
     //take the second tuple of the implicit Tuple2 argument _, by calling the Tuple2 method ._2
     //The first tuple is the key, which we don't use in this benchmark
-    val kafkaRawData = messages.map(_._2)
+    val kafkaRawData = messages.map(_.value())
 
     //Parse the String as JSON
     val kafkaData = kafkaRawData.map(parseJson(_))
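A caveat on the hunk above: the rewritten kafkaParams still carry the 0.8-era settings. The kafka010 integration hands this map straight to the new KafkaConsumer, which expects bootstrap.servers (metadata.broker.list is not a recognized key) and accepts only earliest/latest/none for auto.offset.reset, so "smallest" fails config validation at startup; the Subscribe strategy also needs a group.id. Below is a minimal sketch of a parameter map the new consumer would accept — the helper object and group id are illustrative, not from the commit:

    import org.apache.kafka.common.serialization.StringDeserializer

    // Hypothetical helper, not part of this commit: builds new-consumer params.
    object Kafka010Params {
      def forBrokers(brokers: String): Map[String, Object] = Map(
        "bootstrap.servers" -> brokers,      // new-consumer replacement for "metadata.broker.list"
        "auto.offset.reset" -> "earliest",   // the new consumer rejects "smallest"
        "group.id" -> "ad-benchmark",        // illustrative; the Subscribe strategy requires a consumer group
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer]
      )
    }

Passing such a map to the createDirectStream call above would let the job actually connect to the Kafka 2.4.1 brokers this commit targets.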
57 changes: 5 additions & 52 deletions stream-bench.sh
@@ -13,19 +13,17 @@ MAKE=${MAKE:-make}
 
 KAFKA_VERSION=${KAFKA_VERSION:-"2.4.1"}
 REDIS_VERSION=${REDIS_VERSION:-"4.0.11"}
-SCALA_BIN_VERSION=${SCALA_BIN_VERSION:-"2.11"}
-SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"12"}
+SCALA_BIN_VERSION=${SCALA_BIN_VERSION:-"2.12"}
+SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"11"}
 STORM_VERSION=${STORM_VERSION:-"1.2.2"}
 FLINK_VERSION=${FLINK_VERSION:-"1.10.0"}
-SPARK_VERSION=${SPARK_VERSION:-"2.3.1"}
-# APEX_VERSION=${APEX_VERSION:-"3.5.0"}
+SPARK_VERSION=${SPARK_VERSION:-"3.0.0-preview2"}
 
 STORM_DIR="apache-storm-$STORM_VERSION"
 REDIS_DIR="redis-$REDIS_VERSION"
 KAFKA_DIR="kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION"
 FLINK_DIR="flink-$FLINK_VERSION"
 SPARK_DIR="spark-$SPARK_VERSION-bin-hadoop2.7"
-# APEX_DIR="apex-$APEX_VERSION"
 
 #Get one of the closest apache mirrors
 APACHE_MIRROR=$"https://archive.apache.org/dist"
@@ -136,10 +134,8 @@ run() {
     echo 'storm.workers: 1' >> $CONF_FILE
     echo 'storm.ackers: 2' >> $CONF_FILE
     echo 'spark.batchtime: 2000' >> $CONF_FILE
 
-    $MVN clean install -Dspark.version="$SPARK_VERSION" -Dkafka.version="$KAFKA_VERSION" -Dflink.version="$FLINK_VERSION" -Dstorm.version="$STORM_VERSION" -Dscala.binary.version="$SCALA_BIN_VERSION" -Dscala.version="$SCALA_BIN_VERSION.$SCALA_SUB_VERSION"
-
-    # -Dapex.version="$APEX_VERSION"
+    $MVN clean install -Dspark.version="$SPARK_VERSION" -Dkafka.version="$KAFKA_VERSION" -Dflink.version="$FLINK_VERSION" -Dstorm.version="$STORM_VERSION" -Dscala.binary.version="$SCALA_BIN_VERSION" -Dscala.version="$SCALA_BIN_VERSION.$SCALA_SUB_VERSION"
 
     #Fetch and build Redis
     REDIS_FILE="$REDIS_DIR.tar.gz"
@@ -149,13 +145,6 @@ run() {
     $MAKE
     cd ..
 
-    #Fetch Apex
-    # APEX_FILE="$APEX_DIR.tgz.gz"
-    # fetch_untar_file "$APEX_FILE" "$APACHE_MIRROR/apex/apache-apex-core-$APEX_VERSION/apex-$APEX_VERSION-source-release.tar.gz"
-    #cd $APEX_DIR
-    #$MVN clean install -DskipTests
-    #cd ..
-
     #Fetch Kafka
     KAFKA_FILE="$KAFKA_DIR.tgz"
     fetch_untar_file "$KAFKA_FILE" "$APACHE_MIRROR/kafka/$KAFKA_VERSION/$KAFKA_FILE"
@@ -265,26 +254,6 @@ run() {
       "$FLINK_DIR/bin/flink" cancel $FLINK_ID
       sleep 3
     fi
-  # elif [ "START_APEX" = "$OPERATION" ];
-  # then
-  #   "$APEX_DIR/engine/src/main/scripts/apex" -e "launch -local -conf ./conf/apex.xml ./apex-benchmarks/target/apex_benchmark-1.0-SNAPSHOT.apa -exactMatch Apex_Benchmark"
-  #   sleep 5
-  # elif [ "STOP_APEX" = "$OPERATION" ];
-  # then
-  #   pkill -f apex_benchmark
-  # elif [ "START_APEX_ON_YARN" = "$OPERATION" ];
-  # then
-  #   "$APEX_DIR/engine/src/main/scripts/apex" -e "launch ./apex-benchmarks/target/apex_benchmark-1.0-SNAPSHOT.apa -conf ./conf/apex.xml -exactMatch Apex_Benchmark"
-  # elif [ "STOP_APEX_ON_YARN" = "$OPERATION" ];
-  # then
-  #   APP_ID=`"$APEX_DIR/engine/src/main/scripts/apex" -e "list-apps" | grep id | awk '{ print $2 }'| cut -c -1 ; true`
-  #   if [ "APP_ID" == "" ];
-  #   then
-  #     echo "Could not find streaming job to kill"
-  #   else
-  #     "$APEX_DIR/engine/src/main/scripts/apex" -e "kill-app $APP_ID"
-  #     sleep 3
-  #   fi
   elif [ "STORM_TEST" = "$OPERATION" ];
   then
     run "START_ZK"
@@ -330,19 +299,6 @@ run() {
     run "STOP_KAFKA"
     run "STOP_REDIS"
     run "STOP_ZK"
-  elif [ "APEX_TEST" = "$OPERATION" ];
-  then
-    run "START_ZK"
-    run "START_REDIS"
-    run "START_KAFKA"
-    run "START_APEX"
-    run "START_LOAD"
-    sleep $TEST_TIME
-    run "STOP_LOAD"
-    run "STOP_APEX"
-    run "STOP_KAFKA"
-    run "STOP_REDIS"
-    run "STOP_ZK"
   elif [ "STOP_ALL" = "$OPERATION" ];
   then
     run "STOP_LOAD"
@@ -377,9 +333,7 @@ run() {
     echo "STOP_FLINK: kill flink processes"
     echo "START_SPARK: run spark processes"
     echo "STOP_SPARK: kill spark processes"
-    echo "START_APEX: run the Apex test processing"
-    echo "STOP_APEX: kill the Apex test processing"
-    echo
+    echo
     echo "START_STORM_TOPOLOGY: run the storm test topology"
     echo "STOP_STORM_TOPOLOGY: kill the storm test topology"
     echo "START_FLINK_PROCESSING: run the flink test processing"
@@ -390,7 +344,6 @@ run() {
     echo "STORM_TEST: run storm test (assumes SETUP is done)"
     echo "FLINK_TEST: run flink test (assumes SETUP is done)"
     echo "SPARK_TEST: run spark test (assumes SETUP is done)"
-    echo "APEX_TEST: run Apex test (assumes SETUP is done)"
     echo "STOP_ALL: stop everything"
     echo
     echo "HELP: print out this message"
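Net effect on the script: SETUP now builds with -Dscala.version=2.12.11 and fetches kafka_2.12-2.4.1 and spark-3.0.0-preview2-bin-hadoop2.7, and every Apex operation, test, and help entry is gone. Because each version is declared with the ${VAR:-default} pattern, the defaults can still be overridden from the environment, e.g. SPARK_VERSION="2.4.5" ./stream-bench.sh SPARK_TEST (version shown is illustrative).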
