filmengine.py
'''
Description
-----------
FilmEngine is an end-to-end ETL pipeline that extracts a dataset from Kaggle,
applies business transformations with Spark, and loads the result into a
PostgreSQL database.
Author: Alexander Sommer
Initial Release: 20/09/2020
'''
# Built-in Libraries
import os
import zipfile
import fnmatch
# Other Libraries
import wikipedia
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, lit, udf
from kaggle.api.kaggle_api_extended import KaggleApi
# Own Modules
from modules.kaggle_extractor import download_kaggle, decompress_kaggle
from docs.env_variables import database, tgt_table, user, password, jdbcUrl, jdbcDriver
from modules.wiki_scraper import wikilink, wikiabstract
# Variables
kaggle_dataset = 'rounakbanik/the-movies-dataset'
kaggle_file_name = 'movies_metadata.csv'
files_dir = './files/'
"""
Extract
--------
The following section extracts a dataset from Kaggle
and decompresses it in the "/files/" subfolder.
"""
# Initiate the Spark engine
spark = SparkSession \
    .builder \
    .appName("FilmEngine") \
    .config("spark.jars", "./docs/postgresql-42.2.16.jar") \
    .getOrCreate()
sc = spark.sparkContext.getOrCreate()
# Download and extract the kaggle metadata file
print("\n Downloading and decompressing the file from " + kaggle_dataset + "...\n")
download_kaggle(kaggle_dataset, kaggle_file_name, files_dir)
decompress_kaggle(kaggle_file_name, files_dir)
print("\n...The " + kaggle_file_name + " has been downloaded and extracted in the " + files_dir + " directory.\n")
"""
Transform
---------
Process the CSV file and apply transformations using Spark.
This happens in a few key steps
1. Building a DataFrame from the CSV
2. Creating a temporary view, applying the business logic
3. Populating the Wiki links and abstracts
"""
# Build a DataFrame from the CSV file
print("\n Building a Spark DataFrame and a Temporary view" + "...\n")
df = spark.read \
.option('header', 'true') \
.csv(files_dir + kaggle_file_name)
df2 = df.select(df["original_title"].cast('string').alias("title"),\
df["budget"].cast('integer').alias("budget"),
df["release_date"].cast('date').alias("release_date"),
df["revenue"].cast('integer').alias("revenue"),
df["vote_average"].cast('float').alias("rating"),
df["production_companies"].cast('string').alias("production_company"))
# Create a temporary view, applying calculations and adding new columns
df2.createOrReplaceTempView("metadata")
sqlDF = spark.sql("""
    SELECT
        title,
        budget,
        year(release_date) AS year,
        revenue,
        rating,
        budget / revenue AS ratio,
        production_company
    FROM metadata
    WHERE revenue IS NOT NULL
      AND revenue != 0
      AND budget != 0
      AND rating IS NOT NULL
      AND budget > 10
    ORDER BY ratio ASC
    LIMIT 1000
""")
sqlDF = sqlDF.select("*") \
    .withColumn("wiki_abstract", lit(None).cast('string')) \
    .withColumn("wiki_link", lit(None).cast('string'))
sqlPDF = sqlDF.select("*").toPandas()
print("\n ...the DataFrame has now been built.\n")
sqlDF.show()
# Populate the DataFrame with Wikipedia links and abstracts
print("\n Now populating the Wikipedia links and abstracts...\n")
m = 0
while m < len(sqlPDF):
    func_val = sqlPDF.at[m, 'title']
    link = wikilink(func_val)
    abstract = wikiabstract(func_val)
    sqlPDF.at[m, 'wiki_link'] = link
    sqlPDF.at[m, 'wiki_abstract'] = abstract
    print("Row " + str(m) + " has been populated for the film: " + func_val)
    m = m + 1
sqlDF = spark.createDataFrame(sqlPDF)
print("\n ...links and abstracts populated.\n")
"""
Load
----
Finally, load the final DataFrame into a PostgreSQL DB.
This is the final step in the process.
"""
print("\n Loading the final DataFrame into the PostgreSQL DB.\n")
sqlDF.select("title", "budget", "year", "revenue", "ratio", "production_company", "wiki_link", "wiki_abstract") \
    .write.format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbcUrl) \
    .option("dbtable", tgt_table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .save()
sqlDF.show()
print("\n ...the ETL process is now complete! \n")