- DAG: a graph object representing your data pipeline
- Operator: describes a single task in your data pipeline
- Task: an instance of an operator
- Task Instance: a specific run of a task = DAG + Task + point in time
- Workflow: the combination of all of the above
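A minimal sketch mapping these terms onto code (the dag id 'glossary_demo', the task id, and the Airflow 1.x import path are illustrative assumptions):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x path; 2.x uses airflow.operators.bash

# DAG: the pipeline as a graph object
with DAG('glossary_demo',
         start_date=datetime(2019, 4, 13),
         schedule_interval='@daily') as dag:
    # Operator -> Task: instantiating the operator inside the DAG registers it as a task
    say_hello = BashOperator(task_id='say_hello', bash_command='echo hello')
# Task Instance: one concrete run of 'say_hello' for a given execution date (DAG + Task + point in time)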
# create python virtual env
python3 -m venv airflow-env
source airflow-env/bin/activate
# create folder
mkdir airflow
export AIRFLOW_HOME=`pwd`/airflow
# install airflow
pip install apache-airflow
# init the metadata database (Airflow 1.x; in Airflow 2.x the command is `airflow db init`)
airflow initdb
airflow scheduler &
airflow webserver -p 8080 &
echo "localhost:8080"
# sudo apt install sqlite3
# sqlite3 $AIRFLOW_HOME/airflow.db
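The same metadata DB can be inspected from Python; a sketch assuming the default SQLite backend at $AIRFLOW_HOME/airflow.db (table names such as dag_run and task_instance come from the default Airflow schema):

import os
import sqlite3

# connect to the metadata database created by `airflow initdb`
db_path = os.path.join(os.environ['AIRFLOW_HOME'], 'airflow.db')
conn = sqlite3.connect(db_path)

# list the tables Airflow created (dag, dag_run, task_instance, ...)
for (name,) in conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"):
    print(name)
conn.close()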
# conda-based setup: create the env from environment.yml
conda env create -f environment.yml
source activate airflow-tutorial
# credentials
ssh -p 2200 airflow@localhost
# password: airflow
# activate workspace
source .sandbox/bin/activate
# check workspace
airflow --help
DAG files should be placed into the "dags" folder (default: $AIRFLOW_HOME/dags)
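The actual location is the dags_folder setting in airflow.cfg; a sketch for checking it programmatically (assumes Airflow 1.10+ where airflow.configuration.conf is available):

from airflow.configuration import conf

# where the scheduler looks for DAG files; defaults to $AIRFLOW_HOME/dags
print(conf.get('core', 'dags_folder'))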
- minimal
from datetime import datetime

from airflow import DAG

with DAG('airflow_tutorial_v01',
         start_date=datetime(2019, 4, 13),
         schedule_interval='0 * * * *',
         ) as dag:
    print(dag)
- simple
import datetime as dt

from airflow import DAG

arguments = {
    'owner': 'me',
    'start_date': dt.datetime(2019, 4, 13),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('airflow_tutorial_v01',
         default_args=arguments,
         schedule_interval='0 * * * *',
         ) as dag:
    print(dag)
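To turn the DAG into a real pipeline you add operators, which become tasks; a sketch using BashOperator and PythonOperator (Airflow 1.x import paths; the dag id 'airflow_tutorial_v02', task ids and commands are made up):

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator      # 2.x: airflow.operators.bash
from airflow.operators.python_operator import PythonOperator  # 2.x: airflow.operators.python

def process_data():
    print('processing')

arguments = {
    'owner': 'me',
    'start_date': dt.datetime(2019, 4, 13),
}

with DAG('airflow_tutorial_v02',
         default_args=arguments,
         schedule_interval='0 * * * *',
         ) as dag:
    fetch = BashOperator(task_id='fetch_data', bash_command='echo fetching')
    process = PythonOperator(task_id='process_data', python_callable=process_data)

    fetch >> process  # fetch_data runs before process_data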
- action (performs an action, e.g. runs a command or a query)
- transfer (moves data between systems)
- sensor (waits for some event)
  - long-running task
  - subclasses BaseSensorOperator
  - the poke method is responsible for the waiting (see the sketch below)
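A hedged sketch of a custom sensor illustrating poke (the class, its filepath parameter, and the Airflow 1.10 import path are assumptions, not an official Airflow example):

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator  # 2.x: airflow.sensors.base
from airflow.utils.decorators import apply_defaults  # conventional in 1.x; deprecated no-op in 2.x

class FileExistsSensor(BaseSensorOperator):
    """Hypothetical sensor that waits until a file appears on disk."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # called every poke_interval seconds until it returns True
        return os.path.exists(self.filepath)

# usage inside a DAG (placeholder values):
# wait = FileExistsSensor(task_id='wait_for_file', filepath='/tmp/data.csv',
#                         poke_interval=30, timeout=60 * 60, dag=dag)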
# trigger a DAG run through the experimental REST API (Airflow 1.x)
import json
import urllib.request

AIRFLOW_URL = "https://airflow.local/api/experimental/dags/name_of_my_dag/dag_runs"
payload = json.dumps({"conf": {"dag_param_1": "test value"}}).encode("utf-8")

req = urllib.request.Request(AIRFLOW_URL, data=payload, method="POST")
req.add_header('Content-Type', 'application/json')
req.add_header('Cache-Control', 'no-cache')

with urllib.request.urlopen(req) as f:
    print(f.read())
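On Airflow 2.x the experimental API is deprecated in favour of the stable REST API; a hedged equivalent (endpoint /api/v1/dags/{dag_id}/dagRuns; assumes the basic-auth backend is enabled and the credentials below are placeholders):

import requests

resp = requests.post(
    "https://airflow.local/api/v1/dags/name_of_my_dag/dagRuns",
    json={"conf": {"dag_param_1": "test value"}},
    auth=("airflow_user", "airflow_password"),  # placeholder credentials
)
print(resp.status_code, resp.json())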