[AIRFLOW-198] Implement latest_only_operator
Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

- https://issues.apache.org/jira/browse/AIRFLOW-198

Testing Done:
- Local testing of DAG operation with LatestOnlyOperator
- Unit test added

Closes apache#1752 from gwax/latest_only
George Leslie-Waksman authored and r39132 committed Sep 28, 2016
1 parent d4013f9 commit edf033b
Showing 10 changed files with 305 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@
.DS_Store
.ipynb*
.coverage
.python-version
airflow/git_version
airflow/www/static/coverage/
airflow.db
34 changes: 34 additions & 0 deletions airflow/example_dags/example_latest_only.py
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Example of the LatestOnlyOperator
"""
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule


dag = DAG(
    dag_id='latest_only',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)
43 changes: 43 additions & 0 deletions airflow/example_dags/example_latest_only_with_trigger.py
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Example LatestOnlyOperator and TriggerRule interactions
"""
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule


dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)

task2 = DummyOperator(task_id='task2', dag=dag)

task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])

task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)
task4.set_upstream([task1, task2])
1 change: 1 addition & 0 deletions airflow/operators/__init__.py
@@ -57,6 +57,7 @@
    'dummy_operator': ['DummyOperator'],
    'email_operator': ['EmailOperator'],
    'hive_to_samba_operator': ['Hive2SambaOperator'],
    'latest_only_operator': ['LatestOnlyOperator'],
    'mysql_operator': ['MySqlOperator'],
    'sqlite_operator': ['SqliteOperator'],
    'mysql_to_hive': ['MySqlToHiveTransfer'],
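
This mapping feeds the lazy operator loading used by Airflow at this point in history, so the new entry is what makes the short import path resolve. A sketch of the two import spellings that should be equivalent once the entry exists, assuming the 1.7-era lazy-loading behavior of airflow/operators/__init__.py:

# Assumed equivalent on a 1.7-era Airflow install: the first form is resolved
# through the lazy-loading map above, the second imports the module directly.
from airflow.operators import LatestOnlyOperator as via_registry
from airflow.operators.latest_only_operator import LatestOnlyOperator as direct

assert via_registry is direct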
57 changes: 57 additions & 0 deletions airflow/operators/latest_only_operator.py
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging

from airflow.models import BaseOperator, TaskInstance
from airflow.utils.state import State
from airflow import settings


class LatestOnlyOperator(BaseOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.
    If the task is run outside of the latest schedule interval, all
    directly downstream tasks will be skipped.
    """

    ui_color = '#e9ffdb'  # nyanza

    def execute(self, context):
        now = datetime.datetime.now()
        left_window = context['dag'].following_schedule(
            context['execution_date'])
        right_window = context['dag'].following_schedule(left_window)
        logging.info(
            'Checking latest only with left_window: %s right_window: %s '
            'now: %s', left_window, right_window, now)
        if not left_window < now <= right_window:
            logging.info('Not latest execution, skipping downstream.')
            session = settings.Session()
            for task in context['task'].downstream_list:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                logging.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = now
                ti.end_date = now
                session.merge(ti)
            session.commit()
            session.close()
            logging.info('Done.')
        else:
            logging.info('Latest, allowing execution to proceed.')
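
To make the window arithmetic concrete, the following standalone sketch walks through the check in execute() for the 4-hour schedule used by the example DAGs. The simplified following_schedule and the sample timestamps are illustrative assumptions, not Airflow code:

import datetime

# Sketch of the execute() window check, with following_schedule() reduced
# to a fixed timedelta; the real DAG method also handles cron expressions.
INTERVAL = datetime.timedelta(hours=4)

def following_schedule(dt):
    return dt + INTERVAL

# Hypothetical values: the run under consideration and the wall-clock time.
execution_date = datetime.datetime(2016, 9, 20, 8, 0)
now = datetime.datetime(2016, 9, 20, 13, 0)

# A run stamped 08:00 covers 08:00-12:00 and actually starts at 12:00, so
# it stays "the latest" until the run stamped 12:00 starts at 16:00.
left_window = following_schedule(execution_date)   # 2016-09-20 12:00
right_window = following_schedule(left_window)     # 2016-09-20 16:00

print(left_window < now <= right_window)  # True: downstream tasks proceed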
74 changes: 74 additions & 0 deletions docs/concepts.rst
@@ -594,6 +594,80 @@ that, when set to ``True``, keeps a task from getting triggered if the
previous schedule for the task hasn't succeeded.


Latest Run Only
===============

Standard workflow behavior involves running a series of tasks for a
particular date/time range. Some workflows, however, perform tasks that
are independent of run time but need to be run on a schedule, much like a
standard cron job. In these cases, backfills or running jobs missed during
a pause just waste CPU cycles.

For situations like this, you can use the ``LatestOnlyOperator`` to skip
tasks that are not being run during the most recent scheduled run of a
DAG. The ``LatestOnlyOperator`` skips itself and all immediate downstream
tasks if the current time is not between its ``execution_time`` and the
next scheduled ``execution_time``.
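
A minimal usage, mirroring the ``example_latest_only`` example DAG that
ships alongside this operator:

.. code:: python

    import datetime as dt

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.latest_only_operator import LatestOnlyOperator

    dag = DAG(
        dag_id='latest_only',
        schedule_interval=dt.timedelta(hours=4),
        start_date=dt.datetime(2016, 9, 20),
    )

    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

    task1 = DummyOperator(task_id='task1', dag=dag)
    task1.set_upstream(latest_only)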

One must be aware of the interaction between skipped tasks and trigger
rules. Skipped tasks will cascade through trigger rules ``all_success``
and ``all_failed`` but not ``all_done``, ``one_failed``, ``one_success``,
and ``dummy``. If you would like to use the ``LatestOnlyOperator`` with
trigger rules that do not cascade skips, you will need to ensure that the
``LatestOnlyOperator`` is **directly** upstream of the task you would like
to skip.

It is possible, through use of trigger rules, to mix tasks that should
run in the typical date/time-dependent mode with those using the
``LatestOnlyOperator``.

For example, consider the following dag:

.. code:: python

    #dags/latest_only_with_trigger.py
    import datetime as dt

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.latest_only_operator import LatestOnlyOperator
    from airflow.utils.trigger_rule import TriggerRule

    dag = DAG(
        dag_id='latest_only_with_trigger',
        schedule_interval=dt.timedelta(hours=4),
        start_date=dt.datetime(2016, 9, 20),
    )

    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

    task1 = DummyOperator(task_id='task1', dag=dag)
    task1.set_upstream(latest_only)

    task2 = DummyOperator(task_id='task2', dag=dag)

    task3 = DummyOperator(task_id='task3', dag=dag)
    task3.set_upstream([task1, task2])

    task4 = DummyOperator(task_id='task4', dag=dag,
                          trigger_rule=TriggerRule.ALL_DONE)
    task4.set_upstream([task1, task2])

In the case of this DAG, the ``latest_only`` task will show up as skipped
for all runs except the latest run. ``task1`` is directly downstream of
``latest_only`` and will also skip for all runs except the latest.
``task2`` is entirely independent of ``latest_only`` and will run in all
scheduled periods. ``task3`` is downstream of ``task1`` and ``task2`` and,
because the default ``trigger_rule`` is ``all_success``, will receive a
cascaded skip from ``task1``. ``task4`` is downstream of ``task1`` and
``task2``, but since its ``trigger_rule`` is set to ``all_done``, it will
trigger as soon as ``task1`` has been skipped (a valid completion state)
and ``task2`` has succeeded.

.. image:: img/latest_only_with_trigger.png


Zombies & Undeads
=================

Binary file added docs/img/latest_only_with_trigger.png
2 changes: 1 addition & 1 deletion setup.py
@@ -159,7 +159,7 @@ def write_version(filename=os.path.join(*['airflow',
cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x

all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
-devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto']
+devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto', 'freezegun']
devel_minreq = devel + mysql + doc + password + s3
devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker
2 changes: 1 addition & 1 deletion tests/core.py
@@ -60,7 +60,7 @@

import six

-NUM_EXAMPLE_DAGS = 16
+NUM_EXAMPLE_DAGS = 18
DEV_NULL = '/dev/null'
TEST_DAG_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')
93 changes: 93 additions & 0 deletions tests/operators/latest_only_operator.py
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function, unicode_literals

import datetime
import logging
import unittest

from airflow import configuration, DAG, settings
from airflow.jobs import BackfillJob
from airflow.models import TaskInstance
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.dummy_operator import DummyOperator
from freezegun import freeze_time

DEFAULT_DATE = datetime.datetime(2016, 1, 1)
END_DATE = datetime.datetime(2016, 1, 2)
INTERVAL = datetime.timedelta(hours=12)
FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)


def get_task_instances(task_id):
    session = settings.Session()
    return session \
        .query(TaskInstance) \
        .filter(TaskInstance.task_id == task_id) \
        .order_by(TaskInstance.execution_date) \
        .all()


class LatestOnlyOperatorTest(unittest.TestCase):

    def setUp(self):
        super(LatestOnlyOperatorTest, self).setUp()
        configuration.load_test_config()
        self.dag = DAG(
            'test_dag',
            default_args={
                'owner': 'airflow',
                'start_date': DEFAULT_DATE},
            schedule_interval=INTERVAL)
        self.addCleanup(self.dag.clear)
        freezer = freeze_time(FROZEN_NOW)
        freezer.start()
        self.addCleanup(freezer.stop)

    def test_run(self):
        task = LatestOnlyOperator(
            task_id='latest',
            dag=self.dag)
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

    def test_skipping(self):
        latest_task = LatestOnlyOperator(
            task_id='latest',
            dag=self.dag)
        downstream_task = DummyOperator(
            task_id='downstream',
            dag=self.dag)
        downstream_task.set_upstream(latest_task)

        latest_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)
        downstream_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)

        latest_instances = get_task_instances('latest')
        exec_date_to_latest_state = {
            ti.execution_date: ti.state for ti in latest_instances}
        assert exec_date_to_latest_state == {
            datetime.datetime(2016, 1, 1): 'success',
            datetime.datetime(2016, 1, 1, 12): 'success',
            datetime.datetime(2016, 1, 2): 'success',
        }

        downstream_instances = get_task_instances('downstream')
        exec_date_to_downstream_state = {
            ti.execution_date: ti.state for ti in downstream_instances}
        assert exec_date_to_downstream_state == {
            datetime.datetime(2016, 1, 1): 'skipped',
            datetime.datetime(2016, 1, 1, 12): 'skipped',
            datetime.datetime(2016, 1, 2): 'success',
        }
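
The tests pin the wall clock with freezegun (the new devel dependency from the setup.py change above) because LatestOnlyOperator.execute calls datetime.datetime.now(). A minimal sketch of that mechanism, assuming standard freezegun behavior:

import datetime

from freezegun import freeze_time

# While the clock is frozen, datetime.datetime.now() returns the pinned
# instant, making the operator's window check deterministic under test.
with freeze_time(datetime.datetime(2016, 1, 2, 12, 1, 1)):
    assert datetime.datetime.now() == datetime.datetime(2016, 1, 2, 12, 1, 1)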
