
Airflow

Key concepts

  • DAG: a graph object representing your data pipeline
  • Operator: describes a single task in your data pipeline
  • Task: an instance of an operator
  • Task Instance: a specific run of a task = DAG + Task + point in time
  • Workflow: the combination of all of the above
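
A minimal sketch tying these concepts together (names are illustrative; the import path is the Airflow 1.10 one, Airflow 2 moved it to airflow.operators.bash): the DAG is the graph, BashOperator is an operator, and hello is a task, i.e. an instance of that operator.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import datetime as dt

with DAG('concepts_demo',                      # hypothetical DAG id
         start_date=dt.datetime(2019, 4, 13),
         schedule_interval='@daily') as dag:
    # `hello` is a task: an instance of the BashOperator
    hello = BashOperator(task_id='hello', bash_command='echo hello')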

Architecture overview

single-node and multi-node deployments
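
The executor configured in airflow.cfg is what distinguishes the two: SequentialExecutor and LocalExecutor keep everything on one node, while CeleryExecutor distributes tasks to worker nodes. A quick way to check the active setting from Python:

from airflow.configuration import conf

# SequentialExecutor / LocalExecutor => single node,
# CeleryExecutor => multi node with remote workers
print(conf.get('core', 'executor'))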

Airflow install in a Python virtualenv

# create a python virtual env
python3 -m venv airflow-env
source airflow-env/bin/activate

# create the airflow home folder
mkdir airflow
export AIRFLOW_HOME=`pwd`/airflow

# install airflow
pip install apache-airflow

# initialize the metadata database (Airflow 2.x: airflow db init)
airflow initdb
airflow scheduler &
airflow webserver -p 8080 &
echo "localhost:8080"

# optionally inspect the metadata database
# sudo apt install sqlite3
# sqlite3 $AIRFLOW_HOME/airflow.db

# alternative: create the environment with conda
conda env create -f environment.yml
source activate airflow-tutorial
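
The same inspection can be done from Python with the standard sqlite3 module (a sketch; it assumes AIRFLOW_HOME is exported and relies on the dag table from the default metadata schema):

import os
import sqlite3

# open the metadata database that `airflow initdb` created
db_path = os.path.join(os.environ['AIRFLOW_HOME'], 'airflow.db')
conn = sqlite3.connect(db_path)

# list a few registered DAGs
for (dag_id,) in conn.execute("SELECT dag_id FROM dag LIMIT 5"):
    print(dag_id)
conn.close()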

credentials

ssh -p 2200 airflow@localhost
# password: airflow

activate workspace

source .sandbox/bin/activate

commands

check the installation

airflow --help

DAG example

DAG files should be placed in the "dags" folder ( default: $AIRFLOW_HOME/dags )

  • minimal
from airflow import DAG

with DAG('airflow_tutorial_v01',
         schedule_interval='0 * * * *',  # cron syntax: hourly, at minute 0
         ) as dag:
    print(dag)
  • simple
from airflow import DAG
import datetime as dt

arguments = {
    'owner': 'me',
    'start_date': dt.datetime(2019, 4, 13),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('airflow_tutorial_v01',
         default_args=arguments,
         schedule_interval='0 * * * *',  # cron syntax: hourly, at minute 0
         ) as dag:
    print(dag)
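
The DAGs above are empty; a short sketch of wiring tasks into the dag defined above (task ids and commands are illustrative; the import paths are the Airflow 1.10 ones):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

extract = BashOperator(task_id='extract',
                       bash_command='echo extracting',
                       dag=dag)
load = PythonOperator(task_id='load',
                      python_callable=lambda: print('loading'),
                      dag=dag)

# the bitshift operator declares the dependency: extract runs before load
extract >> load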

operator types ( all derive from BaseOperator )

  • action ( performs an action )
  • transfer ( moves data between systems )
  • sensor ( waits for some event; see the sketch below )
    • long-running task
    • derives from BaseSensorOperator
    • the poke method is responsible for the waiting
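
A minimal custom sensor sketch, waiting on a hypothetical trigger file and attached to the dag from the examples above; the import path is the Airflow 1.10 one ( Airflow 2 moved it to airflow.sensors.base ):

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator

class FileExistsSensor(BaseSensorOperator):
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    # poke() is called every poke_interval seconds until it returns
    # True or the sensor times out
    def poke(self, context):
        return os.path.exists(self.filepath)

wait_for_file = FileExistsSensor(
    task_id='wait_for_file',
    filepath='/tmp/data_ready.flag',  # hypothetical trigger file
    poke_interval=30,                 # seconds between poke() calls
    timeout=60 * 60,                  # give up after one hour
    dag=dag,
)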

REST endpoints

trigger DAG

import json
import urllib.request

# experimental REST API (Airflow 1.10; replaced by the stable API in 2.x)
AIRFLOW_URL = "https://airflow.local/api/experimental/dags/name_of_my_dag/dag_runs"
payload = {"conf": {"dag_param_1": "test value"}}

req = urllib.request.Request(
    AIRFLOW_URL,
    data=json.dumps(payload).encode('utf-8'),
    headers={'Content-Type': 'application/json',
             'Cache-Control': 'no-cache'},
    method='POST',
)
with urllib.request.urlopen(req) as f:
    print(f.read().decode('utf-8'))
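
A GET on the same experimental endpoint lists the DAG's runs, which is a quick way to confirm the trigger landed (a sketch; the exact response fields come from the experimental API and may vary by version):

import json
import urllib.request

# GET instead of POST: list existing runs of the DAG
with urllib.request.urlopen(AIRFLOW_URL) as f:
    runs = json.loads(f.read().decode('utf-8'))

for run in runs:
    print(run.get('execution_date'), run.get('state'))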