
sudheerpalyam/stock_stream_processing


Technical assessment

This project currently focuses on computing moving averages over streaming stock market data, but the framework can be extended to other data engineering and machine learning engineering tasks.

@author: Sudheer Palyam

Problem Statement

Design and implement a scalable, distributed, end-to-end solution that accepts an input data feed and computes ***moving averages***, which help identify trends in the stock market.

Technologies chosen: Apache Spark 2.3, Scala 2.11, SBT 1.0, Kafka 2.0.0, Zookeeper

Analysis of Structured Streaming Sliding Window based Rolling Average Aggregates:

[Image: sample output of the sliding-window aggregates]

As the sample output shows, Kafka is fed one message per second (to demonstrate a slow stream). Spark Structured Streaming uses a 3-second window that slides every second, so each window holds three messages. The aggregation groups by StockName, AGL in this case: two AGL events fell in one sliding window, and their max, min and avg were computed. This shows how Spark Structured Streaming retains messages from earlier windows as the window advances every second. Watermarking limits how much state is maintained, since more state means more resources used.
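The window assignment described above can be sketched in plain Python. This illustrates the idea only and is not the project's Spark code: the function names, the integer event times and the BHP event are hypothetical, and only the two AGL prices come from the sample output.

```python
from collections import defaultdict

WINDOW_SECONDS = 3  # window length, as described in the text
SLIDE_SECONDS = 1   # slide interval, as described in the text


def window_starts(event_time):
    """Return the start of every window [start, start + WINDOW_SECONDS) that contains event_time."""
    start = (event_time // SLIDE_SECONDS) * SLIDE_SECONDS
    starts = []
    while start > event_time - WINDOW_SECONDS:
        starts.append(start)
        start -= SLIDE_SECONDS
    return sorted(starts)


def sliding_aggregates(events):
    """Group (event_time, stock, price) tuples by (window start, stock) and aggregate the prices."""
    buckets = defaultdict(list)
    for event_time, stock, price in events:
        for start in window_starts(event_time):
            buckets[(start, stock)].append(price)
    return {
        key: {"min": min(prices), "max": max(prices), "avg": sum(prices) / len(prices)}
        for key, prices in buckets.items()
    }


# One event per second; the event times and the BHP event are made up for illustration.
events = [(10, "AGL", 436.84), (11, "BHP", 120.50), (12, "AGL", 698.17)]
result = sliding_aggregates(events)

# The window starting at t=10 covers seconds 10, 11 and 12, so it sees both AGL events.
print(result[(10, "AGL")])
```

Each event lands in three overlapping windows, which is why the same price contributes to several consecutive aggregates. A real streaming engine also has to decide when a window can be dropped from state, which is the job of the watermark.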

Moving averages formula provided:
$$y_{n-k+1} = \frac{1}{k}\left(x_{n-k+1} + x_{n-k+2} + \dots + x_{n}\right)$$

        Where n : index of the last item in the window
              k : number of items in each window
              x : item values

From our sample output:

$$y = \frac{1}{2}\left(436.84 + 698.17\right) = 567.505$$

i.e., 567.505 is the moving window average value.
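As a quick check of the formula, here is a minimal sketch in plain Python. The helper name `moving_averages` is hypothetical, and the input is the pair of AGL prices from the sample output.

```python
def moving_averages(values, k):
    """Return the mean of every run of k consecutive values, oldest window first."""
    if k <= 0:
        raise ValueError("k must be positive")
    return [sum(values[i:i + k]) / k for i in range(len(values) - k + 1)]


prices = [436.84, 698.17]  # the two AGL prices from the sample output
averages = moving_averages(prices, 2)
print(averages)  # a single window whose mean is approximately 567.505
```

With n items and a window of k items the function returns n - k + 1 averages, matching the indexing of y in the formula.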

Data:

Synthetic stock events generated: [data/sample_input.txt](data/sample_input.txt)
Final moving averages output: [data/sample_output.txt](data/sample_output.txt)
Refer to these files for more rolling averages.

Scalability considerations:

KAFKA:
    1. The number of partitions in a topic drives consumer parallelism, so increase the partition count to let a Spark consumer group process messages in parallel.
    2. A rough partition count can be derived from the throughput requirements.
        Let P be the throughput from the producer to a single partition,
        C the throughput from a single partition to a consumer,
        and T the target throughput.
        Required partitions = max(T/P, T/C)
 SPARK:
    1. Run in yarn-client or yarn-cluster mode with the Kafka consumer group set. Size the number of Kafka partitions to the number of Spark executor tasks the cluster can run.
    2. Configure the Spark Structured Streaming watermark appropriately.
 AWS:
    1. Porting this pipeline to AWS Kinesis Firehose & Analytics would take care of scaling shards automatically; both are managed serverless services.
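The partition rule of thumb from the KAFKA notes above can be sketched as follows. The function name and the throughput figures are hypothetical, and rounding up to a whole partition is an assumption added here because a topic cannot have a fractional partition count.

```python
import math


def required_partitions(target, produce_per_partition, consume_per_partition):
    """Apply max(T/P, T/C) and round up to a whole number of partitions."""
    if min(target, produce_per_partition, consume_per_partition) <= 0:
        raise ValueError("throughputs must be positive")
    needed = max(target / produce_per_partition, target / consume_per_partition)
    return math.ceil(needed)


# Hypothetical throughputs, all in the same unit (for example MB/s):
# T = 100, P = 10 per partition, C = 20 per partition.
print(required_partitions(100, 10, 20))  # 10
```

The slower side (producer or consumer) sets the count, which is why the rule takes the maximum of the two ratios.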

Architectural Patterns:

The problem can be implemented with any of the following architectures, weighing Cost optimization, Reliability, Operational efficiency, Performance and Security (CROPS):

1. Classic Approach - Kafka + Spark Window Aggregations (current implementation)

[Diagram: Kafka + Spark window aggregations]

2. Serverless Approach - Cloud Native - Infinite Scalability and less management (proposed)

[Diagram: serverless, cloud-native approach]

3. File Streaming Mode - Spark Window Aggregations (Alternate approach)

[Diagram: file streaming mode with Spark window aggregations]

Machine Learning :

This is a typical time series problem, so one can check whether the series shows seasonality. In practice, a time series is decomposed to see whether its components combine additively or multiplicatively.

One such example: https://onlinecourses.science.psu.edu/stat510/node/70/

A time series is usually decomposed into three components: seasonality (S), trend (T) and a remainder component (R). For a data point Y, if the components are additive, Y = S + T + R; if they are multiplicative, Y = S * T * R.
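The difference between the two forms can be illustrated with a short sketch. All component values below are hypothetical and are not taken from the project data. The last step shows why a multiplicative series is often log-transformed first: taking logs turns the product into a sum.

```python
import math

# Hypothetical component values for four observations.
trend = [100.0, 102.0, 104.0, 106.0]
additive_seasonal = [1.5, -1.5, 1.5, -1.5]      # offsets around the trend
additive_remainder = [0.2, -0.1, 0.0, 0.3]
seasonal_factor = [1.02, 0.98, 1.02, 0.98]      # factors around 1.0
remainder_factor = [1.001, 0.999, 1.0, 1.002]

# Additive model: Y = S + T + R
additive_y = [s + t + r for s, t, r in zip(additive_seasonal, trend, additive_remainder)]

# Multiplicative model: Y = S * T * R
multiplicative_y = [s * t * r for s, t, r in zip(seasonal_factor, trend, remainder_factor)]

# log(Y) = log(S) + log(T) + log(R), so the log of a multiplicative series is additive.
log_components = [
    math.log(s) + math.log(t) + math.log(r)
    for s, t, r in zip(seasonal_factor, trend, remainder_factor)
]

print(additive_y)
print(multiplicative_y)
print([math.log(y) for y in multiplicative_y])
print(log_components)
```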

The following examples can be used:
https://stackoverflow.com/questions/23402303/apache-spark-moving-average
http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html
https://github.com/apache/spark/blob/v1.4.1/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala

But for now, you can do this in the format:

Feature bookmarks:

Unit Tests

Stock Aggregations Unit Tests results page https://rawgit.com/sudheerpalyam/stock_stream_processing/master/target/test-reports/index.html

Unit Test Results

Quick steps to set up Kafka and run locally:

Download from https://kafka.apache.org/downloads

start zookeeper:
$<kafka-dir>/bin/zookeeper-server-start.sh config/zookeeper.properties

start kafka broker(s):
$<kafka-dir>/bin/kafka-server-start.sh config/server.properties

create kafka topics:
$<kafka-dir>/bin/kafka-topics.sh --create --topic "stocks" --replication-factor 1 --partitions 4 --zookeeper localhost:2181
$<kafka-dir>/bin/kafka-topics.sh --create --topic "stocks_averages" --replication-factor 1 --partitions 4 --zookeeper localhost:2181

list topics:
$<kafka-dir>/bin/kafka-topics.sh  --list --zookeeper localhost:2181

delete topic:
$<kafka-dir>/bin/kafka-topics.sh  --delete --topic stocks --zookeeper localhost:2181

produce and consume test messages (the Kafka 2.0.0 console producer takes --broker-list; --bootstrap-server is accepted by the producer from Kafka 2.5 onwards):
$<kafka-dir>/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stocks
$<kafka-dir>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stocks

describe:
$<kafka-dir>/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic stocks
$<kafka-dir>/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic stocks_averages

Spark Submit Script:
src/main/resources/submitSpark.sh

nohup spark-submit \
 --deploy-mode client --master yarn \
 --executor-cores 4 --executor-memory 6g --conf spark.yarn.executor.memoryOverhead=1G --conf spark.yarn.driver.memoryOverhead=1G  \
 --jars $(echo /home/sudheerpalyam/jars/*.jar | tr ' ' ',') \
 --class au.com.thoughtworks.assessment.spark.streaming.KafkaStructuredStreaming \
 /home/sudheerpalyam/jars/stock_stream_processing_2.11-0.1.jar \
 --isGlue false \
 --mode  yarn >> ../logs/stock-spark-$runTime.log &

Next Steps:

  1. Integrate a visualization layer based on Kibana & InfluxDB to continuously stream raw vs moving averages
  2. Run Kafka & Spark in Yarn/Mesos/DCOS Clustered Mode
  3. Implement the same pipeline using AWS native serverless components, replacing:
       Kafka -> AWS Kinesis Streams
       Spark -> AWS Kinesis Analytics (as there is no serverless equivalent of Spark yet in AWS)
       Spark Console/Kafka Writer -> AWS Kinesis Firehose
       Kibana -> AWS QuickSight
       Scala Kafka Producer -> Kinesis Data Generator
  4. Dockerize all the workflow components and run them under a container orchestrator such as Kubernetes or Amazon Elastic Kubernetes Service (EKS)
  5. Enhance Unit Tests and perform Code Coverage and eventually DevOps
  6. SonarQube/Fortify code vulnerability assessment
  7. Associate a Machine Learning use case which can be facilitated by moving averages.
  8. Integrate with actual public stock stream APIs and compute real-time rolling averages, for example: https://globalrealtime.xignite.com/v3/xGlobalRealTime.json/ListExchanges? and https://kite.trade/docs/connect/v1/

References:

http://springbrain.blogspot.com/2017/12/spark-scala-perform-data-aggregation-on.html (my blog)
https://github.com/soniclavier/bigdata-notebook/blob/master/spark_23
https://github.com/pablo-tech/SparkService--Statistician
https://aws.amazon.com/big-data/datalakes-and-analytics/
https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one.html
https://vishnuviswanath.com/spark_structured_streaming.html (good blog on Structured Streaming)
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html (Structured Streaming window aggregations)
https://github.com/snowplow/spark-streaming-example-project/blob/master/src/main/scala/com.snowplowanalytics.spark/streaming/StreamingCounts.scala (Spark Streaming write to DynamoDB)
https://github.com/snowplow/spark-streaming-example-project/blob/master/src/main/scala/com.snowplowanalytics.spark/streaming/kinesis/KinesisUtils.scala (Kinesis utils)
https://github.com/snowplow/spark-streaming-example-project/blob/master/src/main/scala/com.snowplowanalytics.spark/streaming/storage/DynamoUtils.scala (Dynamo utils)
https://www.slideshare.net/SparkSummit/time-series-analytics-with-spark-spark-summit-east-talk-by-simon-Ouellette

About

Stock Trade Analytics leveraging Spark Structured Stream Processing
