-
Notifications
You must be signed in to change notification settings - Fork 2
/
exchsub.scala
59 lines (52 loc) · 1.97 KB
/
exchsub.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import java.io.FileInputStream
import java.util.Properties
object exchsub {
def main(args: Array[String]): Unit = {
val p = new Properties()
p.load(new FileInputStream("gazua.ini"))
val url = p.getProperty("url")
val user = p.getProperty("user")
val pwd = p.getProperty("pwd")
val writer = new ExchJDBCSink(url, user, pwd)
val spark = SparkSession
.builder()
.appName("Spark Structured Streaming Example")
.master("local[4]")
.getOrCreate()
import spark.implicits._
val schema = StructType(
StructField("eventTime", TimestampType, true) ::
StructField("open", FloatType, true) ::
StructField("high", FloatType, true) ::
StructField("low", FloatType, true) ::
StructField("close", FloatType, true) ::
StructField("change", FloatType, true) ::
StructField("rate", FloatType, true) :: Nil
)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "bithumb")
.load()
val values = df.select($"key" cast "string", $"value" cast "string").select($"key", from_json($"value", schema) as "data").select("key", "data.*")
val df_count = values.groupBy($"key", window($"eventTime", "1 minutes")).agg(first("close").alias("open"), max("close").alias("high"), min("close").alias("low"), last("close").alias("close"))
val df_res = df_count.select("key", "window.start", "open","high","low","close")
/*df_count.writeStream
//.trigger(ProcessingTime("1 seconds"))
.outputMode("complete")
.format("console")
.start()
.awaitTermination()*/
df_res.writeStream
.foreach(writer)
.outputMode("complete")
.trigger(ProcessingTime("30 seconds"))
.start()
.awaitTermination()
}
}