
Cookbook window functions (zero-one-group#121)
* Trying out dropdown

* Removed dropdown

* First draft of part 8 of cookbook

* Added link in README

* Proof-read + revised part 8

* Added credit to u/joinr

* Sanitised resources directory

* Swapped target with data dir

* Added overwrite to make cookbook-code idempotent
anthony-khong authored Jul 25, 2020
1 parent 121472a commit ba42c91
Showing 15 changed files with 241 additions and 30 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -38,7 +38,7 @@ Geni is designed to provide an idiomatic Spark interface for Clojure without the
5. [String Operations](docs/cookbook/part_5_string_operations.md)
6. [Cleaning up Messy Data](docs/cookbook/part_6_cleaning_up_messy_data.md)
7. [Timestamps and Dates](docs/cookbook/part_7_timestamps_and_dates.md)
8. Windowing Functions [TBD]
8. [Window Functions](docs/cookbook/part_8_window_functions.md)
9. Loading Data from SQL Databases [TBD]

[![cljdoc badge](https://cljdoc.org/badge/zero.one/geni)](https://cljdoc.org/d/zero.one/geni/CURRENT)
@@ -155,7 +155,7 @@ You would also need to add Spark as provided dependencies. For instance, have th
[ml.dmlc/xgboost4j_2.12 "1.0.0"]]}
```

You may also need to install `libgomp1` to train XGBoost4j models. When the optional dependencies are not present, the vars to the corresponding functions (such as `ml/xgboost-classifier`) will be left unbound.
You may also need to install `libatlas3-base` and `libopenblas-base` to use a native BLAS, and install `libgomp1` to train XGBoost4j models. When the optional dependencies are not present, the vars to the corresponding functions (such as `ml/xgboost-classifier`) will be left unbound.
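
One way to keep a namespace loadable either way is to guard the call behind `bound?`; a minimal sketch, assuming an `ml` alias for `zero-one.geni.ml` and a hypothetical parameter map:

```clojure
;; Only call the XGBoost constructor when its var is bound, i.e. when
;; the optional dependency was on the classpath at load time.
(when (bound? #'ml/xgboost-classifier)
  (ml/xgboost-classifier {:max-depth 2}))  ; hypothetical params
```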

## License

@@ -171,4 +171,5 @@ Some code was taken from:
* [LispCast](https://lispcast.com/) for [exponential backoff](https://lispcast.com/exponential-backoff/).
* Reddit users [/u/borkdude](https://old.reddit.com/user/borkdude) and [/u/czan](https://old.reddit.com/user/czan) for [with-dynamic-import](src/zero_one/geni/utils.clj).
* StackOverflow user [whocaresanyway's answer](https://stackoverflow.com/questions/1696693/clojure-how-to-find-out-the-arity-of-function-at-runtime) for `arg-count`.
* [Pandas Cookbook](https://github.com/jvns/pandas-cookbook) for its syllabus.
* [Julia Evans'](https://jvns.ca/) [Pandas Cookbook](https://github.com/jvns/pandas-cookbook) for its syllabus.
* Reddit user [/u/joinr](https://old.reddit.com/user/joinr) for helping with [unit-testing the REPL](test/zero_one/geni/main_test.clj).
File renamed without changes.
87 changes: 78 additions & 9 deletions docs/cookbook/cookbook-code.clj
Expand Up @@ -14,7 +14,7 @@

;; Part 1: Reading and Writing Datasets
(def bikes-data-url "https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/bikes.csv")
(def bikes-data-path "resources/cookbook/bikes.csv")
(def bikes-data-path "data/cookbook/bikes.csv")
(download-data! bikes-data-url bikes-data-path)

;; 1.1 Creating a Spark Session
@@ -72,13 +72,13 @@
;; 1.3 Describing Columns

;; 1.4 Writing Datasets
(g/write-parquet! renamed-df "resources/cookbook/bikes.parquet")
(g/write-parquet! renamed-df "data/cookbook/bikes.parquet" {:mode "overwrite"})

;; Part 2: Selecting Rows and Columns
(def complaints-data-url
"https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/311-service-requests.csv")
(def complaints-data-path
"resources/cookbook/complaints.csv")
"data/cookbook/complaints.csv")
(download-data! complaints-data-url complaints-data-path)

(def raw-complaints
@@ -213,7 +213,7 @@
"&timeframe=1&submit=Download+Data"))

(defn weather-data-path [year month]
(str "resources/cookbook/weather/weather-" year "-" month ".csv"))
(str "data/cookbook/weather/weather-" year "-" month ".csv"))

(defn weather-data [year month]
(download-data! (weather-data-url year month) (weather-data-path year month))
@@ -300,7 +300,7 @@ columns-to-select
(mapv (partial weather-data 2012) (range 1 13))

(def unioned
(-> (g/read-csv! spark "resources/cookbook/weather" {:inferSchema "true"})
(-> (g/read-csv! spark "data/cookbook/weather" {:inferSchema "true"})
normalise-column-names
(g/select (g/columns weather-mar-2012))))

@@ -310,11 +310,11 @@ columns-to-select
(g/order-by :year :month)
g/show)

(g/write-csv! unioned "resources/cookbook/weather-2012.csv")
(g/write-csv! unioned "data/cookbook/weather-2012.csv" {:mode "overwrite"})

;; Part 5: String Operations
(def weather-2012
(g/read-csv! spark "resources/cookbook/weather-2012.csv" {:inferSchema "true"}))
(g/read-csv! spark "data/cookbook/weather-2012.csv" {:inferSchema "true"}))

;; 5.1 Finding The Snowiest Months
(-> weather-2012
@@ -361,7 +361,7 @@ columns-to-select

;(def complaints
;(normalise-column-names
;(g/read-csv! spark "resources/cookbook/complaints.csv" {:inferSchema "true"})))
;(g/read-csv! spark "data/cookbook/complaints.csv" {:inferSchema "true"})))

;; 6.1 Messy Zip Codes
(-> complaints g/dtypes :incident-zip)
@@ -439,7 +439,7 @@ columns-to-select
"https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/popularity-contest")

(def popularity-contest-data-path
"resources/cookbook/popularity-contest.csv")
"data/cookbook/popularity-contest.csv")

(download-data! popularity-contest-data-url popularity-contest-data-path)

@@ -474,3 +474,72 @@ columns-to-select
(g/select (g/year :access-time))
g/value-counts
g/show)

;; Part 8: Window Functions
(def product-revenue
(g/table->dataset
spark
[["Thin" "Cell phone" 6000]
["Normal" "Tablet" 1500]
["Mini" "Tablet" 5500]
["Ultra Thin" "Cell phone" 5000]
["Very Thin" "Cell phone" 6000]
["Big" "Tablet" 2500]
["Bendable" "Cell phone" 3000]
["Foldable" "Cell phone" 3000]
["Pro" "Tablet" 4500]
["Pro2" "Tablet" 6500]]
[:product :category :revenue]))

(g/print-schema product-revenue)

;; 8.1 The Best and Second Best in Every Category

(def rank-by-category
(g/windowed
{:window-col (g/dense-rank)
:partition-by :category
:order-by (g/desc :revenue)}))

(-> product-revenue
(g/with-column :rank-by-category rank-by-category)
(g/filter (g/< :rank-by-category 3))
g/show)

;; 8.2 Revenue Differences to the Best in Every Category
(def max-by-category
(g/windowed
{:window-col (g/max :revenue)
:partition-by :category}))

(-> product-revenue
(g/with-column :max-by-category max-by-category)
(g/with-column :revenue-diff (g/- :max-by-category :revenue))
(g/order-by :category (g/desc :revenue))
g/show)

;; 8.3 Revenue Differences to the Next Best in Every Category
(def next-best-by-category
(g/windowed
{:window-col (g/lag :revenue 1)
:partition-by :category
:order-by (g/desc :revenue)}))

(-> product-revenue
(g/with-column :next-best-by-category next-best-by-category)
(g/with-column :revenue-diff (g/- :next-best-by-category :revenue))
g/show)

;; 8.4 Underperformance by One Sigma in Every Category
(def mean-by-category
(g/windowed {:window-col (g/mean :revenue) :partition-by :category}))

(def std-by-category
(g/windowed {:window-col (g/stddev :revenue) :partition-by :category}))

(-> product-revenue
(g/with-column
:z-stat-by-category
(g// (g/- :revenue mean-by-category) std-by-category))
(g/filter (g/< :z-stat-by-category -1))
g/show)
4 changes: 2 additions & 2 deletions docs/cookbook/part_1_reading_and_writing_datasets.md
@@ -22,7 +22,7 @@ And actually download the data:

```clojure
(def bikes-data-url "https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/bikes.csv")
(def bikes-data-path "resources/cookbook/bikes.csv")
(def bikes-data-path "data/cookbook/bikes.csv")
(download-data! bikes-data-url bikes-data-path)
=> :downloaded
```
@@ -263,7 +263,7 @@ We can get descriptions of numeric columns using `g/describe`:
Writing datasets to file is straightforward. Spark [encourages the use of the Parquet format](https://databricks.com/glossary/what-is-parquet). To write to Parquet, we can invoke `g/write-parquet!`:

```clojure
(g/write-parquet! renamed-df "resources/cookbook/bikes.parquet")
(g/write-parquet! renamed-df "data/cookbook/bikes.parquet")
```
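
By default Spark refuses to write to a path that already exists, so re-running the cookbook would throw. The script version of this call passes a save mode to stay idempotent:

```clojure
(g/write-parquet! renamed-df "data/cookbook/bikes.parquet" {:mode "overwrite"})
```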

Analogous read and write functions are available. For instance, `g/write-avro!` to write as an Avro file and `g/read-json!` to read a JSON file.
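
A sketch of the analogous calls, assuming they follow the same path-first signatures (the JSON path here is hypothetical):

```clojure
(g/write-avro! renamed-df "data/cookbook/bikes.avro")
(def bikes-from-json (g/read-json! spark "data/cookbook/bikes.json"))
```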
2 changes: 1 addition & 1 deletion docs/cookbook/part_2_selecting_rows_and_columns.md
@@ -7,7 +7,7 @@ As in the [Pandas Cookbook](https://nbviewer.jupyter.org/github/jvns/pandas-cook
"https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/311-service-requests.csv")

(def complaints-data-path
"resources/cookbook/complaints.csv")
"data/cookbook/complaints.csv")

(download-data! complaints-data-url complaints-data-path)
=> :downloaded
@@ -13,7 +13,7 @@ We can download the data by hitting the right URL query. We can programmatically
"&timeframe=1&submit=Download+Data"))

(defn weather-data-path [year month]
(str "resources/cookbook/weather/weather-" year "-" month ".csv"))
(str "data/cookbook/weather/weather-" year "-" month ".csv"))

(defn weather-data [year month]
(download-data! (weather-data-url year month) (weather-data-path year month))
@@ -285,7 +285,7 @@ and we can simply set the CSV path to the directory path:

```clojure
(def weather-2012
(-> (g/read-csv! spark "resources/cookbook/weather" {:inferSchema "true"})
(-> (g/read-csv! spark "data/cookbook/weather" {:inferSchema "true"})
normalise-column-names
(g/select (g/columns weather-mar-2012))))

@@ -315,5 +315,5 @@ and we can simply set the CSV path to the directory path:
Finally, we can save the aggregated dataset for future use:

```clojure
(g/write-csv! weather-2012 "resources/cookbook/weather-2012.csv")
(g/write-csv! weather-2012 "data/cookbook/weather-2012.csv")
```
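
As with the Parquet write in part 1, the script version passes `{:mode "overwrite"}` so that re-runs do not throw:

```clojure
(g/write-csv! weather-2012 "data/cookbook/weather-2012.csv" {:mode "overwrite"})
```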
2 changes: 1 addition & 1 deletion docs/cookbook/part_5_string_operations.md
@@ -4,7 +4,7 @@ In this part of the cookbook, we are going to use the cleaned dataset from the p

```clojure
(def weather-2012
(g/read-csv! spark "resources/cookbook/weather-2012.csv" {:inferSchema "true"}))
(g/read-csv! spark "data/cookbook/weather-2012.csv" {:inferSchema "true"}))
```

## 5.1 Finding The Snowiest Months
2 changes: 1 addition & 1 deletion docs/cookbook/part_6_cleaning_up_messy_data.md
@@ -5,7 +5,7 @@ As in [part 2 of the cookbook](part_2_selecting_rows_and_columns.md), we are goi
```clojure
(def complaints
(normalise-column-names
(g/read-csv! spark "resources/cookbook/complaints.csv" {:inferSchema "true"})))
(g/read-csv! spark "data/cookbook/complaints.csv" {:inferSchema "true"})))
```

## 6.1 Messy Zip Codes
2 changes: 1 addition & 1 deletion docs/cookbook/part_7_timestamps_and_dates.md
@@ -9,7 +9,7 @@ We download the data as in [part 2 of the cookbook](part_2_selecting_rows_and_co
"https://raw.githubusercontent.com/jvns/pandas-cookbook/master/data/popularity-contest")

(def popularity-contest-data-path
"resources/cookbook/popularity-contest.csv")
"data/cookbook/popularity-contest.csv")

(download-data! popularity-contest-data-url popularity-contest-data-path)
=> :downloaded
142 changes: 142 additions & 0 deletions docs/cookbook/part_8_window_functions.md
@@ -0,0 +1,142 @@
# Cookbook 8: Window Functions

This part is based on [Databricks' post on window functions](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html). Window functions allow us to perform grouped operations such as aggregations, ranking and lagging without having to do a separate group-by and join. We are going to use a synthetic dataset:

```clojure
(def product-revenue
(g/table->dataset
spark
[["Thin" "Cell phone" 6000]
["Normal" "Tablet" 1500]
["Mini" "Tablet" 5500]
["Ultra Thin" "Cell phone" 5000]
["Very Thin" "Cell phone" 6000]
["Big" "Tablet" 2500]
["Bendable" "Cell phone" 3000]
["Foldable" "Cell phone" 3000]
["Pro" "Tablet" 4500]
["Pro2" "Tablet" 6500]]
[:product :category :revenue]))

(g/print-schema product-revenue)
; root
; |-- product: string (nullable = true)
; |-- category: string (nullable = true)
; |-- revenue: long (nullable = true)
```

## 8.1 The Best and Second Best in Every Category

The easiest way to define a windowed column is to use `g/windowed`. The function takes a map with a required `:window-col` key and the optional keys `:partition-by`, `:order-by`, `:range-between`, and `:rows-between`. Consider the following example:

```clojure
(def rank-by-category
(g/windowed
{:window-col (g/dense-rank)
:partition-by :category
:order-by (g/desc :revenue)}))

(-> product-revenue
(g/with-column :rank-by-category rank-by-category)
(g/filter (g/< :rank-by-category 3))
g/show)
; +----------+----------+-------+----------------+
; |product |category |revenue|rank-by-category|
; +----------+----------+-------+----------------+
; |Thin |Cell phone|6000 |1 |
; |Very Thin |Cell phone|6000 |1 |
; |Ultra Thin|Cell phone|5000 |2 |
; |Pro2 |Tablet |6500 |1 |
; |Mini |Tablet |5500 |2 |
; +----------+----------+-------+----------------+
```

The column `rank-by-category` ranks each product's revenue in descending order within its category. Ranks start at one, so taking the best and second best means filtering for `:rank-by-category` less than three. Note that `g/dense-rank` assigns tied revenues the same rank, so ties are included here.
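
The `:range-between` and `:rows-between` options bound the window frame and are not needed above. As a minimal sketch of `:rows-between` (assuming it takes a `[start end]` vector of row offsets relative to the current row), a three-row rolling revenue sum per category would look like this:

```clojure
;; Rolling sum over the previous, current, and next row within each
;; category; the [-1 1] frame is an assumed [start end] offset vector.
(def rolling-revenue-by-category
  (g/windowed
    {:window-col   (g/sum :revenue)
     :partition-by :category
     :order-by     (g/desc :revenue)
     :rows-between [-1 1]}))

(-> product-revenue
    (g/with-column :rolling-revenue rolling-revenue-by-category)
    g/show)
```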

## 8.2 Revenue Differences to the Best in Every Category

To achieve this, we can combine a windowed aggregation with plain column arithmetic: compute the per-category maximum as a windowed column, then subtract each product's revenue from it:

```clojure
(def max-by-category
(g/windowed
{:window-col (g/max :revenue)
:partition-by :category}))

(-> product-revenue
(g/with-column :max-by-category max-by-category)
(g/with-column :revenue-diff (g/- :max-by-category :revenue))
(g/order-by :category (g/desc :revenue))
g/show)
; +----------+----------+-------+---------------+------------+
; |product |category |revenue|max-by-category|revenue-diff|
; +----------+----------+-------+---------------+------------+
; |Thin |Cell phone|6000 |6000 |0 |
; |Very Thin |Cell phone|6000 |6000 |0 |
; |Ultra Thin|Cell phone|5000 |6000 |1000 |
; |Bendable |Cell phone|3000 |6000 |3000 |
; |Foldable |Cell phone|3000 |6000 |3000 |
; |Pro2 |Tablet |6500 |6500 |0 |
; |Mini |Tablet |5500 |6500 |1000 |
; |Pro |Tablet |4500 |6500 |2000 |
; |Big |Tablet |2500 |6500 |4000 |
; |Normal |Tablet |1500 |6500 |5000 |
; +----------+----------+-------+---------------+------------+
```
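
For comparison, the same numbers could be produced with the separate group-by and join that the windowed column saves us from. A sketch, assuming `g/agg` accepts a map of new column names to aggregate expressions and `g/join` accepts a shared join column:

```clojure
;; The group-by-and-join equivalent of max-by-category (sketch).
(def max-per-category
  (-> product-revenue
      (g/group-by :category)
      (g/agg {:max-by-category (g/max :revenue)})))

(-> product-revenue
    (g/join max-per-category :category)
    (g/with-column :revenue-diff (g/- :max-by-category :revenue))
    (g/order-by :category (g/desc :revenue))
    g/show)
```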

## 8.3 Revenue Differences to the Next Best in Every Category

The idea is similar to the previous one, but instead of aggregating with `g/max`, we use [the analytic function](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html) `g/lag` with an offset of one row:

```clojure
(def next-best-by-category
(g/windowed
{:window-col (g/lag :revenue 1)
:partition-by :category
:order-by (g/desc :revenue)}))

(-> product-revenue
(g/with-column :next-best-by-category next-best-by-category)
(g/with-column :revenue-diff (g/- :next-best-by-category :revenue))
g/show)
; +----------+----------+-------+---------------------+------------+
; |product |category |revenue|next-best-by-category|revenue-diff|
; +----------+----------+-------+---------------------+------------+
; |Thin |Cell phone|6000 |null |null |
; |Very Thin |Cell phone|6000 |6000 |0 |
; |Ultra Thin|Cell phone|5000 |6000 |1000 |
; |Bendable |Cell phone|3000 |5000 |2000 |
; |Foldable |Cell phone|3000 |3000 |0 |
; |Pro2 |Tablet |6500 |null |null |
; |Mini |Tablet |5500 |6500 |1000 |
; |Pro |Tablet |4500 |5500 |1000 |
; |Big |Tablet |2500 |4500 |2000 |
; |Normal |Tablet |1500 |2500 |1000 |
; +----------+----------+-------+---------------------+------------+
```
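
The symmetric question, the revenue gap down to the next-worst product, swaps `g/lag` for `g/lead`, which looks one row ahead instead of one row behind. A sketch, assuming `g/lead` mirrors `g/lag`'s `(g/lead expr offset)` signature:

```clojure
(def next-worst-by-category
  (g/windowed
    {:window-col   (g/lead :revenue 1)
     :partition-by :category
     :order-by     (g/desc :revenue)}))

(-> product-revenue
    (g/with-column :next-worst-by-category next-worst-by-category)
    (g/with-column :revenue-diff (g/- :revenue :next-worst-by-category))
    g/show)
```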

## 8.4 Underperformance by One Sigma in Every Category

Suppose we would like to identify every product whose revenue falls more than one standard deviation below the mean of its category. In other words, we compute a per-category z-statistic, `(revenue - mean) / stddev`, and keep the rows where it is below -1. We can compose windowed columns in a single form:

```clojure
(def mean-by-category
(g/windowed {:window-col (g/mean :revenue) :partition-by :category}))

(def std-by-category
(g/windowed {:window-col (g/stddev :revenue) :partition-by :category}))

(-> product-revenue
(g/with-column
:z-stat-by-category
(g// (g/- :revenue mean-by-category) std-by-category))
(g/filter (g/< :z-stat-by-category -1))
g/show)
; +--------+----------+-------+-------------------+
; |product |category |revenue|z-stat-by-category |
; +--------+----------+-------+-------------------+
; |Bendable|Cell phone|3000 |-1.0550087574332592|
; |Foldable|Cell phone|3000 |-1.0550087574332592|
; |Normal |Tablet |1500 |-1.2538313376430714|
; +--------+----------+-------+-------------------+
```
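
To see the per-category statistics behind these z-stats, a quick group-by sketch (again assuming the map form of `g/agg`):

```clojure
(-> product-revenue
    (g/group-by :category)
    (g/agg {:mean-revenue (g/mean :revenue)
            :std-revenue  (g/stddev :revenue)})
    g/show)
```

Note that `g/stddev` corresponds to Spark's sample standard deviation, `stddev_samp`.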
