Spark Streaming real-time processing of Kafka data
liumingmusic committed Jul 28, 2018
1 parent 3033b0d commit 160eb42
Showing 4 changed files with 135 additions and 10 deletions.
4 changes: 2 additions & 2 deletions SparkStreaming/pom.xml
@@ -34,8 +34,8 @@

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.suffix.version}</artifactId>
<version>1.6.3</version>
<artifactId>spark-streaming-kafka-0-10_${scala.suffix.version}</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
SparkStreaming/src/main/java/com/c503/streaming/SparkStreaming_6_KafkaDirectStream.scala
@@ -1,9 +1,8 @@
package com.c503.streaming

import com.utils.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.scalatest.time.Seconds
import com.utils.{ConfManager, SparkConf}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Spark Streaming consumes data read from Kafka
@@ -15,11 +14,62 @@ object SparkStreaming_6_KafkaDirectStream {

def main(args: Array[String]): Unit = {

//Initialize the Spark configuration
val sparkConf = new SparkConf("local[2]", "SparkStreaming_6_KafkaDirectStream")
val context = new StreamingContext(sparkConf, Seconds(1))

//Assemble the Kafka configuration
val kafkaParams = ConfManager.kafkaParam(ConfManager.newStreamConf())

//1. Connect to Kafka, subscribing to topics that match a regex pattern
val dataStream = KafkaUtils.createDirectStream(
context,
LocationStrategies.PreferConsistent,
ConsumerStrategies.SubscribePattern[String, String](kafkaParams._2, kafkaParams._1)
)

//2. Connect to Kafka using a collection of full topic names
/*
val dataStream = KafkaUtils.createDirectStream(
context,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](List("i57_ndsa_result", "i57_ndsa_data"), kafkaParams._1)
)
*/

//3. Connect to Kafka at finer granularity, assigning specific topic partitions (requires importing org.apache.kafka.common.TopicPartition)
/*
val dataStream = KafkaUtils.createDirectStream(
context,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
List(
new TopicPartition("i57_ndsa_result", 1),
new TopicPartition("i57_ndsa_data", 1)
),
kafkaParams._1
)
)
*/

//Process the data
dataStream.foreachRDD(rdd => {
rdd.foreach(partition => {
var msg = "topic=" + partition.topic() + "\n"
msg += "partition=" + partition.partition() + "\n"
msg += "offset=" + partition.offset() + "\n"
msg += "timestamp=" + partition.timestamp() + "\n"
msg += "checksum=" + partition.checksum() + "\n"
msg += "key=" + partition.key() + "\n"
msg += "value=" + partition.value() + "\n"
println(msg)
})
//Manually commit the Kafka offsets
dataStream.asInstanceOf[CanCommitOffsets].commitAsync(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
})

context.start()
context.awaitTermination()
}

}
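For reference, the Kafka 0-10 integration guide's recommended pattern captures the offset ranges at the top of foreachRDD, before any records are processed, so the ranges committed always correspond to the batch that was just handled. A minimal sketch of that variant, adapted to the stream above (an illustration, not part of this commit):

dataStream.foreachRDD(rdd => {
//Capture the offset ranges of this batch before processing the records
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreach(record => {
println("topic=" + record.topic() + " partition=" + record.partition() + " offset=" + record.offset() + " value=" + record.value())
})
//Commit the captured ranges back to Kafka once the batch has been processed
dataStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})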
74 changes: 74 additions & 0 deletions SparkStreaming/src/main/java/com/utils/ConfManager.scala
@@ -0,0 +1,74 @@
package com.utils

import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.mutable.HashMap

/**
* Spark Streaming configuration helper
*
* @author liumm
* @since 2018-07-27 20:27
*/
object ConfManager {

/**
* Maximum number of records to persist per batch
*/
val maxRecords = 1000

/**
* Builds the Kafka configuration.
*
* @param streamConf the stream configuration (brokers, group id, topic regex)
* @return a tuple of (consumer parameters, compiled topic pattern)
*/
def kafkaParam(streamConf: StreamConf): (Map[String, Object], Pattern) = {
(getConsumerConfig(streamConf.brokers, streamConf.groupId), Pattern.compile(streamConf.topics))
}

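/**
* Kafka parameters apparently intended for the legacy "metadata.broker.list" (Kafka 0.8) direct stream API.
* Note: "smallest" is the old consumer's auto.offset.reset value; the 0.10 consumer configured below uses "earliest"/"latest".
*/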
def kafkaParamForMetadata(streamConf: StreamConf): Map[String, String] = {
val kafkaParams = new HashMap[String, String]()
kafkaParams += (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> streamConf.brokers)
kafkaParams += ("metadata.broker.list" -> streamConf.brokers)
kafkaParams += (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "smallest")
kafkaParams += (ConsumerConfig.GROUP_ID_CONFIG -> streamConf.groupId)
kafkaParams.toMap
}

/**
* Builds the Kafka consumer configuration.
*
* @return the Kafka consumer configuration
*/
private def getConsumerConfig(brokers: String, groupId: String): Map[String, Object] = {
val kafkaParams = new HashMap[String, Object]()

kafkaParams += (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers)
kafkaParams += (ConsumerConfig.GROUP_ID_CONFIG -> groupId)
kafkaParams += (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
kafkaParams += (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

kafkaParams += (ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> new Integer(3 * 1024 * 1024))
kafkaParams += (ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> new Integer(100))

kafkaParams += (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest")
//Disable Kafka's automatic offset commit
kafkaParams += (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))

kafkaParams.toMap
}

def newStreamConf() = {
val conf = new StreamConf()
conf.zkUrl = "hdp01:2181"
conf.brokers = "hdp01:9092"
conf.groupId = "liumm_group"
conf.topics = "i57_.*"
conf
}

}
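A short usage sketch (an illustration built on the pieces above, not code from this commit): destructuring the (params, pattern) tuple returned by kafkaParam reads more clearly than the _1/_2 access used in SparkStreaming_6_KafkaDirectStream; here context is assumed to be the StreamingContext from that example.

val (consumerParams, topicPattern) = ConfManager.kafkaParam(ConfManager.newStreamConf())
val stream = KafkaUtils.createDirectStream[String, String](
context,
LocationStrategies.PreferConsistent,
ConsumerStrategies.SubscribePattern[String, String](topicPattern, consumerParams)
)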
9 changes: 5 additions & 4 deletions SparkStreaming/src/main/java/com/utils/StreamConf.scala
@@ -3,10 +3,11 @@ package com.utils
import scala.beans.BeanProperty

/**
* Description: briefly describe what this does
*
* @author liumm
* @since 2018-07-28 11:26
* Spark Streaming configuration
* @author dtinone
* Jiamigu (加米谷) Big Data Academy
* http://www.dtinone.com/
* Copyright 加米谷 (Jiamigu). For internal use by the Jiamigu Big Data Academy only; use by any other organization is prohibited, and violators will be held legally liable.
*/
class StreamConf {

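The body of StreamConf is collapsed in this diff; judging from the scala.beans.BeanProperty import and the fields assigned in ConfManager.newStreamConf (zkUrl, brokers, groupId, topics), it presumably looks roughly like this sketch (an inference, not the literal file contents):

class StreamConf {
@BeanProperty var zkUrl: String = _
@BeanProperty var brokers: String = _
@BeanProperty var groupId: String = _
@BeanProperty var topics: String = _
}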
