Skip to content

Commit

Permalink
2024-10-30-08-33-16 - airflow-docker
Browse files Browse the repository at this point in the history
  • Loading branch information
josephmachado committed Oct 30, 2024
1 parent cdbec18 commit f261935
Show file tree
Hide file tree
Showing 50 changed files with 87,921 additions and 0 deletions.
111 changes: 111 additions & 0 deletions 6-Scheduling-&-Orchestration/Scheduling-&-Orchestration.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,117 @@
"source": [
"# Define data pipeline as a DAG"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "05d1b2f0-b7d9-4d1b-981c-a69f8a6c4670",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"! docker compose down"
]
},
{
"cell_type": "code",
"execution_count": 36,
"id": "01843f37-cfc7-4803-b6f6-ccb46b85e497",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n"
]
}
],
"source": [
"! docker ps"
]
},
{
"cell_type": "code",
"execution_count": 37,
"id": "917336a9-4b79-4210-883e-85a27e47ea71",
"metadata": {},
"outputs": [],
"source": [
"# do this in a terminal in this directory\n",
"# sudo mkdir -p logs plugins temp dags tests data visualization && sudo chmod -R u=rwx,g=rwx,o=rwx logs plugins temp dags tests data visualization"
]
},
{
"cell_type": "code",
"execution_count": 38,
"id": "058454f0-7971-4b74-95f2-2a721e66494e",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"! docker compose up --build -d"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "30780e1d-193d-4fc1-9397-e5687bda4c00",
"metadata": {},
"outputs": [],
"source": [
"! sleep 30"
]
},
{
"cell_type": "code",
"execution_count": 44,
"id": "733e409b-f9c4-4a83-8281-8606c0a385a9",
"metadata": {},
"outputs": [],
"source": [
"! rm -rf ./dags/tpch_warehouse/models/*/.ipynb_checkpoints # always run before dbt run, caused by notebooks, no need to do this if performed via terminal"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "99c764a5-224f-4f1d-aefa-c1ba6c25fafa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n",
"fc03ce69ee87 6-scheduling--orchestration-airflow-scheduler \"/usr/bin/dumb-init …\" About a minute ago Up 57 seconds (healthy) 8080/tcp scheduler\n",
"a3dfe1c2abc5 6-scheduling--orchestration-airflow-webserver \"/usr/bin/dumb-init …\" About a minute ago Up 42 seconds (healthy) 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp webserver\n",
"15f6641953e4 postgres:16 \"docker-entrypoint.s…\" About a minute ago Up About a minute (healthy) 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp postgres\n"
]
}
],
"source": [
"! docker ps"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7df815d7-fab6-4ce4-98a3-c03230a3be66",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"! docker compose down"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "30cb6691-81c2-4d2c-b0c4-2ce84e8d1fd3",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
16 changes: 16 additions & 0 deletions 6-Scheduling-&-Orchestration/containers/airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM apache/airflow:2.9.2

# Install Python requirements as the (default) airflow user, so packages land
# in the airflow user's site-packages as the official image expects.
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt

# Switch to root only for system-level package installation.
USER root

# default-jdk is required by JVM-based tooling invoked from tasks.
# Clean the apt lists in the same layer to keep the image small.
RUN apt-get update && \
    apt-get install -y --no-install-recommends default-jdk && \
    rm -rf /var/lib/apt/lists/*

# Drop back to the unprivileged airflow user: the official image docs warn
# against leaving the image running as root.
USER airflow
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apache-airflow-client==2.9.0
duckdb==1.0.0
dbt-core==1.8.0
dbt-duckdb==1.8.0
38 changes: 38 additions & 0 deletions 6-Scheduling-&-Orchestration/dags/coincap_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Daily ELT DAG: pull exchange data from the CoinCap API and land it as a CSV."""
import csv
import os
from datetime import datetime, timedelta

import requests

from airflow import DAG
from airflow.decorators import task
# NOTE(review): DummyOperator is deprecated in Airflow 2.x; EmptyOperator is
# the replacement. Kept here to avoid changing the task's import surface.
from airflow.operators.dummy import DummyOperator

with DAG(
    'coincap_elt',
    description='A simple DAG to fetch data '
                'from CoinCap Exchanges API and write to a file',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    url = "https://api.coincap.io/v2/exchanges"
    file_path = f'{os.getenv("AIRFLOW_HOME")}/data/coincap_exchanges.csv'

    @task
    def fetch_coincap_exchanges(url, file_path):
        """Fetch exchange records from CoinCap and write them to a CSV file.

        Raises requests.HTTPError on a non-2xx response (and Timeout after
        30s) so the task fails loudly instead of silently writing nothing.
        Writes no file when the API returns an empty list.
        """
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # fail the task on API errors
        exchanges = response.json()['data']
        if exchanges:
            keys = exchanges[0].keys()
            # newline='' is required by the csv module to avoid spurious
            # blank lines on Windows.
            with open(file_path, 'w', newline='') as f:
                dict_writer = csv.DictWriter(f, fieldnames=keys)
                dict_writer.writeheader()
                dict_writer.writerows(exchanges)

    stop_pipeline = DummyOperator(task_id='stop_pipeline')

    fetch_coincap_exchanges(url, file_path) >> stop_pipeline
28 changes: 28 additions & 0 deletions 6-Scheduling-&-Orchestration/dags/customer_outreach.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Daily DAG that builds and tests the customer outreach dbt models."""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
# NOTE(review): DummyOperator is deprecated in Airflow 2.x; EmptyOperator is
# the replacement. Kept here to avoid changing the task's import surface.
from airflow.operators.dummy import DummyOperator

with DAG(
    'customer_outreach_etl',
    description='A simple DAG to generate customer outreach metrics',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Build all models in the tpch_warehouse dbt project.
    run_dbt = BashOperator(
        task_id='run_dbt',
        bash_command='cd /opt/airflow/dags/tpch_warehouse && dbt run',
    )

    # BUG FIX: this task previously ran `dbt run` a second time instead of
    # `dbt test`, so model tests never executed.
    test_dbt = BashOperator(
        task_id='test_dbt',
        bash_command='cd /opt/airflow/dags/tpch_warehouse && dbt test',
    )

    stop_pipeline = DummyOperator(task_id='stop_pipeline')

    run_dbt >> test_dbt >> stop_pipeline
4 changes: 4 additions & 0 deletions 6-Scheduling-&-Orchestration/dags/tpch_warehouse/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
15 changes: 15 additions & 0 deletions 6-Scheduling-&-Orchestration/dags/tpch_warehouse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
Empty file.
36 changes: 36 additions & 0 deletions 6-Scheduling-&-Orchestration/dags/tpch_warehouse/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'tpch_warehouse'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'tpch_warehouse'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  tpch_warehouse:
    # Config indicated by + and applies to all files under models/example/
    # NOTE(review): this `example` entry looks like leftover dbt-init scaffold;
    # the models in this commit appear to live under dims/facts subdirectories,
    # not models/example/ — confirm the path or this +materialized is a no-op.
    example:
      +materialized: view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Customer dimension: one row per customer from staging, enriched with the
-- customer's nation and region attributes. Left joins keep customers whose
-- nation/region is missing (geo columns come back null).

with enriched as (

    select

        ---------- customer info
        c.customer_key,
        c.name as customer_name,
        c.address,
        c.phone,
        c.acctbal,
        c.mktsegment,

        ---------- nation info
        n.name as nation_name,
        n.comment as nation_comment,

        ---------- region info
        r.name as region_name,
        r.comment as region_comment

    from {{ ref('stg_customers') }} c
    left join {{ ref('stg_nation') }} n
        on c.nationkey = n.nationkey
    left join {{ ref('stg_region') }} r
        on n.regionkey = r.regionkey

)

select * from enriched
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
models:
- name: dim_customers
description: Dimension table for customer data with enriched information from nation and region.
columns:
- name: customer_key
description: Unique identifier for each customer.
data_tests:
- not_null
- unique
- name: customer_name
description: The name of the customer.
- name: address
description: The address of the customer.
- name: phone
description: Contact phone number of the customer.
- name: acctbal
description: Account balance of the customer.
- name: mktsegment
description: The market segment associated with the customer.
- name: nation_name
description: The name of the nation associated with the customer.
- name: nation_comment
description: Comments or notes related to the nation.
- name: region_name
description: The name of the region associated with the nation.
- name: region_comment
description: Comments or notes related to the region.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Line-item model: exposes the staged line items at their natural grain
-- (one row per order line) with an explicit column list so downstream
-- models are insulated from new staging columns.

select

    ---------- line item details
    orderkey,
    partkey,
    suppkey,
    linenumber,
    quantity,
    extendedprice,
    discount,
    tax,
    returnflag,
    linestatus,
    shipdate,
    commitdate,
    receiptdate,
    shipinstruct,
    shipmode,
    comment

from {{ ref('stg_lineitems') }}
Loading

0 comments on commit f261935

Please sign in to comment.