A library that integrates Airflow DAGs
with Marquez for automatic metadata collection.
**Metadata**
- Task lifecycle
- Task parameters
- Task runs linked to versioned code
- Task inputs / outputs
**Lineage**
- Track inter-DAG dependencies
**Built-in**
- SQL parser
- Link to code builder (ex: GitHub)
- Metadata extractors
```
$ pip3 install marquez-airflow
```
Note: You can also add `marquez-airflow` to your `requirements.txt` for Airflow.
To install from source, run:

```
$ python3 setup.py install
```
The library depends on a backend. A `Backend` is configurable and lets the library know where to write dataset and job metadata.
- `HTTP`: Write metadata to Marquez
- `FILE`: Write metadata to a file (as `json`) under `/tmp/marquez`
- `LOG`: Simply logs the metadata to the console
By default, the `HTTP` backend will be used (see next section). To override the default backend and write metadata to a file, use `MARQUEZ_BACKEND`:

```
MARQUEZ_BACKEND=FILE
```
Note: Metadata will be written to `/tmp/marquez/client.requests.log`, but can be overridden with `MARQUEZ_FILE`.
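For example, to keep the `FILE` backend but write metadata somewhere else (assuming `MARQUEZ_FILE` takes the path of the output file, which is an assumption here rather than documented behavior):

```
MARQUEZ_BACKEND=FILE
MARQUEZ_FILE=/var/log/marquez/metadata.log
```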
The `HTTP` backend supports using API keys to authenticate requests via `Bearer` auth. To include a key when making an API request, use `MARQUEZ_API_KEY`:

```
MARQUEZ_BACKEND=HTTP
MARQUEZ_API_KEY=[API_KEY]
```
`marquez-airflow` needs to know where to talk to the Marquez server API. You can set these using environment variables to be read by your Airflow service.

You will also need to set the namespace if you are using something other than the `default` namespace.

```
MARQUEZ_BACKEND=HTTP
MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns
```
If you do nothing, Marquez will receive the `Job` and the `Run` from your DAGs, but sources and datasets will not be sent.

`marquez-airflow` allows you to do more than that by building "Extractors". Extractors are in the process of changing right now, but they basically take a task and extract:
- Name: The name of the task
- Location: Location of the code for the task
- Inputs: List of input datasets
- Outputs: List of output datasets
- Context: The Airflow context for the task
It's important to understand that the inputs and outputs are lists and relate directly to the `Dataset` object in Marquez. Datasets also include a source, which relates directly to the `Source` object in Marquez.

A `PostgresExtractor` is currently in progress. When that's merged, it will serve as a good example of how to write custom extractors.
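For illustration only, here is a rough sketch of what a custom extractor might collect for a task. The extractor API is still changing, so the class name, method name, and return shape below are assumptions rather than the library's actual interface, and the location URL and dataset names are hypothetical:

```python
# Hypothetical sketch of a custom extractor; names and signatures are
# illustrative assumptions, not the published marquez-airflow API.
class MyPostgresExtractor:
    def __init__(self, task):
        # The Airflow operator instance this extractor inspects.
        self.task = task

    def extract(self):
        # Gather the five pieces of metadata listed above as a plain dict.
        return {
            'name': self.task.task_id,                                   # Name of the task
            'location': 'https://github.com/example/repo/dags/etl.py',   # Location of the task's code (hypothetical)
            'inputs': ['food_delivery_db.top_delivery_times'],           # Input datasets
            'outputs': ['food_delivery_db.popular_orders_day_of_week'],  # Output datasets
            'context': {'sql': self.task.sql},                           # Airflow context for the task
        }
```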
To begin collecting Airflow DAG metadata with Marquez, use:

```diff
- from airflow import DAG
+ from marquez_airflow import DAG
```
When enabled, the library will:

- On DAG start, collect metadata for each task using an `Extractor` (the library defines a default extractor to use otherwise)
- Collect task input / output metadata (`source`, `schema`, etc)
- Collect task run-level metadata (execution time, state, parameters, etc)
- On DAG complete, also mark the task as complete in Marquez
To enable logging, set the environment variable `MARQUEZ_LOG_LEVEL` to `DEBUG`, `INFO`, or `ERROR`:

```
$ export MARQUEZ_LOG_LEVEL=INFO
```
```python
from datetime import datetime
from marquez_airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['[email protected]']
}

dag = DAG(
    'orders_popular_day_of_week',
    schedule_interval='@weekly',
    default_args=default_args,
    description='Determines the popular day of week orders are placed.'
)

t1 = PostgresOperator(
    task_id='if_not_exists',
    postgres_conn_id='food_delivery_db',
    sql='''
    CREATE TABLE IF NOT EXISTS popular_orders_day_of_week (
      order_day_of_week VARCHAR(64) NOT NULL,
      order_placed_on   TIMESTAMP NOT NULL,
      orders_placed     INTEGER NOT NULL
    );''',
    dag=dag
)

t2 = PostgresOperator(
    task_id='insert',
    postgres_conn_id='food_delivery_db',
    sql='''
    INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)
      SELECT EXTRACT(ISODOW FROM order_placed_on) AS order_day_of_week,
             order_placed_on,
             COUNT(*) AS orders_placed
        FROM top_delivery_times
       GROUP BY order_placed_on;
    ''',
    dag=dag
)

t1 >> t2
```