Skip to content

Commit

Permalink
Merge pull request apache#2091 from jlowin/post-execute-hook
Browse files Browse the repository at this point in the history
bolkedebruin committed Feb 19, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents fe78816 + 50902d0 commit 6613676
Showing 4 changed files with 57 additions and 7 deletions.
9 changes: 9 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
@@ -11,6 +11,15 @@ assists people when migrating to a new version.

A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0

- `post_execute()` hooks now take two arguments, `context` and `result`
(AIRFLOW-886)

Previously, post_execute() only took one argument, `context`.

## Airflow 1.8

### Database
27 changes: 21 additions & 6 deletions airflow/models.py
Original file line number Diff line number Diff line change
@@ -1374,7 +1374,22 @@ def signal_handler(signum, frame):
if result is not None:
self.xcom_push(key=XCOM_RETURN_KEY, value=result)

task_copy.post_execute(context=context)
# TODO remove deprecated behavior in Airflow 2.0
try:
task_copy.post_execute(context=context, result=result)
except TypeError as e:
if 'unexpected keyword argument' in str(e):
warnings.warn(
'BaseOperator.post_execute() now takes two '
'arguments, `context` and `result`, but "{}" only '
'expected one. This behavior is deprecated and '
'will be removed in a future version of '
'Airflow.'.format(self.task_id),
category=DeprecationWarning)
task_copy.post_execute(context=context)
else:
raise

Stats.incr('operator_successes_{}'.format(
self.task.__class__.__name__), 1, 1)
self.state = State.SUCCESS
@@ -2154,8 +2169,7 @@ def priority_weight_total(self):

def pre_execute(self, context):
"""
This is triggered right before self.execute, it's mostly a hook
for people deriving operators.
This hook is triggered right before self.execute() is called.
"""
pass

@@ -2168,10 +2182,11 @@ def execute(self, context):
"""
raise NotImplementedError()

def post_execute(self, context):
def post_execute(self, context, result=None):
"""
This is triggered right after self.execute, it's mostly a hook
for people deriving operators.
This hook is triggered right after self.execute() is called.
It is passed the execution context and any results returned by the
operator.
"""
pass

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -209,7 +209,7 @@ def do_setup():
'flask-swagger==0.2.13',
'flask-wtf==0.12',
'funcsigs==1.0.0',
'future>=0.15.0, <0.16',
'future>=0.15.0, <0.17',
'gitpython>=2.0.2',
'gunicorn>=19.3.0, <19.4.0', # 19.4.? seemed to have issues
'jinja2>=2.7.3, <2.9.0',
26 changes: 26 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
@@ -649,3 +649,29 @@ def test_xcom_pull_different_execution_date(self):
key=key,
include_prior_dates=True),
value)

def test_post_execute_hook(self):
"""
Test that post_execute hook is called with the Operator's result.
The result ('error') will cause an error to be raised and trapped.
"""

class TestError(Exception):
pass

class TestOperator(PythonOperator):
def post_execute(self, context, result):
if result == 'error':
raise TestError('expected error.')

dag = models.DAG(dag_id='test_post_execute_dag')
task = TestOperator(
task_id='test_operator',
dag=dag,
python_callable=lambda: 'error',
owner='airflow',
start_date=datetime.datetime(2017, 2, 1))
ti = TI(task=task, execution_date=datetime.datetime.now())

with self.assertRaises(TestError):
ti.run()

0 comments on commit 6613676

Please sign in to comment.