Skip to content

Commit

Permalink
Add queue button to click-on-DagRun interface. (apache#21555)
Browse files Browse the repository at this point in the history
* Initial implementation of adding Queue button to DagRun interface

* Implement the test cases

* FIX Add all required MyPy ignores

* FIX import

* Update airflow/www/views.py

FIX Documentation

Co-authored-by: Brent Bovenzi <[email protected]>

* update modal UI

Co-authored-by: Brent Bovenzi <[email protected]>
  • Loading branch information
Jorricks and bbovenzi authored Mar 10, 2022
1 parent da34ff4 commit afd3c13
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 35 deletions.
53 changes: 45 additions & 8 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm.session import Session as SASession

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
Expand Down Expand Up @@ -371,7 +370,7 @@ def set_dag_run_state_to_success(
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
):
) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date and its task instances
to success.
Expand Down Expand Up @@ -418,7 +417,7 @@ def set_dag_run_state_to_failed(
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
):
) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date or run_id and its running task instances
to failed.
Expand Down Expand Up @@ -472,15 +471,15 @@ def set_dag_run_state_to_failed(
return set_state(tasks=tasks, run_id=run_id, state=State.FAILED, commit=commit, session=session)


@provide_session
def set_dag_run_state_to_running(
def __set_dag_run_state_to_running_or_queued(
*,
new_state: DagRunState,
dag: DAG,
execution_date: Optional[datetime] = None,
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
):
) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date to running.
Expand All @@ -492,7 +491,7 @@ def set_dag_run_state_to_running(
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
"""
res: List[BaseOperator] = []
res: List[TaskInstance] = []

if not (execution_date is None) ^ (run_id is None):
return res
Expand All @@ -512,7 +511,45 @@ def set_dag_run_state_to_running(
raise ValueError(f'DagRun with run_id: {run_id} not found')
# Mark the dag run to running.
if commit:
_set_dag_run_state(dag.dag_id, run_id, DagRunState.RUNNING, session)
_set_dag_run_state(dag.dag_id, run_id, new_state, session)

# To keep the return type consistent with the other similar functions.
return res


@provide_session
def set_dag_run_state_to_running(
*,
dag: DAG,
execution_date: Optional[datetime] = None,
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> List[TaskInstance]:
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
execution_date=execution_date,
run_id=run_id,
commit=commit,
session=session,
)


@provide_session
def set_dag_run_state_to_queued(
*,
dag: DAG,
execution_date: Optional[datetime] = None,
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> List[TaskInstance]:
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.QUEUED,
dag=dag,
execution_date=execution_date,
run_id=run_id,
commit=commit,
session=session,
)
1 change: 1 addition & 0 deletions airflow/www/static/js/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ export function callModal(t, d, extraLinks, tryNumbers, sd, drID) {
export function callModalDag(dag) {
$('#dagModal').modal({});
$('#dagModal').css('margin-top', '0');
$('#run_id').text(dag.run_id);
executionDate = dag.execution_date;
dagRunId = dag.run_id;
updateButtonUrl(buttons.dag_graph_view, {
Expand Down
39 changes: 28 additions & 11 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ <h4 class="pull-right" style="user-select: none;-moz-user-select: auto;">
</div>
</div>
{{ dag_docs(doc_md) }}
<!-- Modal -->
<!-- Modal for Task Instance -->
<div class="modal fade" id="taskInstanceModal" tabindex="-1" role="dialog" aria-labelledby="taskInstanceModalLabel" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
Expand Down Expand Up @@ -375,14 +375,16 @@ <h4>Task Actions</h4>
</div>
</div>
</div>
<!-- Modal for dag -->
<!-- Modal for DAG run -->
<div class="modal fade" id="dagModal" tabindex="-1" role="dialog" aria-labelledby="dagModalLabel" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
<h4 class="modal-title" id="dagModalLabel">
<span class="text-muted">{{ 'SUBDAG' if dag.parent_dag is defined and dag.parent_dag else 'DAG' }}:</span> {{dag.dag_id}}
<br>
<span class="text-muted">Run Id:</span> <span id="run_id"></span>
</h4>
</div>
<div class="modal-body">
Expand All @@ -393,15 +395,30 @@ <h4 class="modal-title" id="dagModalLabel">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="dag_run_id">
<input type="hidden" name="origin" value="{{ request.base_url }}">
<button type="button" id="btn_dagrun_clear" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_clear') }}">
Clear
</button>
<button type="button" id="btn_dagrun_failed" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_failed') }}">
Mark Failed
</button>
<button type="button" id="btn_dagrun_success" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_success') }}">
Mark Success
</button>
<div>
<h5>Update State</h5>
<button type="button" id="btn_dagrun_failed" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_failed') }}">
Mark Failed
</button>
<button type="button" id="btn_dagrun_success" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_success') }}">
Mark Success
</button>
</div>
<div>
<h5>Re-run</h5>
<button type="button" id="btn_dagrun_clear" class="btn btn-primary" data-action="{{ url_for('Airflow.dagrun_clear') }}">
Clear existing tasks
</button>
<button
type="button"
id="btn_dagrun_queued"
class="btn btn-primary"
data-action="{{ url_for('Airflow.dagrun_queued') }}"
title="Queue up new tasks to make the DAG run up-to-date with any DAG file changes."
>
Queue up new tasks
</button>
</div>
</form>
</span>
<span class="col-md-4 text-right">
Expand Down
50 changes: 49 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@

import airflow
from airflow import models, plugins_manager, settings
from airflow.api.common.mark_tasks import set_dag_run_state_to_failed, set_dag_run_state_to_success
from airflow.api.common.mark_tasks import (
set_dag_run_state_to_failed,
set_dag_run_state_to_queued,
set_dag_run_state_to_success,
)
from airflow.compat.functools import cached_property
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -2117,6 +2121,34 @@ def _mark_dagrun_state_as_success(self, dag_id, dag_run_id, confirmed, origin):

return response

def _mark_dagrun_state_as_queued(self, dag_id: str, dag_run_id: str, confirmed: bool, origin: str):
if not dag_run_id:
flash('Invalid dag_run_id', 'error')
return redirect(origin)

dag = current_app.dag_bag.get_dag(dag_id)

if not dag:
flash(f'Cannot find DAG: {dag_id}', 'error')
return redirect(origin)

new_dag_state = set_dag_run_state_to_queued(dag=dag, run_id=dag_run_id, commit=confirmed)

if confirmed:
flash('Marked the DagRun as queued.')
return redirect(origin)

else:
details = '\n'.join(str(t) for t in new_dag_state)

response = self.render_template(
'airflow/confirm.html',
message="Here's the list of task instances you are about to change",
details=details,
)

return response

@expose('/dagrun_failed', methods=['POST'])
@auth.has_access(
[
Expand Down Expand Up @@ -2149,6 +2181,22 @@ def dagrun_success(self):
origin = get_safe_url(request.form.get('origin'))
return self._mark_dagrun_state_as_success(dag_id, dag_run_id, confirmed, origin)

@expose('/dagrun_queued', methods=['POST'])
@auth.has_access(
[
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
@action_logging
def dagrun_queued(self):
"""Queue DagRun so tasks that haven't run yet can be started."""
dag_id = request.form.get('dag_id')
dag_run_id = request.form.get('dag_run_id')
confirmed = request.form.get('confirmed') == 'true'
origin = get_safe_url(request.form.get('origin'))
return self._mark_dagrun_state_as_queued(dag_id, dag_run_id, confirmed, origin)

@expose("/dagrun_details")
@auth.has_access(
[
Expand Down
55 changes: 40 additions & 15 deletions tests/api/common/test_mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

from datetime import timedelta
from typing import Callable

import pytest
from sqlalchemy.orm import eagerload
Expand All @@ -26,6 +27,7 @@
_create_dagruns,
_DagRunInfo,
set_dag_run_state_to_failed,
set_dag_run_state_to_queued,
set_dag_run_state_to_running,
set_dag_run_state_to_success,
set_state,
Expand Down Expand Up @@ -519,19 +521,23 @@ def test_set_running_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)

def test_set_running_dag_run_to_running(self):
date = self.execution_dates[0]
@pytest.mark.parametrize(
"dag_run_alter_function, new_state",
[(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
)
def test_set_running_dag_run_to_activate_state(self, dag_run_alter_function: Callable, new_state: State):
date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.RUNNING, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)

altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore

# None of the tasks should be altered, only the dag itself
assert len(altered) == 0
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_dag_run_state(self.dag1, date, new_state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
self._verify_dag_run_dates(self.dag1, date, new_state, middle_time) # type: ignore

def test_set_success_dag_run_to_success(self):
date = self.execution_dates[0]
Expand Down Expand Up @@ -562,19 +568,23 @@ def test_set_success_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)

def test_set_success_dag_run_to_running(self):
date = self.execution_dates[0]
@pytest.mark.parametrize(
"dag_run_alter_function,new_state",
[(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
)
def test_set_success_dag_run_to_activate_state(self, dag_run_alter_function: Callable, new_state: State):
date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.SUCCESS, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)

altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore

# None of the tasks should be altered, but only the dag object should be changed
assert len(altered) == 0
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_dag_run_state(self.dag1, date, new_state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
self._verify_dag_run_dates(self.dag1, date, new_state, middle_time) # type: ignore

def test_set_failed_dag_run_to_success(self):
date = self.execution_dates[0]
Expand Down Expand Up @@ -606,19 +616,23 @@ def test_set_failed_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)

def test_set_failed_dag_run_to_running(self):
date = self.execution_dates[0]
@pytest.mark.parametrize(
"dag_run_alter_function,state",
[(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
)
def test_set_failed_dag_run_to_activate_state(self, dag_run_alter_function: Callable, state: State):
date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.SUCCESS, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)

altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore

# None of the tasks should be altered, since we've only altered the DAG itself
assert len(altered) == 0
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_dag_run_state(self.dag1, date, state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
self._verify_dag_run_dates(self.dag1, date, state, middle_time) # type: ignore

def test_set_state_without_commit(self):
date = self.execution_dates[0]
Expand All @@ -632,6 +646,13 @@ def test_set_state_without_commit(self):
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_task_instance_states_remain_default(dr)

will_be_altered = set_dag_run_state_to_queued(dag=self.dag1, run_id=dr.run_id, commit=False)

# None of the tasks will be altered.
assert len(will_be_altered) == 0
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_task_instance_states_remain_default(dr)

will_be_altered = set_dag_run_state_to_failed(dag=self.dag1, run_id=dr.run_id, commit=False)

# Only the running task should be altered.
Expand Down Expand Up @@ -695,6 +716,8 @@ def test_set_dag_run_state_edge_cases(self):
assert len(altered) == 0
altered = set_dag_run_state_to_running(dag=None, execution_date=self.execution_dates[0])
assert len(altered) == 0
altered = set_dag_run_state_to_queued(dag=None, execution_date=self.execution_dates[0])
assert len(altered) == 0

# No dag_run_id
altered = set_dag_run_state_to_success(dag=self.dag1, run_id=None)
Expand All @@ -703,6 +726,8 @@ def test_set_dag_run_state_edge_cases(self):
assert len(altered) == 0
altered = set_dag_run_state_to_running(dag=self.dag1, run_id=None)
assert len(altered) == 0
altered = set_dag_run_state_to_queued(dag=self.dag1, run_id=None)
assert len(altered) == 0

# This will throw ValueError since dag.last_dagrun is None
# need to be 0 does not exist.
Expand Down

0 comments on commit afd3c13

Please sign in to comment.