Data Lakes are widely used by organizations as part of their infrastructure to host both structured (tabular data such as CSV or JSON files) and unstructured (images, videos, etc.) information. In recent years many companies have chosen the cloud as the platform on which to maintain the Data Lake, since cloud providers (Google, Microsoft, or Amazon) offer services to store objects (blobs) long-term, with high data availability, fully managed security, high performance, scalability, and different storage tiers for hot/cold data. These features matter to the end user because they make it possible to create a Data Lake with just enough resources to suit the needs of the company; in other words, the problem of under- or over-provisioning resources is avoided.
In the use case presented here a Data Lake will be built on top of Amazon S3 and a data pipeline will be used to ingest data into the Data Lake. To this end, we will work with two sets of data: the song and log datasets. With the information contained in these datasets we can retrieve valuable insights into how customers use a music app, for example: what are the top 10 songs played this week? Or, how long do customers stay on the paid-tier subscription?
In this repository we explored the same scenario using a Data Warehouse instead of a Data Lake. That is a valid alternative to the work presented here, since we are dealing with tabular data. Nowadays, however, Data Lakes are gaining popularity thanks to their ability to store both structured and unstructured data and their fast integration with other cloud services and APIs.
As mentioned in the introduction, we are going to process the information present in the song and log datasets. As described below, they encapsulate different information and, therefore, their schemas are distinct.
The song dataset is a subset of the Million Song Dataset and contains information about the songs available in the music app. The records include, among other fields, the artist ID, song ID, title, and duration. Each row is written as a JSON object with the schema shown below, and the files are organized in folders with the following structure.
./data/song_data/
└── A
    ├── A
    │   ├── A
    │   ├── B
    │   └── C
    └── B
        ├── A
        ├── B
        └── C
Example of a song data file.
{
    "num_songs": 1,
    "artist_id": "ARJIE2Y1187B994AB7",
    "artist_latitude": null,
    "artist_longitude": null,
    "artist_location": "",
    "artist_name": "Line Renaud",
    "song_id": "SOUPIRU12A6D4FA1E1",
    "title": "Der Kleine Dompfaff",
    "duration": 152.92036,
    "year": 0
}
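Because the song files sit three directory levels below `./data/song_data/`, they can be loaded in one pass with a wildcard path. The following is a minimal, hypothetical PySpark sketch of such a read; the actual loading logic lives in `./etl.py`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read every song JSON file, regardless of which nested folder it sits in.
song_df = spark.read.json("./data/song_data/*/*/*/*.json")
song_df.printSchema()
```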
The log dataset contains information about the user interaction with the app (sign-in/out, user ID, registration type, listened songs, etc.). This dataset was built with the events simulator eventsim and, like the song dataset, the information is stored in JSON files. Below you can find the schema of the log dataset.
{
    artist TEXT,
    auth TEXT,
    firstName TEXT,
    gender TEXT,
    itemInSession INT,
    lastName TEXT,
    length DOUBLE,
    level TEXT,
    location TEXT,
    method TEXT,
    page TEXT,
    registration DOUBLE,
    sessionId INT,
    song TEXT,
    status INT,
    ts FLOAT,
    userId INT
}
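When reading the log files it can be convenient to pass this schema explicitly instead of letting Spark infer it. Below is a hypothetical PySpark translation of the schema above; the field names follow the log dataset, but the exact types used in `./etl.py` may differ (for instance, `ts` is an epoch timestamp in milliseconds, so the sketch stores it as a long rather than a float).

```python
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, LongType)

log_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", IntegerType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", DoubleType()),
    StructField("sessionId", IntegerType()),
    StructField("song", StringType()),
    StructField("status", IntegerType()),
    # Unix epoch timestamp in milliseconds.
    StructField("ts", LongType()),
    StructField("userId", IntegerType()),
])

# log_df = spark.read.json("./data/log_data/*.json", schema=log_schema)
```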
For convenience the above datasets are placed in the S3 buckets shown below. However, it is also possible to use the ./data
folder if we want to work with this data locally.
- Song data bucket:
s3://udacity-dend/song_data
- Log data bucket:
s3://udacity-dend/log_data
We are going to use an ETL pipeline to process and ingest the song and log data into the Data Lake. ETL is widely used in the Data Engineering community and refers to a series of steps (Extract, Transform and Load) applied to the raw data. We are going to use Python as the programming language and Apache Spark to benefit from its efficient data management, task parallelization, and ease of use. Keep in mind that there is a myriad of frameworks that could be used to implement the ETL, for example Apache Beam or Apache Hadoop. Below we describe the AWS resources needed to test this repository, the Data Lake structure, and an overview of the ETL pipeline.
The Data Lake will accommodate the song and log data organized as a star schema. Star schemas have a characteristic structure modeled by a single fact table connected to the dimension tables via foreign keys. While the fact table contains data related to measurements, metrics, or other core aspects of a business process, the dimension tables add descriptive attributes. In our case the fact table will hold the records of songs played by users during a particular session, and the dimension tables will provide additional details on, for example, the song, the artist, and the user. Below you can find the schema of the fact and dimension tables.
songplays (fact table)
    songplay_id INT,
    start_time TIMESTAMP,
    user_id INT,
    level VARCHAR,
    song_id VARCHAR,
    artist_id VARCHAR,
    session_id INT,
    location VARCHAR,
    user_agent VARCHAR

artists (dimension table)
    artist_id VARCHAR,
    artist_name VARCHAR,
    artist_location VARCHAR,
    artist_latitude FLOAT,
    artist_longitude FLOAT

songs (dimension table)
    song_id VARCHAR,
    title VARCHAR,
    artist_id VARCHAR,
    year INT,
    duration FLOAT

time (dimension table)
    start_time TIMESTAMP,
    hour INT,
    day INT,
    week INT,
    month INT,
    year INT,
    weekday INT

users (dimension table)
    user_id INT,
    first_name VARCHAR,
    last_name VARCHAR,
    gender VARCHAR,
    level VARCHAR
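With this layout, typical business questions reduce to a join between the fact table and one or two dimensions. For instance, a hypothetical Spark SQL query for the top 10 songs played in a given week could look like the sketch below; the table and column names follow the schemas above, and the year/week values are placeholders.

```python
# Assumes the tables have been loaded as DataFrames and registered as temporary views,
# e.g. songplays_df.createOrReplaceTempView("songplays"), and similarly for songs and time.
top_songs = spark.sql("""
    SELECT s.title, COUNT(*) AS plays
    FROM songplays sp
    JOIN songs s ON sp.song_id = s.song_id
    JOIN time  t ON sp.start_time = t.start_time
    WHERE t.year = 2018 AND t.week = 46   -- placeholder week
    GROUP BY s.title
    ORDER BY plays DESC
    LIMIT 10
""")
top_songs.show()
```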
We are going to build the Data Lake on top of Amazon S3. In S3 the data is stored inside buckets. There are multiple ways to create an S3 bucket: from the AWS Management Console, using the AWS SDK, or via the AWS CLI. Below we describe how to create a bucket with the AWS CLI command [mb](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3/mb.html) in the us-west-1
region.
$ aws s3 mb s3://<BUCKET_NAME> --region us-west-1
When running the above command, take into account that you must have an AWS account and valid AWS credentials. Also, keep in mind the bucket creation rules to avoid problems related to naming conventions.
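For completeness, the same bucket could also be created programmatically with the AWS SDK for Python (boto3); a minimal sketch, where <BUCKET_NAME> is a placeholder:

```python
import boto3

# Create the bucket in us-west-1; outside us-east-1 a LocationConstraint must be provided.
s3 = boto3.client("s3", region_name="us-west-1")
s3.create_bucket(
    Bucket="<BUCKET_NAME>",
    CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
)
```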
The pipeline code can be found in the script ./etl.py
. The main function of this code performs the following steps:
- User authentication: the AWS access key and the AWS secret key of the user are retrieved from the file
./dl.cfg
and added as environment variables. Here you can find more information about credentials authentication with boto3
.
User authentication coded in ./etl.py
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS_SECURITY']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECURITY']['AWS_SECRET_ACCESS_KEY']
Format of ./dl.cfg
for user credentials
[AWS_SECURITY]
AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>
- Spark Session: definition of the SparkSession variable. This object is the main entry point to the Apache Spark functionality or, in other words, the variable that we will use to access the Spark SQL and DataFrame features. The Spark session is instantiated in
./etl.py
.
spark = create_spark_session()
The method create_spark_session
can be found in ./lib.py
def create_spark_session(log_level="ERROR"):
    [...]
    # The hadoop-aws package bundles the S3A connector used to read from and write to S3.
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    spark.sparkContext.setLogLevel(log_level)
    return spark
- Song data processing: the song files are read from their cloud location at
s3://udacity-dend/song-data/*/*/*/*.json
and the data is loaded in memory as a Spark DataFrame. From this data we extract the songs
and artists
tables and save them as parquet files in the storage bucket created previously.
- Log data processing: like the song data, the log entries are loaded as a Spark DataFrame. At this stage we can build the
songplays
fact table (by joining the song and log DataFrames) as well as the time
and users
tables. As in the previous step, these tables are saved as parquet files in the storage bucket created previously. A simplified sketch of both processing steps is shown below.
Below you can find the usage help of ./etl.py
. The script accepts two flags, --input_data
and --output_data
, which refer to the input and output data locations (local or cloud).
usage: etl.py [-h] [--input_data INPUT_DATA] [--output_data OUTPUT_DATA]
optional arguments:
-h, --help show this help message and exit
--input_data INPUT_DATA
Path of the input data where the log and song datasets are located.
--output_data OUTPUT_DATA
Path of the output data where the parquet files will be written.
If --input_data
is not provided, the song and log data will be read from s3://udacity-dend/song-data
and s3://udacity-dend/log-data
, respectively. On the other hand, if --output_data
is not specified, the parquet files will be written locally to the directory ./sparkify-output-data.
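For reference, a hypothetical sketch of how these flags and defaults could be declared with argparse; the exact defaults (in particular the input bucket root) are assumptions, and the real definitions live in ./etl.py.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_data", default="s3a://udacity-dend/",
                    help="Path of the input data where the log and song datasets are located.")
parser.add_argument("--output_data", default="./sparkify-output-data",
                    help="Path of the output data where the parquet files will be written.")
args = parser.parse_args()
```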
Usage example
$ python etl.py --output_data s3://<BUCKET_NAME>