Airflow Example
In this example, we'll walk you through how to enable Airflow DAGs to send lineage metadata to Marquez using OpenLineage, and then use that metadata to explore some of Marquez's features. You'll learn how to:
- Enable OpenLineage in Airflow
- Write your very first OpenLineage enabled DAG
- Troubleshoot a failing DAG using Marquez
Before you begin, make sure you have installed Docker and Docker Compose.
Note: We recommend allocating at least 2 CPUs and 8 GB of memory to Docker.
First, if you haven't already, clone the Marquez repository and change into the examples/airflow directory:
$ git clone https://github.com/MarquezProject/marquez.git
$ cd marquez/examples/airflow
To make sure the latest openlineage-airflow library is downloaded and installed when starting Airflow, you'll need to create a requirements.txt file with the following content:
openlineage-airflow
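To confirm that the library was actually installed, a minimal sketch using only the standard library's importlib.metadata can be run with Python inside one of the Airflow containers (how you open a shell in the container is up to your setup). The distribution name comes from requirements.txt; the helper name installed_version is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("openlineage-airflow")
    print(found or "openlineage-airflow is NOT installed")
```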
Next, we'll need to specify where we want Airflow to send DAG metadata. To do so, create a config file named openlineage.env with the following environment variables and values:
# The URL of the HTTP backend
OPENLINEAGE_URL=http://marquez:5000
# The namespace associated with the DAG collected metadata
OPENLINEAGE_NAMESPACE=example
Note: The openlineage.env config file will be used by the airflow, airflow_scheduler, and airflow_worker containers to send lineage metadata to Marquez.
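To make the role of the two variables concrete, here is a minimal sketch that parses KEY=VALUE lines and derives the URL lineage events would be sent to. Two assumptions: the parser is a simplified stand-in, not Docker Compose's own env-file parser (it skips full-line # comments but does not strip inline comments after a value, so keep comments on their own lines), and the /api/v1/lineage path is Marquez's lineage endpoint as I understand it, so check it against the Marquez API reference.

```python
from typing import Dict
from urllib.parse import urljoin


def parse_env_file(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and full-line # comments."""
    env: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            env[key.strip()] = value.strip()
    return env


def lineage_endpoint(env: Dict[str, str]) -> str:
    # Assumption: OpenLineage events are POSTed to Marquez at /api/v1/lineage.
    return urljoin(env["OPENLINEAGE_URL"].rstrip("/") + "/", "api/v1/lineage")


if __name__ == "__main__":
    with open("openlineage.env") as fh:
        env = parse_env_file(fh.read())
    print(env.get("OPENLINEAGE_NAMESPACE"), lineage_endpoint(env))
```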
Your examples/airflow/ directory should now contain the following:
.
├── README.md
├── docker/
├── docker-compose.yml
├── docs/
├── openlineage.env
└── requirements.txt
In this step, we'll create two new Airflow DAGs that perform simple tasks. The counter DAG generates a random number every minute, while the sum DAG calculates a sum every five minutes. Together they form a simple pipeline containing two jobs and two datasets.
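The two cadences correspond to the schedule_interval values used in counter.py and sum.py ('*/1 * * * *' and '*/5 * * * *'). A simplified sketch, which understands only the '*/N' minute form and not full cron syntax, counts how often each fires in an hour, showing that roughly five counter runs land between consecutive sum runs. The helper minute_step_runs is hypothetical.

```python
from typing import List


def minute_step_runs(expr: str, minutes: int = 60) -> List[int]:
    """Minutes within a window on which a '*/N * * * *' cron expression fires.

    Simplified: only the '*/N' minute field is supported, and the other
    four fields must be '*'.
    """
    fields = expr.split()
    if len(fields) != 5 or not fields[0].startswith("*/") or any(f != "*" for f in fields[1:]):
        raise ValueError(f"unsupported cron expression: {expr!r}")
    step = int(fields[0][2:])
    return [m for m in range(minutes) if m % step == 0]


counter_runs = minute_step_runs("*/1 * * * *")  # counter DAG
sum_runs = minute_step_runs("*/5 * * * *")      # sum DAG
print(len(counter_runs), len(sum_runs))  # 60 12
```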
First, let's create the dags/ folder where our example DAGs will be located:
$ mkdir dags
When writing our DAGs, we'll use openlineage-airflow, enabling OpenLineage to observe the DAG and automatically collect task-level metadata. Notice that the only change required to begin collecting DAG metadata is to import DAG from openlineage.airflow instead of airflow:
- from airflow import DAG
+ from openlineage.airflow import DAG
Under dags/, create a file named counter.py and add the following code:
import random

from openlineage.airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['datascience@example.com']  # placeholder; email alerts are disabled above
}

# Runs every minute and starts unpaused (is_paused_upon_creation=False).
dag = DAG(
    'counter',
    schedule_interval='*/1 * * * *',
    catchup=False,
    is_paused_upon_creation=False,
    default_args=default_args,
    description='DAG that generates a new count value between 1-10.'
)

# t1: create the counts table if it does not exist yet.
t1 = PostgresOperator(
    task_id='if_not_exists',
    postgres_conn_id='example_db',
    sql='''
    CREATE TABLE IF NOT EXISTS counts (
      value INTEGER
    );''',
    dag=dag
)

# t2: insert a random value between 1 and 10 into counts.
# Note: random.randint() runs when Airflow parses this file, not per task run.
t2 = PostgresOperator(
    task_id='inc',
    postgres_conn_id='example_db',
    sql='''
    INSERT INTO counts (value)
         VALUES (%(value)s)
    ''',
    parameters={
        'value': random.randint(1, 10)
    },
    dag=dag
)

t1 >> t2
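To see what the two counter tasks do to the database without starting Airflow, here is a local sketch that swaps in SQLite from Python's standard library for Postgres. This is a stand-in, not how the DAG runs: SQLite uses the :value placeholder style instead of psycopg2's %(value)s.

```python
import random
import sqlite3

# In-memory SQLite database standing in for the example_db Postgres connection.
conn = sqlite3.connect(":memory:")

# t1 (if_not_exists): idempotent table creation.
conn.execute("CREATE TABLE IF NOT EXISTS counts (value INTEGER)")

# t2 (inc): insert one random value between 1 and 10, inclusive.
conn.execute("INSERT INTO counts (value) VALUES (:value)", {"value": random.randint(1, 10)})
conn.commit()

print(conn.execute("SELECT value FROM counts").fetchall())
```

Running t1 again is harmless thanks to IF NOT EXISTS, which is why the DAG can execute it on every run.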
Under dags/, create a file named sum.py and add the following code:
from openlineage.airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['datascience@example.com']  # placeholder; email alerts are disabled above
}

# Runs every five minutes and starts unpaused (is_paused_upon_creation=False).
dag = DAG(
    'sum',
    schedule_interval='*/5 * * * *',
    catchup=False,
    is_paused_upon_creation=False,
    default_args=default_args,
    description='DAG that sums the total of generated count values.'
)

# t1: create the sums table if it does not exist yet.
t1 = PostgresOperator(
    task_id='if_not_exists',
    postgres_conn_id='example_db',
    sql='''
    CREATE TABLE IF NOT EXISTS sums (
      value INTEGER
    );''',
    dag=dag
)

# t2: append the total of all values currently in counts (the upstream dataset).
t2 = PostgresOperator(
    task_id='total',
    postgres_conn_id='example_db',
    sql='''
    INSERT INTO sums (value)
    SELECT SUM(c.value) FROM counts AS c;
    ''',
    dag=dag
)

t1 >> t2
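The same SQLite stand-in can illustrate the sum DAG. This sketch seeds counts by hand (in the real pipeline the counter DAG fills it) and then runs the two sum tasks' SQL; SQLite replacing Postgres is the only substitution.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Seed the upstream table that the counter DAG would normally populate.
conn.execute("CREATE TABLE counts (value INTEGER)")
conn.executemany("INSERT INTO counts (value) VALUES (?)", [(3,), (7,), (5,)])

# t1 (if_not_exists)
conn.execute("CREATE TABLE IF NOT EXISTS sums (value INTEGER)")

# t2 (total): append the total of every count seen so far.
conn.execute("INSERT INTO sums (value) SELECT SUM(c.value) FROM counts AS c")
conn.commit()

print(conn.execute("SELECT value FROM sums").fetchall())  # [(15,)]
```

Each run appends a new row holding the total over all of counts, so sums accumulates one row per sum run rather than updating a single value.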
At this point, you should have the following under your examples/airflow/ directory:
.
├── README.md
├── dags
│ ├── counter.py
│ └── sum.py
├── docker/
├── docker-compose.yml
├── docs/
├── openlineage.env
└── requirements.txt
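Before starting the containers, a small sketch can check the layout from inside examples/airflow/. The EXPECTED list mirrors the tree shown for this step (the docker/ and docs/ folders ship with the repository, so they are not checked); the helper missing_files is hypothetical.

```python
from pathlib import Path
from typing import List

# Files this walkthrough expects under examples/airflow/ at this point.
EXPECTED = [
    "docker-compose.yml",
    "openlineage.env",
    "requirements.txt",
    "dags/counter.py",
    "dags/sum.py",
]


def missing_files(root: str = ".") -> List[str]:
    """Return the expected paths that are not regular files under root."""
    base = Path(root)
    return [rel for rel in EXPECTED if not (base / rel).is_file()]


if __name__ == "__main__":
    missing = missing_files()
    print("all set" if not missing else f"missing: {missing}")
```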
Now that we have our DAGs defined and OpenLineage is enabled in Airflow, we can run the example! To start Airflow, run:
$ docker-compose up
Tip: Use -d to run in detached mode.
The above command will:
- Start Airflow and install openlineage-airflow
- Start Marquez
- Start Postgres
To view the Airflow UI and verify it's running, open http://localhost:8080. Then, log in using the username and password airflow / airflow. You can also browse to http://localhost:3000 to view the Marquez UI.
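The containers can take a while to come up, especially in detached mode. A minimal polling sketch using only urllib from the standard library waits until each UI answers; the timeout and interval defaults are arbitrary assumptions, and any HTTP status below 500 is treated as "up".

```python
import time
import urllib.error
import urllib.request


def wait_for(url: str, timeout_s: float = 120.0, interval_s: float = 2.0) -> bool:
    """Poll url until it answers with an HTTP status below 500, or give up after timeout_s."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(url, timeout=5):
                return True  # 2xx/3xx (redirects are followed automatically)
        except urllib.error.HTTPError as err:
            if err.code < 500:  # e.g. 401 or 404 still means the server is answering
                return True
        except OSError:
            pass  # connection refused or timed out: not up yet
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


if __name__ == "__main__":
    for name, url in [("Airflow", "http://localhost:8080"), ("Marquez UI", "http://localhost:3000")]:
        print(name, "up" if wait_for(url) else "not reachable")
```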
To ensure that Airflow is executing counter and sum, navigate to the DAGs tab in Airflow and verify that they are both enabled and in a running state.
To view DAG metadata collected by Marquez from Airflow, browse to the Marquez UI at http://localhost:3000. Then, use the search bar in the upper-right corner of the page to search for the counter.inc job. To view lineage metadata for counter.inc, click on the job in the drop-down list.
Note: If the counter.inc job is not in the drop-down list, check whether Airflow has successfully executed the DAG.
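The same metadata can also be fetched over HTTP, which is handy for scripting the check above. This sketch only builds URLs and fetches JSON with the standard library. Assumptions to verify against the Marquez API reference: the API is reachable from your host at localhost:5000 (the marquez hostname in openlineage.env resolves only inside the Docker network), jobs live under /api/v1/namespaces/{namespace}/jobs/{job}, and the lineage endpoint takes a nodeId of the form job:{namespace}:{job}. The namespace example comes from openlineage.env.

```python
import json
import urllib.request
from urllib.parse import quote, urlencode

# Assumption: the Marquez API port is published on localhost.
MARQUEZ_API = "http://localhost:5000/api/v1"


def job_url(namespace: str, job: str) -> str:
    """URL of a single job in the Marquez REST API (path layout assumed)."""
    return f"{MARQUEZ_API}/namespaces/{quote(namespace, safe='')}/jobs/{quote(job, safe='')}"


def lineage_url(namespace: str, job: str) -> str:
    """URL of the lineage graph around a job, using a job:<namespace>:<name> node ID."""
    return f"{MARQUEZ_API}/lineage?{urlencode({'nodeId': f'job:{namespace}:{job}'})}"


def get_json(url: str) -> dict:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    print(json.dumps(get_json(job_url("example", "counter.inc")), indent=2))
```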
If you take a quick look at the lineage graph for counter.inc, you should see public.counts as an output dataset and sum.total as a downstream job!
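Marquez builds this graph from the collected metadata; the hand-built model below only makes the edge direction concrete. The names counter.inc, public.counts, and sum.total come from the lineage view described here, while public.sums is an assumption inferred from the INSERT INTO sums statement in sum.py. A breadth-first walk lists everything downstream of a node.

```python
from collections import deque
from typing import Dict, List, Set

# job -> dataset it writes, dataset -> job that reads it.
EDGES: Dict[str, List[str]] = {
    "job:counter.inc": ["dataset:public.counts"],
    "dataset:public.counts": ["job:sum.total"],
    "job:sum.total": ["dataset:public.sums"],  # assumed dataset name
}


def downstream(node: str) -> List[str]:
    """Breadth-first list of every node reachable from node."""
    seen: Set[str] = {node}
    order: List[str] = []
    queue = deque([node])
    while queue:
        for nxt in EDGES.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                order.append(nxt)
                queue.append(nxt)
    return order


print(downstream("job:counter.inc"))
# ['dataset:public.counts', 'job:sum.total', 'dataset:public.sums']
```

This is exactly the question the troubleshooting scenario turns on: a change to public.counts affects every node downstream of it, starting with sum.total.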
In this step, let's quickly walk through a simple troubleshooting scenario where DAG sum begins to fail as the result of an upstream schema change for table counts. So, let's get to it!
Tip: It's helpful to also apply the same code changes outlined below to your Airflow DAGs defined in Step 2.
Let's say team A owns the DAG counter. Team A decides to update the t1 task in counter to rename the value column in the counts table to value_1_to_10, and updates the t2 task to write to the renamed column (without properly communicating the schema change!):
t1 = PostgresOperator(
-   task_id='if_not_exists',
+   task_id='alter_name_of_column',
    postgres_conn_id='example_db',
    sql='''
-   CREATE TABLE IF NOT EXISTS counts (
-     value INTEGER
-   );''',
+   DO $$
+   BEGIN
+     IF EXISTS(SELECT *
+               FROM information_schema.columns
+               WHERE table_name='counts' and column_name='value')
+     THEN
+       ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
+     END IF;
+   END $$;
    ''',
    dag=dag
)

t2 = PostgresOperator(
    task_id='inc',
    postgres_conn_id='example_db',
    sql='''
-   INSERT INTO counts (value)
+   INSERT INTO counts (value_1_to_10)
         VALUES (%(value)s)
    ''',
    parameters={
        'value': random.randint(1, 10)
    },
    dag=dag
)
Team B owns DAG sum and, unaware of the schema change, begins to see DAG run metadata with failed run states.
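The failure can be reproduced locally with SQLite standing in for Postgres (SQLite 3.25 or newer is needed for RENAME COLUMN). The sketch applies team A's rename and then runs sum.total's unchanged query, which now references a column that no longer exists.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# State before team A's change: counts exists with its original column.
conn.execute("CREATE TABLE counts (value INTEGER)")
conn.execute("INSERT INTO counts (value) VALUES (4)")
conn.execute("CREATE TABLE sums (value INTEGER)")

# Team A's new t1: rename the column.
conn.execute('ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10"')

# Team B's unchanged sum.total query.
try:
    conn.execute("INSERT INTO sums (value) SELECT SUM(c.value) FROM counts AS c")
    failed = False
except sqlite3.OperationalError as err:
    failed = True
    print("sum.total failed:", err)
```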
But team B isn't sure what might have caused the DAG failure, as no recent code changes have been made to DAG sum. So, team B decides to check the schema of the input dataset.
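Checking the schema amounts to comparing the columns of two versions of the input dataset. A minimal sketch of that comparison follows; the field-list shape ([{"name": ..., "type": ...}]) is an assumption about how a dataset version's schema is represented, and the two lists are hypothetical values for public.counts before and after team A's change.

```python
from typing import Dict, List, Set, Tuple

Field = Dict[str, str]


def schema_diff(old: List[Field], new: List[Field]) -> Tuple[Set[str], Set[str]]:
    """Return (removed, added) column names between two dataset versions."""
    old_names = {f["name"] for f in old}
    new_names = {f["name"] for f in new}
    return old_names - new_names, new_names - old_names


# Hypothetical field lists for two versions of public.counts.
before = [{"name": "value", "type": "INTEGER"}]
after = [{"name": "value_1_to_10", "type": "INTEGER"}]

removed, added = schema_diff(before, after)
print(removed, added)  # {'value'} {'value_1_to_10'}
```

A column that disappears in one version while a same-typed column appears is a strong hint of a rename, which is the change team B is looking for.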
Team B soon realizes that the schema of the counts table has changed recently! To fix DAG sum, team B updates the t2 task that calculates the count total to use the new column name:
t2 = PostgresOperator(
    task_id='total',
    postgres_conn_id='example_db',
    sql='''
    INSERT INTO sums (value)
-   SELECT SUM(c.value) FROM counts AS c;
+   SELECT SUM(c.value_1_to_10) FROM counts AS c;
    ''',
    dag=dag
)
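With SQLite again standing in for Postgres, this sketch runs team B's updated query against the renamed schema to show that it succeeds.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Schema after team A's rename, with a few counts already inserted.
conn.execute("CREATE TABLE counts (value_1_to_10 INTEGER)")
conn.executemany("INSERT INTO counts (value_1_to_10) VALUES (?)", [(2,), (9,)])
conn.execute("CREATE TABLE IF NOT EXISTS sums (value INTEGER)")

# Team B's updated total task reads the renamed column.
conn.execute("INSERT INTO sums (value) SELECT SUM(c.value_1_to_10) FROM counts AS c")
conn.commit()

print(conn.execute("SELECT value FROM sums").fetchall())  # [(11,)]
```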
With the code change, DAG sum begins to run successfully.
Congrats! You've successfully stepped through a troubleshooting scenario of a failing DAG using metadata collected with Marquez! You can now add your own DAGs to dags/ to build more complex data lineage graphs.
- Review the Marquez HTTP API used to collect Airflow DAG metadata and learn how to build your own integrations using OpenLineage
- Take a look at the openlineage-spark integration, which can be used with Airflow
What did you think of this example? You can reach out to us on Slack and leave us feedback, or open a pull request with your suggestions!