Skip to content

Commit

Permalink
增加Spark2 Streaming 接收Kerberos环境下Kafka消息,并将输入写入HDFS
Browse files Browse the repository at this point in the history
  • Loading branch information
peach committed Jul 18, 2018
1 parent fb9402f commit f20db9e
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 18 deletions.
16 changes: 16 additions & 0 deletions spark2demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
Expand Down Expand Up @@ -63,7 +64,22 @@
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0.cloudera2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.2</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.11.2</version>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 2 additions & 0 deletions spark2demo/spark2streaming-kafka-hdfs/conf/0292.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=kafka_hdfs_topic
Binary file not shown.
14 changes: 14 additions & 0 deletions spark2demo/spark2streaming-kafka-hdfs/conf/jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/disk1/spark2streaming-kafka-hdfs/conf/fayson.keytab"
principal="[email protected]";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/data/disk1/spark2streaming-kafka-hdfs/conf/fayson.keytab"
principal="[email protected]";
};
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.cloudera.streaming

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

/**
* package: com.cloudera.streaming
* describe: Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入HDFS
* spark2-submit --class com.cloudera.streaming.Kafka2Spark2HDFS \
* --master yarn \
* --deploy-mode client \
* --executor-memory 2g \
* --executor-cores 2 \
* --driver-memory 2g \
* --num-executors 2 \
* --queue default \
* --principal [email protected] \
* --keytab /data/disk1/spark2streaming-kafka-hdfs/conf/fayson.keytab \
* --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hdfs/conf/jaas.conf" \
* --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hdfs/conf/jaas.conf" \
* spark2-demo-1.0-SNAPSHOT.jar
* creat_user: Fayson
* email: [email protected]
* creat_date: 2018/7/17
* creat_time: 下午11:08
* 公众号:Hadoop实操
*/
object Kafka2Spark2HDFS {

Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别

var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0292.properties"

def main(args: Array[String]): Unit = {
//加载配置文件
val properties = new Properties()
val file = new File(confPath)
if(!file.exists()) {
System.out.println(Kafka2Spark2Hive.getClass.getClassLoader.getResource("0292.properties"))
val in = Kafka2Spark2Hive.getClass.getClassLoader.getResourceAsStream("0292.properties")
properties.load(in);
} else {
properties.load(new FileInputStream(confPath))
}

val brokers = properties.getProperty("kafka.brokers")
val topics = properties.getProperty("kafka.topics")
println("kafka.brokers:" + brokers)
println("kafka.topics:" + topics)

if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics)) {
println("未配置Kafka信息...")
System.exit(0)
}
val topicsSet = topics.split(",").toSet

val spark = SparkSession.builder().appName("Kafka2Spark2HDFS-kerberos").config(new SparkConf()).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
, "auto.offset.reset" -> "latest"
, "security.protocol" -> "SASL_PLAINTEXT"
, "sasl.kerberos.service.name" -> "kafka"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "group.id" -> "testgroup"
)

val dStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

dStream.foreachRDD(rdd => {

val newrdd = rdd.map(line => {
val jsonObj = JSON.parseFull(line.value())
val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
//将Map数据转为以","隔开的字符串
val userInfoStr = map.get("id").get.asInstanceOf[String].concat(",")
.concat(map.get("name").get.asInstanceOf[String]).concat(",")
.concat(map.get("sex").get.asInstanceOf[String]).concat(",")
.concat(map.get("city").get.asInstanceOf[String]).concat(",")
.concat(map.get("occupation").get.asInstanceOf[String]).concat(",")
.concat(map.get("mobile_phone_num").get.asInstanceOf[String]).concat(",")
.concat(map.get("fix_phone_num").get.asInstanceOf[String]).concat(",")
.concat(map.get("bank_name").get.asInstanceOf[String]).concat(",")
.concat(map.get("address").get.asInstanceOf[String]).concat(",")
.concat(map.get("marriage").get.asInstanceOf[String]).concat(",")
.concat(map.get("child_num").get.asInstanceOf[String])
userInfoStr
})

//将解析好的数据已流的方式写入HDFS,未使用RDD的方式可以避免数据被覆盖
newrdd.foreachPartition(partitionrecord => {
val conf = new Configuration()
val fs = FileSystem.get(conf)
val path = new Path("/tmp/kafka-data/test.txt")
//创建一个输出流
val outputStream = if (fs.exists(path)){
fs.append(path)
}else{
fs.create(path)
}
partitionrecord.foreach(line => outputStream.write((line + "\n").getBytes("UTF-8")))
outputStream.close()
})

})
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,6 @@ object Kafka2Spark2Hive {

var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0291.properties"

/**
* 建表Schema定义
*/
val userInfoSchema = StructType(
// col name type nullable?
StructField("id", StringType , false) ::
StructField("name" , StringType, true ) ::
StructField("sex" , StringType, true ) ::
StructField("city" , StringType, true ) ::
StructField("occupation" , StringType, true ) ::
StructField("tel" , StringType, true ) ::
StructField("fixPhoneNum" , StringType, true ) ::
StructField("bankName" , StringType, true ) ::
StructField("address" , StringType, true ) ::
StructField("marriage" , StringType, true ) ::
StructField("childNum", StringType , true ) :: Nil
)

/**
* 定义一个UserInfo对象
*/
Expand Down

0 comments on commit f20db9e

Please sign in to comment.