Merge pull request #1 from ashubhumca/master
Added basic examples
ashubhumca authored Jul 7, 2020
2 parents 6635943 + 40e3597 commit c758f7b
Showing 3 changed files with 195 additions and 0 deletions.
55 changes: 55 additions & 0 deletions streaming/kafka_to_hive_acid.scala
@@ -0,0 +1,55 @@
/* In this example, we receive events from a Kafka source and write them into a non-partitioned Hive ACID table.
Note: your Hive table schema has to match the incoming event schema (an example DDL sketch follows this file). */

import java.sql.Timestamp

import sys.process._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import spark.implicits._

/* Schema of the incoming event*/
val mySchema = (new StructType)
.add("sno", IntegerType)
.add("observation_date", StringType)
.add("state", StringType)
.add("country", StringType)
.add("last_update", StringType)
.add("confirmed", StringType)
.add("deaths", StringType)
.add("recovered", StringType)

/* Update your kafka end-point and checkpoint-location*/
val brokerEndPoint = "<your-end-point:port>"
val checkPointLocation = "<your-checkpoint-bucket/cp1>"
/* Kafka topic and Hive sink detail*/
val tableName = "<db-name.table-name>"
val topic = "<your-kafka-topic-name>"

val ds_kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerEndPoint)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("kafkaConsumer.pollTimeoutMs", 512)
.option("fetchOffset.numRetries", 3)
.option("fetchOffset.retryIntervalMs", 10)
.load()
val df_kafka = ds_kafka.selectExpr("CAST(value AS STRING)").as[(String)]
val df1_kafka = df_kafka.select(from_json($"value", mySchema).as("data")).select("data.*")
val query = df1_kafka.writeStream
.queryName("demo-acid-sink")
.format("HiveAcid")
.options(Map("table" -> tableName))
.outputMode("append")
.trigger(Trigger.ProcessingTime("10 seconds"))
.option("table.metastore.stopOnFailure", "false")
.option("table.metastore.updateIntervalSeconds", 10)
.option("checkpointLocation", checkPointLocation)
.start()


query.awaitTermination
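
The HiveAcid sink assumes the target transactional table already exists with a schema matching the event schema above. A minimal sketch of such a DDL is shown below; the database/table names are placeholders, and depending on your environment you may need to run it from Hive/Beeline (with Hive transactions enabled, and bucketing added on older Hive versions) rather than through Spark.

/* Hypothetical DDL for a matching Hive ACID table; adjust names and run it in
   Hive/Beeline if your Spark build cannot create transactional tables. */
spark.sql("""
  CREATE TABLE IF NOT EXISTS <db-name>.<table-name> (
    sno INT,
    observation_date STRING,
    state STRING,
    country STRING,
    last_update STRING,
    confirmed STRING,
    deaths STRING,
    recovered STRING
  )
  STORED AS ORC
  TBLPROPERTIES ('transactional' = 'true')
""")
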
52 changes: 52 additions & 0 deletions streaming/kafka_to_hive_non_partitioned.scala
@@ -0,0 +1,52 @@
/* In this example, we receive events from a Kafka source and write them into a non-partitioned Hive table.
Note: your Hive table schema has to match the incoming event schema (a sample-event parsing check follows this file). */

import java.sql.Timestamp

import sys.process._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import spark.implicits._

/* Schema of the incoming event*/
val mySchema = (new StructType)
.add("sno", StringType)
.add("observation_date", StringType)
.add("state", StringType)
.add("country", StringType)
.add("last_update", StringType)
.add("confirmed", StringType)
.add("deaths", StringType)
.add("recovered", StringType)

/* Update your kafka end-point and checkpoint-location*/
val brokerEndPoint = "<your-end-point:port>"
val checkPointLocation = "<your-checkpoint-bucket/cp1>"
/* Kafka topic and Hive sink detail*/
val tableName = "<db-name.table-name>"
val topic = "<your-kafka-topic-name>"

val ds_kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerEndPoint)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("kafkaConsumer.pollTimeoutMs", 512)
.option("fetchOffset.numRetries", 3)
.option("fetchOffset.retryIntervalMs", 10)
.load()
val df_kafka = ds_kafka.selectExpr("CAST(value AS STRING)").as[(String)]
val df1_kafka = df_kafka.select(from_json($"value", mySchema).as("data")).select("data.*")
val query = df1_kafka.writeStream
.queryName("stream_to_hive")
.appendToTable(tableName)
.outputMode("Append")
.option("table.metastore.stopOnFailure", "false")
.option("table.metastore.updateIntervalSeconds", 10)
.option("checkpointLocation", checkPointLocation)
.start()

query.awaitTermination
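
Before starting the stream, one quick sanity check (in the same spark-shell session, so mySchema and the implicits above are in scope) is to run the same from_json projection on a hand-written sample record; the field values below are made up for illustration.

/* Parse a single illustrative JSON event with mySchema to confirm the field names line up. */
val sampleEvent = Seq("""{"sno":"1","observation_date":"07/07/2020","state":"Karnataka","country":"India","last_update":"2020-07-07 10:00:00","confirmed":"100","deaths":"2","recovered":"50"}""").toDS()
sampleEvent.select(from_json($"value", mySchema).as("data")).select("data.*").show(false)
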
88 changes: 88 additions & 0 deletions streaming/streaming_notify.py
@@ -0,0 +1,88 @@
# This is the class that sends notifications to a Slack webhook.
# (A rough Scala sketch of the same foreach contract follows this file.)
class Slack_ForeachWriter:
    '''
    Class to send alerts to a Slack channel.
    When used with `foreach`, copies of this class are used to write
    multiple rows in the executors. See the Python docs for `DataStreamWriter.foreach`
    for more details.
    '''

    def __init__(self, url):
        self.webhook_url = url

    def open(self, partition_id, epoch_id):
        # This is called first when preparing to send multiple rows.
        # Put all the initialization code inside open() so that a fresh
        # copy of this class is initialized in the executor where open()
        # will be called.
        # This is a no-op in our case.
        return True

    def process(self, row):
        # This is called for each row after open() has been called.
        # This implementation sends one row at a time.
        # Column names in `row` depend on the column names in the streaming DataFrame.
        # In this case, `row` has the columns `value` and `count`. Modify the column
        # names depending on your schema.
        # We check a predicate and send an alert to the Slack channel if it is true.

        import json
        import requests

        print(row['count'])
        print(self.webhook_url)

        # Modify the predicate here; right now it checks whether the count is a multiple of 5.
        if row['count'] % 5 == 0:

            slack_data = {'text': "Reached {} count for {}".format(row['count'], row['value'])}

            response = requests.post(
                self.webhook_url, data=json.dumps(slack_data),
                headers={'Content-Type': 'application/json'}
            )
            if response.status_code != 200:
                raise ValueError(
                    'Request to slack returned an error %s, the response is:\n%s'
                    % (response.status_code, response.text)
                )

    def close(self, err):
        # This is called after all the rows have been processed.
        if err:
            raise err

from pyspark.sql.functions import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream_analytics_notify_ashish_app").getOrCreate()

# Modify webhook_url and Kafka end-point here
webhook_url = 'https://hooks.slack.com/services/ABCDEFGHIJKL'
brokerEndPoint = "<your-end-point:port>"
# Modify topic-name here
topic = "<your-kafka-topic-name>"
# Modify checkpoint-location here
checkpoint_location = "<your-checkpoint-bukcet/cp1>"

query = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", brokerEndPoint)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
    # The Kafka schema has key and value columns; we select value and do an aggregated count.
    # Modify the logic based on your schema.
    .selectExpr("CAST(value AS STRING)")
    .groupBy("value")
    .count()
    .toDF("value", "count")
    .writeStream
    .queryName("stream_analytics_notify_ashish")
    .option("checkpointLocation", checkpoint_location)
    .foreach(Slack_ForeachWriter(webhook_url))
    .outputMode("complete")
    .start()
)

query.awaitTermination()
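
For reference, a rough Scala sketch of the same open()/process()/close() foreach contract is below. It is not part of the original commit: it uses only Spark's ForeachWriter and the JDK's HttpURLConnection, and the webhook URL, row columns, and threshold simply mirror the Python example above.

import java.net.{HttpURLConnection, URL}
import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch: sends a Slack message when a row's count is a multiple of 5.
class SlackForeachWriter(webhookUrl: String) extends ForeachWriter[Row] {

  // Called once per partition/epoch before any rows; nothing to initialize here.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for each row; expects `value` and `count` columns as in the query above.
  override def process(row: Row): Unit = {
    val count = row.getAs[Long]("count")
    if (count % 5 == 0) {
      val text = s"Reached $count count for ${row.getAs[String]("value")}"
      val payload = s"""{"text": "$text"}"""
      val conn = new URL(webhookUrl).openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setRequestProperty("Content-Type", "application/json")
      conn.setDoOutput(true)
      val out = conn.getOutputStream
      out.write(payload.getBytes("UTF-8"))
      out.close()
      if (conn.getResponseCode != 200)
        throw new RuntimeException(s"Request to Slack returned ${conn.getResponseCode}")
      conn.disconnect()
    }
  }

  // Called after the rows of a partition/epoch are processed; propagate any error.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) throw errorOrNull
  }
}

// Usage: .writeStream.foreach(new SlackForeachWriter(webhookUrl)).outputMode("complete").start()
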
