aws-streaming-pipeline

Requirements

Development

Install the libraries by running:

pip install -r requirements.txt
for i in consumers/*/; do pip install -r $i"requirements.txt"; done

Infrastructure

The infrastructure is provisioned with CloudFormation. It is split into two main stacks (a small boto3 verification sketch follows the list):

  • data-sources stack, which consists of:

    • S3 Bucket used to store the artifacts produced to deploy the Lambda functions
    • S3 Bucket used to store the output of the Lambda functions
  • stream stack, which consists of:

    • Kinesis stream with 2 shards
    • Lambda function write_to_s3, set up as a consumer of the stream, to write the events to S3
    • Execution role for the Lambda function write_to_s3
    • Lambda function send_to_firehose, set up as a consumer of the stream, to send the events to the Firehose Delivery Stream
    • Execution role for the Lambda function send_to_firehose
    • Firehose Delivery Stream, configured to deliver to S3
    • Firehose Delivery Stream Execution Role, required to put objects to S3
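
As a quick sanity check after both stacks are created, the provisioned resources can be inspected with boto3. This is only an illustrative sketch: the stream name is an assumption, and the deployment bucket name matches the one used in the deployment steps below.

    # Illustrative check of the provisioned resources (resource names are assumptions).
    import boto3

    kinesis = boto3.client("kinesis")
    s3 = boto3.client("s3")

    # The stream created by the stream stack should report 2 open shards.
    summary = kinesis.describe_stream_summary(StreamName="streaming-pipeline-dev")
    print(summary["StreamDescriptionSummary"]["OpenShardCount"])

    # The deployment bucket created by the data-sources stack should exist.
    s3.head_bucket(Bucket="streaming-pipeline-dev-deployment")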

Architecture

(Architecture diagram)

Init the infrastructure

  • First create the data-sources stack

    bash scripts/create_data_source_stack.sh
  • Create a zipped file to initialize the Lambda functions, and upload it to S3

      cd consumers/init_consumer
      zip -r init.zip .
      aws s3 cp init.zip s3://streaming-pipeline-dev-deployment/consumers/
      cd ../..
      
  • Create the stream stack

    bash scripts/create_stream_stack.sh

    NOTE: if the S3 key consumers/init.zip has not been uploaded to the S3 Bucket used for deployments, the stack creation will fail

All the stacks and resources have a stage parameter, to handle multiple stages (e.g. production).
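
For illustration only, creating the stream stack for another stage with boto3 looks roughly like the sketch below; the Stage parameter name, the template path, and the stack name are assumptions, and the creation scripts above remain the supported entry point.

    # Hypothetical sketch: creating the stream stack for another stage with boto3.
    # The "Stage" parameter name, template path, and stack name are assumptions.
    import boto3

    cloudformation = boto3.client("cloudformation")

    with open("cloudformation/stream.yml") as template_file:  # assumed template location
        template_body = template_file.read()

    cloudformation.create_stack(
        StackName="streaming-pipeline-production-stream",  # assumed naming convention
        TemplateBody=template_body,
        Parameters=[{"ParameterKey": "Stage", "ParameterValue": "production"}],
        Capabilities=["CAPABILITY_NAMED_IAM"],  # the stack creates IAM roles
    )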

Consumers

The consumers' implementation is located in the consumers folder: one folder for each consumer, plus one folder used to initialize the consumers on the first CloudFormation stack creation.
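
For orientation, a write_to_s3-style consumer has roughly the shape below. This is a simplified sketch rather than the repository's code: the output bucket name and key layout are assumptions.

    # Simplified sketch of a write_to_s3-style consumer
    # (output bucket name and key layout are assumptions).
    import base64
    import json
    import uuid

    import boto3

    s3 = boto3.client("s3")
    OUTPUT_BUCKET = "streaming-pipeline-dev-output"  # assumed bucket name

    def handler(event, context):
        # Each invocation receives a batch of Kinesis records; decode them and
        # write the whole batch as a single JSON file under a unique key.
        events = [
            json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]
        ]
        key = f"write_to_s3/{uuid.uuid4()}.json"
        s3.put_object(Bucket=OUTPUT_BUCKET, Key=key, Body=json.dumps(events).encode("utf-8"))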

Producer

To test the proposed solution, we needed to implement a producer that puts records to the stream. The producer uses a sample CSV from a Kaggle competition. It's possible to send sample records by calling:

python producer/producer.py
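
The core of such a producer is roughly the following sketch; the CSV path, the partition key column, and the stream name are assumptions, so refer to producer/producer.py for the actual behaviour.

    # Rough sketch of a producer putting CSV rows on the stream
    # (CSV path, partition key column, and stream name are assumptions).
    import csv
    import json

    import boto3

    kinesis = boto3.client("kinesis")
    STREAM_NAME = "streaming-pipeline-dev"  # assumed stream name

    with open("producer/sample.csv", newline="") as csv_file:  # assumed sample file
        for row in csv.DictReader(csv_file):
            kinesis.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(row).encode("utf-8"),
                PartitionKey=row.get("TRIP_ID", "default"),  # assumed key column
            )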

Unit tests

pytest -vv tests/*

CI/CD

On each push to a remote branch, Travis is triggered. It takes care of:

  • running the unit tests
  • preparing the zipped Lambda functions (containing all needed requirements)
  • updating the stream stack with the latest built code for the Lambdas

NOTE: it's possible to have a production stack that is deployed only when a PR is merged into the master branch

It's also possible to manually update the stream stack locally by simply running:

bash scripts/deploy_stream_stack_locally.sh 

To run the script you need Docker. The script uses a Docker image to package the Lambdas: some Python libraries (e.g. pandas) need to be installed in a Linux environment to work correctly inside the Lambda runtime.

Considerations

The records in Kinesis are consumed by 2 Lambda functions. The write_to_s3 Lambda receives the records and writes them as JSON to S3, creating a unique path each time a new batch is received. The files written by this consumer can be very small, and get even smaller when more shards are added, because the Lambda is invoked many times with a limited number of events each. Such files are hard to read with applications like Spark/Presto/Athena, since they create unnecessary network overhead. To solve this problem we can create a consumer that sends the records to Firehose; Firehose then takes care of writing the files to S3 only when specific conditions are reached. In this implementation the files are written to S3 with a max size of 5MB. We also use a JSON Lines structure to make the files easier to read for Spark/Presto/Athena; a sketch of such a consumer follows. To test this, it's possible to create an Athena table; the DDL is in athena/taxi_trajectories.sql.
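
A send_to_firehose-style consumer can be sketched as follows; the delivery stream name is an assumption and the code is a simplification of the approach described above, not the repository's exact implementation.

    # Simplified sketch of a send_to_firehose-style consumer
    # (delivery stream name is an assumption).
    import base64
    import json

    import boto3

    firehose = boto3.client("firehose")
    DELIVERY_STREAM = "streaming-pipeline-dev-delivery-stream"  # assumed name

    def handler(event, context):
        # Re-encode each Kinesis record as a JSON line so Firehose concatenates
        # them into files that Spark/Presto/Athena can read easily.
        records = []
        for record in event["Records"]:
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            records.append({"Data": (json.dumps(payload) + "\n").encode("utf-8")})
        if records:
            # put_record_batch accepts up to 500 records per call.
            firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)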

Improvements

  • It's possible to add a Lambda function that autoscales the number of Kinesis shards to support spikes of incoming bytes. This can be achieved by setting up a CloudWatch alarm that triggers the Lambda when a specific limit is reached. The Lambda then calculates the shard count based on the latest incoming bytes (a rough sketch follows this list)
  • Add alarms to monitor when invocations of the Lambda functions fail
  • Adding a JSON schema validation step in the Lambdas would guarantee higher quality in the received records. This is even more relevant when implementing a consumer that writes to a database, which is generally much less flexible than S3
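
The shard-autoscaling idea from the first point could be sketched as below, assuming the Lambda is triggered by a CloudWatch alarm on IncomingBytes; the stream name, lookback window, and headroom are all assumptions.

    # Rough sketch of the shard-autoscaling idea (stream name, lookback window,
    # and headroom are assumptions).
    from datetime import datetime, timedelta, timezone

    import boto3

    kinesis = boto3.client("kinesis")
    cloudwatch = boto3.client("cloudwatch")

    STREAM_NAME = "streaming-pipeline-dev"  # assumed stream name
    BYTES_PER_SHARD_PER_SECOND = 1_000_000  # Kinesis write limit per shard (1 MB/s)

    def handler(event, context):
        # Estimate the recent write throughput from the IncomingBytes metric.
        now = datetime.now(timezone.utc)
        stats = cloudwatch.get_metric_statistics(
            Namespace="AWS/Kinesis",
            MetricName="IncomingBytes",
            Dimensions=[{"Name": "StreamName", "Value": STREAM_NAME}],
            StartTime=now - timedelta(minutes=5),
            EndTime=now,
            Period=300,
            Statistics=["Sum"],
        )
        datapoints = stats["Datapoints"]
        bytes_per_second = max(dp["Sum"] for dp in datapoints) / 300 if datapoints else 0

        # Scale to the number of shards needed for that throughput, keeping at least 2.
        target_shards = max(2, int(bytes_per_second // BYTES_PER_SHARD_PER_SECOND) + 1)
        kinesis.update_shard_count(
            StreamName=STREAM_NAME,
            TargetShardCount=target_shards,
            ScalingType="UNIFORM_SCALING",
        )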
