From 160eb42eef4568ad05b1ee520c00d29731ee5ff6 Mon Sep 17 00:00:00 2001
From: liumingmusic
Date: Sat, 28 Jul 2018 19:02:22 +0800
Subject: [PATCH] Spark Streaming real-time processing of Kafka data
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 SparkStreaming/pom.xml                            |  4 +-
 .../SparkStreaming_6_KafkaDirectStream.scala      | 58 ++++++++++++++-
 .../src/main/java/com/utils/ConfManager.scala     | 74 +++++++++++++++++++
 .../src/main/java/com/utils/StreamConf.scala      |  9 ++-
 4 files changed, 135 insertions(+), 10 deletions(-)
 create mode 100644 SparkStreaming/src/main/java/com/utils/ConfManager.scala

diff --git a/SparkStreaming/pom.xml b/SparkStreaming/pom.xml
index 4981718..f4b0d1e 100644
--- a/SparkStreaming/pom.xml
+++ b/SparkStreaming/pom.xml
@@ -34,8 +34,8 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka_${scala.suffix.version}</artifactId>
-            <version>1.6.3</version>
+            <artifactId>spark-streaming-kafka-0-10_${scala.suffix.version}</artifactId>
+            <version>2.0.0</version>
         </dependency>

diff --git a/SparkStreaming/src/main/java/com/c503/streaming/SparkStreaming_6_KafkaDirectStream.scala b/SparkStreaming/src/main/java/com/c503/streaming/SparkStreaming_6_KafkaDirectStream.scala
index 17e8ebd..4e74490 100644
--- a/SparkStreaming/src/main/java/com/c503/streaming/SparkStreaming_6_KafkaDirectStream.scala
+++ b/SparkStreaming/src/main/java/com/c503/streaming/SparkStreaming_6_KafkaDirectStream.scala
@@ -1,9 +1,8 @@
 package com.c503.streaming
 
-import com.utils.SparkConf
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.scalatest.time.Seconds
+import com.utils.{ConfManager, SparkConf}
+import org.apache.spark.streaming.kafka010._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 /**
  * Spark Streaming reads and consumes data from Kafka
@@ -15,11 +14,62 @@ object SparkStreaming_6_KafkaDirectStream {
 
   def main(args: Array[String]): Unit = {
 
+    // Initialize the Spark configuration
     val sparkConf = new SparkConf("local[2]", "SparkStreaming_6_KafkaDirectStream")
     val context = new StreamingContext(sparkConf, Seconds(1))
 
+    // Assemble the Kafka configuration
+    val kafkaParams = ConfManager.kafkaParam(ConfManager.newStreamConf())
+
+    // 1. Connect to Kafka, matching topics with a regex pattern
+    val dataStream = KafkaUtils.createDirectStream(
+      context,
+      LocationStrategies.PreferConsistent,
+      ConsumerStrategies.SubscribePattern[String, String](kafkaParams._2, kafkaParams._1)
+    )
+
+    // 2. Connect to Kafka with a collection of full topic names
+    /*
+    val dataStream = KafkaUtils.createDirectStream(
+      context,
+      LocationStrategies.PreferConsistent,
+      ConsumerStrategies.Subscribe[String, String](List("i57_ndsa_result", "i57_ndsa_data"), kafkaParams._1)
+    )
+    */
+
+    // 3. Connect to Kafka at a finer granularity, assigning specific topic partitions
+    //    (also requires: import org.apache.kafka.common.TopicPartition)
+    /*
+    val dataStream = KafkaUtils.createDirectStream(
+      context,
+      LocationStrategies.PreferConsistent,
+      ConsumerStrategies.Assign[String, String](
+        List(
+          new TopicPartition("i57_ndsa_result", 1),
+          new TopicPartition("i57_ndsa_data", 1)
+        ),
+        kafkaParams._1
+      )
+    )
+    */
+
+    // Process the data; each element is a Kafka ConsumerRecord
+    dataStream.foreachRDD(rdd => {
+      rdd.foreach(record => {
+        var msg = "topic=" + record.topic() + "\n"
+        msg += "partition=" + record.partition() + "\n"
+        msg += "offset=" + record.offset() + "\n"
+        msg += "timestamp=" + record.timestamp() + "\n"
+        msg += "checksum=" + record.checksum() + "\n"
+        msg += "key=" + record.key() + "\n"
+        msg += "value=" + record.value() + "\n"
+        println(msg)
+      })
+      // Commit the Kafka offsets manually once the batch has been processed
+      dataStream.asInstanceOf[CanCommitOffsets].commitAsync(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
+    })
+
+    context.start()
+    context.awaitTermination()
   }
 
 }
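Note: the com.utils.SparkConf wrapper imported above is not part of this patch. As a minimal sketch, assuming the wrapper merely presets the master URL and application name passed to the two-argument constructor used above (hypothetical; the real class in the repository may differ):

package com.utils

/**
 * Hypothetical sketch of the com.utils.SparkConf helper: a thin subclass
 * of org.apache.spark.SparkConf that presets master and app name.
 */
class SparkConf(master: String, appName: String) extends org.apache.spark.SparkConf {
  setMaster(master)   // e.g. "local[2]" for a local two-thread run
  setAppName(appName) // shown in the Spark UI
}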
diff --git a/SparkStreaming/src/main/java/com/utils/ConfManager.scala b/SparkStreaming/src/main/java/com/utils/ConfManager.scala
new file mode 100644
index 0000000..e969038
--- /dev/null
+++ b/SparkStreaming/src/main/java/com/utils/ConfManager.scala
@@ -0,0 +1,74 @@
+package com.utils
+
+import java.util.regex.Pattern
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Spark Streaming configuration manager
+ *
+ * @author liumm
+ * @since 2018-07-27 20:27
+ */
+object ConfManager {
+
+  /**
+   * Maximum number of records written to storage per batch
+   */
+  val maxRecords = 1000
+
+  /**
+   * Build the Kafka configuration
+   *
+   * @param streamConf stream configuration
+   * @return consumer config paired with the compiled topic pattern
+   */
+  def kafkaParam(streamConf: StreamConf): (Map[String, Object], Pattern) = {
+    (getConsumerConfig(streamConf.brokers, streamConf.groupId), Pattern.compile(streamConf.topics))
+  }
+
+  def kafkaParamForMetadata(streamConf: StreamConf): Map[String, String] = {
+    val kafkaParams = new HashMap[String, String]()
+    kafkaParams += (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> streamConf.brokers)
+    kafkaParams += ("metadata.broker.list" -> streamConf.brokers)
+    // "smallest" and "metadata.broker.list" are legacy (pre-0.9) consumer settings;
+    // the 0.10 consumer expects "earliest" instead of "smallest"
+    kafkaParams += (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "smallest")
+    kafkaParams += (ConsumerConfig.GROUP_ID_CONFIG -> streamConf.groupId)
+    kafkaParams.toMap
+  }
+
+  /**
+   * Generate the Kafka consumer configuration
+   *
+   * @return the Kafka consumer configuration
+   */
+  private def getConsumerConfig(brokers: String, groupId: String): Map[String, Object] = {
+    val kafkaParams = new HashMap[String, Object]()
+
+    kafkaParams += (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers)
+    kafkaParams += (ConsumerConfig.GROUP_ID_CONFIG -> groupId)
+    kafkaParams += (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
+    kafkaParams += (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
+
+    kafkaParams += (ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> new Integer(3 * 1024 * 1024))
+    kafkaParams += (ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> new Integer(100))
+
+    kafkaParams += (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest")
+    // Disable Kafka's automatic offset commits; offsets are committed manually in the streaming job
+    kafkaParams += (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))
+
+    kafkaParams.toMap
+  }
+
+  def newStreamConf() = {
+    val conf = new StreamConf()
+    conf.zkUrl = "hdp01:2181"
+    conf.brokers = "hdp01:9092"
+    conf.groupId = "liumm_group"
+    conf.topics = "i57_.*"
+    conf
+  }
+
+}
diff --git a/SparkStreaming/src/main/java/com/utils/StreamConf.scala b/SparkStreaming/src/main/java/com/utils/StreamConf.scala
index 1c1805e..257119d 100644
--- a/SparkStreaming/src/main/java/com/utils/StreamConf.scala
+++ b/SparkStreaming/src/main/java/com/utils/StreamConf.scala
@@ -3,10 +3,11 @@ package com.utils
 import scala.beans.BeanProperty
 
 /**
- * Description: placeholder, briefly describe what this class does
- *
- * @author liumm
- * @since 2018-07-28 11:26
+ * Spark Streaming configuration
+ * @author dtinone
+ * Jiamigu Big Data Academy
+ * http://www.dtinone.com/
+ * Copyright Jiamigu. For internal use at Jiamigu Big Data Academy only; use by other organizations is prohibited and violators will face legal action.
  */
 class StreamConf {
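The patch is cut off at the StreamConf class body. Judging from the fields assigned in ConfManager.newStreamConf and the scala.beans.BeanProperty import above, the body presumably looks roughly like the following sketch (the four field names come from the code above; everything else is an assumption, not the repository's actual file):

package com.utils

import scala.beans.BeanProperty

// Hypothetical reconstruction of StreamConf, inferred from ConfManager.newStreamConf.
class StreamConf {
  @BeanProperty var zkUrl: String = _   // ZooKeeper connection string, e.g. "hdp01:2181"
  @BeanProperty var brokers: String = _ // Kafka bootstrap servers, e.g. "hdp01:9092"
  @BeanProperty var groupId: String = _ // consumer group id
  @BeanProperty var topics: String = _  // topic name or regex, e.g. "i57_.*"
}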