Commit
Merge pull request databricks#75 from databricks/ecommerce_demo
Ecommerce demo updates
dennyglee authored May 20, 2021
2 parents 13a8785 + d11bc54 commit 6c0d79f
Showing 8 changed files with 400 additions and 280 deletions.
@@ -0,0 +1,81 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Auto Loader demo
# MAGIC Here we show an example of processing and saving data in real time:
# MAGIC - Each hour of data is iteratively saved as a CSV file in a folder (Software Engineer notebook; see the sketch below)
# MAGIC - A Structured Streaming query automatically picks up new files and appends them to a Delta table, using the **Auto Loader**: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html
# MAGIC - Because category_code is not in the input data, we add it during the streaming step
# MAGIC
# MAGIC More on structured streaming: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#overview
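A minimal sketch of what the upstream Software Engineer notebook might do to feed this stream. The source table (ecommerce_demo.events_raw), the example hour, and the single-file write are illustrative assumptions, not part of this commit:

# Hypothetical producer (not in this commit): write one hour of raw events as a
# CSV file into the folder watched by the Auto Loader stream defined below.
from pyspark.sql.functions import col

basePath = "/Users/yourEmail/ecommerce-demo"   # same base path as used below
hour_start = "2019-10-01 10:00:00"             # assumed example hour
hour_end = "2019-10-01 11:00:00"

(spark.table("ecommerce_demo.events_raw")
     .where(col("event_time") >= hour_start)
     .where(col("event_time") < hour_end)
     .coalesce(1)                              # one CSV file for the hour
     .write.mode("append")
     .csv(f"{basePath}/Streaming"))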

# COMMAND ----------

email = 'yourEmail'
basePath = f"/Users/{email}/ecommerce-demo"

# COMMAND ----------

# MAGIC %md
# MAGIC Provide the schema of the input data

# COMMAND ----------

from pyspark.sql.types import StructType
schema = (StructType()
          .add("event_time", "timestamp")
          .add("event_type", "string")
          .add("product_id", "integer")
          .add("category_id", "long")
          .add("brand", "string")
          .add("price", "float")
          .add("user_id", "integer")
          .add("user_session", "string"))

# COMMAND ----------

# MAGIC %md
# MAGIC Dimension table

# COMMAND ----------

df_categories = spark.table("ecommerce_demo.categories")

# COMMAND ----------

# MAGIC %md
# MAGIC Input stream

# COMMAND ----------

df_raw = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.validateOptions", "false")
          .schema(schema)
          .load(f"{basePath}/Streaming")
          .select("event_time", "event_type", "product_id", "category_id", "brand", "price", "user_id", "user_session"))

# COMMAND ----------

# MAGIC %md
# MAGIC Join with dimension table to add category

# COMMAND ----------

df = (df_raw
      .join(df_categories, on=['category_id'], how='left')
      .select("event_time", "event_type", "product_id", "category_id", "category", "brand", "price", "user_id", "user_session"))

# COMMAND ----------

# MAGIC %md
# MAGIC Write output

# COMMAND ----------

(df.writeStream.format("delta")
   .outputMode("append")
   .option("checkpointLocation", f"{basePath}/StreamingCheckpoint")
   .table('ecommerce_demo.events_stream'))

# COMMAND ----------


154 changes: 154 additions & 0 deletions samples/2021-04-21 | DAIS21 demo/DAIS21 - eCommerce/Data Scientist.py
@@ -0,0 +1,154 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Data science
# MAGIC 1. We start from user actions with the top 10 categories.
# MAGIC 2. We pick a category: electronics.audio.headphone.
# MAGIC 3. We want to predict whether a user is a potential headphone buyer based on their actions in the other categories.

# COMMAND ----------

# MAGIC %md
# MAGIC # 1. Data prep

# COMMAND ----------

# MAGIC %md
# MAGIC Define a broad pool of features, and one target feature to predict

# COMMAND ----------

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

## Select features (10 categories * 3 actions = 30 features)
categories = ['electronics.smartphone',
              'electronics.audio.headphone',
              'electronics.video.tv',
              'electronics.clocks',
              'appliances.kitchen.washer',
              'computers.notebook',
              'appliances.environment.vacuum',
              'appliances.kitchen.refrigerators',
              'apparel.shoes',
              'electronics.tablet']
category_event = []
for cat in categories:
    category_event += [f'{cat}_purchase', f'{cat}_cart', f'{cat}_view']

## Feature to predict
target_category = 'electronics.audio.headphone'
target = f'{target_category}_purchase'
features = [x for x in category_event if target_category not in x]

# COMMAND ----------

# MAGIC %md
# MAGIC Select the desired columns and convert to a pandas DataFrame

# COMMAND ----------

cat_str = '`, `'.join(category_event)
df = spark.sql(f"Select user_id, `{cat_str}` From ecommerce_demo.user_features").toPandas()
print(len(df))

# COMMAND ----------

# MAGIC %md
# MAGIC Select a balanced sample of buyers and non-buyers of the target_category

# COMMAND ----------

df_select = pd.concat([df[df[target] == 1],
                       df[df[target] == 0].sample(np.sum(df[target] == 1))])
print(len(df_select))

# COMMAND ----------

# MAGIC %md
# MAGIC # 2. First model
# MAGIC We use logistic regression.

# COMMAND ----------

X, Y = df_select[features], df_select[target]
model = LogisticRegression(random_state=0)
print(f'Random guess score = {round(np.mean(cross_val_score(model, 0*X, Y, cv=5)), 3)}')
print(f'Model score = {round(np.mean(cross_val_score(model, X, Y, cv=5)), 3)}')
print(f'Number of features = {len(features)}')

# COMMAND ----------

# MAGIC %md
# MAGIC # 3. Feature selection with MLFlow tracking
# MAGIC We use recursive feature elimination, with MLFlow to save the model and track its performance at each iteration.

# COMMAND ----------

import mlflow

email = 'yourEmail'
experiment_id = mlflow.create_experiment(f"/Users/{email}/Headphone purchase prediction - Feature selection")

# COMMAND ----------

import mlflow
import mlflow.sklearn
from operator import itemgetter

def run(features_select, target, run_name, experiment_id):
    """1. Run cross-validation based on features_select
    2. Log model characteristics
    3. Return list of features sorted by importance"""
    ## Log run name
    with mlflow.start_run(run_name=run_name, experiment_id=experiment_id):
        ## Finding score
        X, Y = df_select[features_select], df_select[target]
        score = np.mean(cross_val_score(model, X, Y, cv=5))

        ## feature importance
        clf = model.fit(X, Y)
        feature_importance = [[features_select[i],
                               clf.coef_[0][i],
                               abs(np.sum(df_select[features_select[i]]) * clf.coef_[0][i])] for i in range(len(features_select))]
        feature_importance_sorted = sorted(feature_importance, key=itemgetter(2), reverse=True)

        ## Logging score, model and importance
        mlflow.log_metric("CV score", score)
        mlflow.sklearn.log_model(model, "Logistic Regression")
        mlflow.log_dict([[x[0],
                          f"coefficient = {str(x[1])}",
                          f"importance = {str(x[2])}"] for x in feature_importance_sorted], "feature_importance.yml")
        mlflow.end_run()
    return feature_importance_sorted


model = LogisticRegression(random_state=0)
features_select = features
feature_importance_sorted = run(features_select, target, "All features", experiment_id)
while len(features_select) > 1:
    features_select = [x[0] for x in feature_importance_sorted[:-1]]
    feature_removed = feature_importance_sorted[-1][0]
    feature_importance_sorted = run(features_select, target, f"Removing {feature_removed}", experiment_id)

# COMMAND ----------

# MAGIC %md
# MAGIC # 4. Final model
# MAGIC The selected features (from the feature_importance.yml file) are: [electronics.clocks_view, electronics.smartphone_purchase]

# COMMAND ----------

model_uri = 'runs:/37752b08ff754f53b149581537aad160/Logistic Regression'
final_model = mlflow.sklearn.load_model(model_uri)
print(f"A user who viewed a Smart Watch (electronics.clocks) and purchased a smartphone has a {round(final_model.predict_proba([[1, 1]])[0][1] * 100)}% probablility to be a headphone buyer.")

# COMMAND ----------

# MAGIC %md
# MAGIC # Other MLFlow features
# MAGIC 1. MLFlow Projects
# MAGIC 2. MLFlow Model
# MAGIC 3. MLFlow Model Registry (sketched below)
# MAGIC 4. More to come...
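As one illustration of the Model Registry, a hedged sketch of registering the final model from section 4; the registry name headphone_purchase_prediction is made up for this example:

# Hypothetical: register the logged model in the MLFlow Model Registry so it can
# be versioned and promoted through stages; the registry name is an assumption.
import mlflow

model_uri = 'runs:/37752b08ff754f53b149581537aad160/Logistic Regression'
registered = mlflow.register_model(model_uri, "headphone_purchase_prediction")
print(f"Registered {registered.name}, version {registered.version}")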

# COMMAND ----------


@@ -5,7 +5,7 @@ set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true
-- COMMAND ----------

-- MAGIC %md
-- MAGIC # Create business aggregated table
-- MAGIC # DA: Create business aggregated table

-- COMMAND ----------

@@ -20,11 +20,13 @@ Group By category_code, date
-- COMMAND ----------

-- MAGIC %md
-- MAGIC # Create Feature table
-- MAGIC # DS: Create Feature table

-- COMMAND ----------

-- MAGIC %python
-- MAGIC from pyspark.sql.functions import expr
-- MAGIC
-- MAGIC spark.sql("""Select user_id, concat(category_code, '_', event_type) as category_event
-- MAGIC From ecommerce_demo.events
-- MAGIC Where category_code is not null
135 changes: 135 additions & 0 deletions samples/2021-04-21 | DAIS21 demo/DAIS21 - eCommerce/Prepare data.py
@@ -0,0 +1,135 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # 1. Download Kaggle data from a terminal
# MAGIC - pip install kaggle
# MAGIC - KAGGLE_CONFIG_DIR='/dbfs/Users/{your-email}/ecommerce-demo/' [this assumes your Kaggle token kaggle.json is stored in that folder]
# MAGIC - export KAGGLE_CONFIG_DIR
# MAGIC - kaggle datasets download mkechinov/ecommerce-behavior-data-from-multi-category-store -p /dbfs/Users/{your-email}/ecommerce-demo/
# MAGIC
# MAGIC More information at https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store (a notebook-based alternative is sketched below).
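A sketch of running the same download from a notebook cell instead of a terminal, assuming the kaggle package is pip-installed on the cluster and kaggle.json sits in the folder below:

# Hypothetical: run the Kaggle download from a notebook cell instead of a terminal.
import os
import subprocess

email = 'yourEmail'  # replace with your email
os.environ["KAGGLE_CONFIG_DIR"] = f"/dbfs/Users/{email}/ecommerce-demo/"

subprocess.run(
    ["kaggle", "datasets", "download",
     "mkechinov/ecommerce-behavior-data-from-multi-category-store",
     "-p", f"/dbfs/Users/{email}/ecommerce-demo/"],
    check=True,
)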

# COMMAND ----------

# MAGIC %md
# MAGIC # 2. Convert the downloaded data to Delta
# MAGIC Unzip the downloaded file, then move it to the desired folder

# COMMAND ----------

email = 'yourEmail'
basePath = f"/Users/{email}/ecommerce-demo"

# COMMAND ----------

# MAGIC %sh
# MAGIC # Replace {email} with your email: Python variables are not substituted in %sh cells
# MAGIC unzip /dbfs/Users/{email}/ecommerce-demo/ecommerce-behavior-data-from-multi-category-store.zip

# COMMAND ----------

dbutils.fs.mv("file:/databricks/driver/2019-Oct.csv", f"{basePath}/2019-Oct.csv")
dbutils.fs.mv("file:/databricks/driver/2019-Nov.csv", f"{basePath}/2019-Nov.csv")

# COMMAND ----------

dbutils.fs.ls(f"{basePath}/")

# COMMAND ----------

# MAGIC %md
# MAGIC # 3. Save to Delta
# MAGIC - Writing to Delta saves the data as multiple Parquet files (better than CSV for performance and size) while letting us work with them as a single table, with many more capabilities: https://delta.io/
# MAGIC - Use Spark to process the data in parallel

# COMMAND ----------

# MAGIC %sql
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true

# COMMAND ----------

# MAGIC %sql
# MAGIC CREATE DATABASE ecommerce_demo

# COMMAND ----------

df = spark.read.option("header", "true").csv(f"{basePath}/*.csv")

df.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable('ecommerce_demo.events')

# COMMAND ----------

# MAGIC %sql
# MAGIC OPTIMIZE ecommerce_demo.events

# COMMAND ----------

# MAGIC %md
# MAGIC The two CSV files add up to ~13 GB; the Delta table is ~4.2 GB (a >3x reduction vs. CSV). See the quick size check below.
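A quick way to verify these sizes, assuming the table created above; DESCRIBE DETAIL on a Delta table reports sizeInBytes and numFiles:

# Check the Delta table size reported by DESCRIBE DETAIL.
detail = spark.sql("DESCRIBE DETAIL ecommerce_demo.events").collect()[0]
print(f"Delta table size: {detail['sizeInBytes'] / 1e9:.1f} GB across {detail['numFiles']} files")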

# COMMAND ----------

# MAGIC %sql
# MAGIC Select * From ecommerce_demo.events Limit 10

# COMMAND ----------

# MAGIC %md
# MAGIC # 4. Prepare the dimension table, a raw table without category_code, and the streaming directories

# COMMAND ----------

# MAGIC %md
# MAGIC Create the table that contains the category_id -> category_code mapping

# COMMAND ----------

# MAGIC %sql
# MAGIC Create or Replace Table ecommerce_demo.categories Using Delta AS
# MAGIC Select cast(category_id as bigint) as category_id,
# MAGIC max(category_code) as category
# MAGIC From ecommerce_demo.events
# MAGIC Group By category_id

# COMMAND ----------

# MAGIC %md
# MAGIC We create a table without category_code for the purpose of the demo

# COMMAND ----------

# MAGIC %sql
# MAGIC Create or Replace Table ecommerce_demo.events_raw Using Delta AS
# MAGIC Select event_time,
# MAGIC event_type,
# MAGIC product_id,
# MAGIC category_id,
# MAGIC brand,
# MAGIC price,
# MAGIC user_id,
# MAGIC user_session
# MAGIC From ecommerce_demo.events

# COMMAND ----------

# MAGIC %md
# MAGIC Make streaming directories

# COMMAND ----------

# Clean up old directories and tables
dbutils.fs.rm(f"{basePath}/Streaming", recurse=True)
dbutils.fs.rm(f"{basePath}/StreamingCheckpoint", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/ecommerce_demo.db/events_stream", recurse=True)

# Create directories for streaming input, and streaming checkpoints
dbutils.fs.mkdirs(f"{basePath}/Streaming")
dbutils.fs.mkdirs(f"{basePath}/StreamingCheckpoint")

# COMMAND ----------

