Skip to content

Commit

Permalink
完成案例,spark读取ES数据进行统计分析,回写到ES中
Browse files Browse the repository at this point in the history
  • Loading branch information
liumingmusic committed Sep 22, 2018
1 parent 0cc70d9 commit 79717a1
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 142 deletions.
291 changes: 151 additions & 140 deletions .idea/workspace.xml

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions SparkSQL/src/main/resources/sql/balance_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
select
firstname,
lastname,
address,
employer,
state
from
bank_table
where
gender = "M"
AND
balance >= 30000
41 changes: 41 additions & 0 deletions SparkSQL/src/main/resources/sql/gender_group_by_sql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
select
a1.gender,
a1.all_count as all_count,
a2.age_count_28 as age_count_28,
a3.age_count_38 as age_count_38
from
(
select
gender,
count(1) as all_count
from
bank_table
group by
gender
) as a1
left join
(
select
gender,
count(1) as age_count_28
from
bank_table
where
age >= 28
group by
gender
) as a2
on a1.gender = a2.gender
left join
(
select
gender,
count(1) as age_count_38
from
bank_table
where
age >= 38
group by
gender
) as a3
on a1.gender = a3.gender
8 changes: 8 additions & 0 deletions SparkSQL/src/main/resources/sql/gender_stats_sql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
select
firstname,
lastname,
address
from
bank_table
where
gender = "M"
1 change: 1 addition & 0 deletions SparkSQL/src/main/resources/sql/show_all_sql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from bank_table
60 changes: 60 additions & 0 deletions SparkSQL/src/main/scala/com/c503/es/SparkToES.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.c503.es

import com.c503.utils.SparkSqlUtils
import org.elasticsearch.spark.sql.sparkDataFrameFunctions

/**
* 描述 简单描述方法的作用
*
* @author liumm
* @since 2018-09-22 18:12
*/
object SparkToES {

def main(args: Array[String]): Unit = {

SparkSqlUtils.offLogger()

val sqlContext = SparkSqlUtils.newSparkSession("SparkToES")

//参数
val options = Map(
"pushdown" -> "true",
"es.nodes" -> "192.168.56.100",
"es.port" -> "9200",
"es.nodes.wan.only" -> "true",
"es.query" -> "?q=*",
"es.http.timeout" -> "10s",
"es.scroll.size" -> "5000",
"es.read.field.as.array.include" -> "",
"es.mapping.date.rich" -> "false"
)

val df = sqlContext.read.format("es").options(options).load("bank")

df.createOrReplaceTempView("bank_table")

//SQL脚本
val show_all_sql = SparkSqlUtils.readSqlByPath("sql/show_all_sql.sql")
val gender_stats_sql = SparkSqlUtils.readSqlByPath("sql/gender_stats_sql.sql")
val balance_stats_sql = SparkSqlUtils.readSqlByPath("sql/balance_stats.sql")
val gender_group_by_sql = SparkSqlUtils.readSqlByPath("sql/gender_group_by_sql.sql")

//sql查询
val result_all = sqlContext.sql(show_all_sql)
val result_gender = sqlContext.sql(gender_stats_sql)
val result_balance = sqlContext.sql(balance_stats_sql)
val result_gender_stats = sqlContext.sql(gender_group_by_sql)

//统计分析数据存入ES中
result_all.saveToEs("i57_stats/all", options)
result_gender.saveToEs("i57_stats/gender", options)
result_balance.saveToEs("i57_stats/balance", options)
result_gender_stats.saveToEs("i57_stats/gender_stats", options)

//关闭回话
sqlContext.stop()

}

}
5 changes: 3 additions & 2 deletions SparkSQL/src/main/scala/com/c503/sparksql/Spark_SQL_3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ object Spark_SQL_3 {
"es.http.timeout" -> "10s",
"es.scroll.size" -> "5000",
"es.read.field.as.array.include" -> "",
"es.mapping.date.rich" -> "false"
"es.mapping.date.rich" -> "false",
"es.index.auto.create" -> "true"
)

//构建df
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark_es")
val df = sqlContext.read.format("es").options(options).load("spark_es")

//df.select("name", "age").collect().foreach(println(_))
//临时表
Expand Down

0 comments on commit 79717a1

Please sign in to comment.