diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index a280d0f381773..22cde7febf78d 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -24,7 +24,6 @@
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm.session import Session as SASession
-from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
@@ -371,7 +370,7 @@ def set_dag_run_state_to_success(
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
-):
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date and its task instances
to success.
@@ -418,7 +417,7 @@ def set_dag_run_state_to_failed(
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
-):
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date or run_id and its running task instances
to failed.
@@ -472,15 +471,15 @@ def set_dag_run_state_to_failed(
return set_state(tasks=tasks, run_id=run_id, state=State.FAILED, commit=commit, session=session)
-@provide_session
-def set_dag_run_state_to_running(
+def __set_dag_run_state_to_running_or_queued(
*,
+ new_state: DagRunState,
dag: DAG,
execution_date: Optional[datetime] = None,
run_id: Optional[str] = None,
commit: bool = False,
session: SASession = NEW_SESSION,
-):
+) -> List[TaskInstance]:
"""
Set the dag run for a specific execution date to running.
@@ -492,7 +491,7 @@ def set_dag_run_state_to_running(
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
"""
- res: List[BaseOperator] = []
+ res: List[TaskInstance] = []
if not (execution_date is None) ^ (run_id is None):
return res
@@ -512,7 +511,45 @@ def set_dag_run_state_to_running(
raise ValueError(f'DagRun with run_id: {run_id} not found')
# Mark the dag run to running.
if commit:
- _set_dag_run_state(dag.dag_id, run_id, DagRunState.RUNNING, session)
+ _set_dag_run_state(dag.dag_id, run_id, new_state, session)
# To keep the return type consistent with the other similar functions.
return res
+
+
+@provide_session
+def set_dag_run_state_to_running(
+ *,
+ dag: DAG,
+ execution_date: Optional[datetime] = None,
+ run_id: Optional[str] = None,
+ commit: bool = False,
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
+ return __set_dag_run_state_to_running_or_queued(
+ new_state=DagRunState.RUNNING,
+ dag=dag,
+ execution_date=execution_date,
+ run_id=run_id,
+ commit=commit,
+ session=session,
+ )
+
+
+@provide_session
+def set_dag_run_state_to_queued(
+ *,
+ dag: DAG,
+ execution_date: Optional[datetime] = None,
+ run_id: Optional[str] = None,
+ commit: bool = False,
+ session: SASession = NEW_SESSION,
+) -> List[TaskInstance]:
+ return __set_dag_run_state_to_running_or_queued(
+ new_state=DagRunState.QUEUED,
+ dag=dag,
+ execution_date=execution_date,
+ run_id=run_id,
+ commit=commit,
+ session=session,
+ )
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index ee9b31146637b..7f44a86531053 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -225,6 +225,7 @@ export function callModal(t, d, extraLinks, tryNumbers, sd, drID) {
export function callModalDag(dag) {
$('#dagModal').modal({});
$('#dagModal').css('margin-top', '0');
+ $('#run_id').text(dag.run_id);
executionDate = dag.execution_date;
dagRunId = dag.run_id;
updateButtonUrl(buttons.dag_graph_view, {
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index f2444ababb70a..19c07ed77f68b 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -172,7 +172,7 @@
{{ dag_docs(doc_md) }}
-
+
@@ -375,7 +375,7 @@
Task Actions
-
+
@@ -383,6 +383,8 @@
Task Actions
{{ 'SUBDAG' if dag.parent_dag is defined and dag.parent_dag else 'DAG' }}: {{dag.dag_id}}
+
+ Run Id:
@@ -393,15 +395,30 @@
-
-
-
+
+
Update State
+
+
+
+
+
Re-run
+
+
+
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 180aa6464378f..ae70427fb3b66 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -91,7 +91,11 @@
import airflow
from airflow import models, plugins_manager, settings
-from airflow.api.common.mark_tasks import set_dag_run_state_to_failed, set_dag_run_state_to_success
+from airflow.api.common.mark_tasks import (
+ set_dag_run_state_to_failed,
+ set_dag_run_state_to_queued,
+ set_dag_run_state_to_success,
+)
from airflow.compat.functools import cached_property
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.exceptions import AirflowException
@@ -2117,6 +2121,34 @@ def _mark_dagrun_state_as_success(self, dag_id, dag_run_id, confirmed, origin):
return response
+ def _mark_dagrun_state_as_queued(self, dag_id: str, dag_run_id: str, confirmed: bool, origin: str):
+ if not dag_run_id:
+ flash('Invalid dag_run_id', 'error')
+ return redirect(origin)
+
+ dag = current_app.dag_bag.get_dag(dag_id)
+
+ if not dag:
+ flash(f'Cannot find DAG: {dag_id}', 'error')
+ return redirect(origin)
+
+ new_dag_state = set_dag_run_state_to_queued(dag=dag, run_id=dag_run_id, commit=confirmed)
+
+ if confirmed:
+ flash('Marked the DagRun as queued.')
+ return redirect(origin)
+
+ else:
+ details = '\n'.join(str(t) for t in new_dag_state)
+
+ response = self.render_template(
+ 'airflow/confirm.html',
+ message="Here's the list of task instances you are about to change",
+ details=details,
+ )
+
+ return response
+
@expose('/dagrun_failed', methods=['POST'])
@auth.has_access(
[
@@ -2149,6 +2181,22 @@ def dagrun_success(self):
origin = get_safe_url(request.form.get('origin'))
return self._mark_dagrun_state_as_success(dag_id, dag_run_id, confirmed, origin)
+ @expose('/dagrun_queued', methods=['POST'])
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+ ]
+ )
+ @action_logging
+ def dagrun_queued(self):
+ """Queue DagRun so tasks that haven't run yet can be started."""
+ dag_id = request.form.get('dag_id')
+ dag_run_id = request.form.get('dag_run_id')
+ confirmed = request.form.get('confirmed') == 'true'
+ origin = get_safe_url(request.form.get('origin'))
+ return self._mark_dagrun_state_as_queued(dag_id, dag_run_id, confirmed, origin)
+
@expose("/dagrun_details")
@auth.has_access(
[
diff --git a/tests/api/common/test_mark_tasks.py b/tests/api/common/test_mark_tasks.py
index 7662eeca6bcd8..7e461dc4e3098 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -17,6 +17,7 @@
# under the License.
from datetime import timedelta
+from typing import Callable
import pytest
from sqlalchemy.orm import eagerload
@@ -26,6 +27,7 @@
_create_dagruns,
_DagRunInfo,
set_dag_run_state_to_failed,
+ set_dag_run_state_to_queued,
set_dag_run_state_to_running,
set_dag_run_state_to_success,
set_state,
@@ -519,19 +521,23 @@ def test_set_running_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
- def test_set_running_dag_run_to_running(self):
- date = self.execution_dates[0]
+ @pytest.mark.parametrize(
+ "dag_run_alter_function, new_state",
+ [(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
+ )
+ def test_set_running_dag_run_to_activate_state(self, dag_run_alter_function: Callable, new_state: State):
+ date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.RUNNING, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)
- altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
+ altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore
# None of the tasks should be altered, only the dag itself
assert len(altered) == 0
- self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+ self._verify_dag_run_state(self.dag1, date, new_state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
- self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
+ self._verify_dag_run_dates(self.dag1, date, new_state, middle_time) # type: ignore
def test_set_success_dag_run_to_success(self):
date = self.execution_dates[0]
@@ -562,19 +568,23 @@ def test_set_success_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
- def test_set_success_dag_run_to_running(self):
- date = self.execution_dates[0]
+ @pytest.mark.parametrize(
+ "dag_run_alter_function,new_state",
+ [(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
+ )
+ def test_set_success_dag_run_to_activate_state(self, dag_run_alter_function: Callable, new_state: State):
+ date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.SUCCESS, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)
- altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
+ altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore
# None of the tasks should be altered, but only the dag object should be changed
assert len(altered) == 0
- self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+ self._verify_dag_run_state(self.dag1, date, new_state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
- self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
+ self._verify_dag_run_dates(self.dag1, date, new_state, middle_time) # type: ignore
def test_set_failed_dag_run_to_success(self):
date = self.execution_dates[0]
@@ -606,19 +616,23 @@ def test_set_failed_dag_run_to_failed(self):
assert dr.get_task_instance('run_after_loop').state == State.FAILED
self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
- def test_set_failed_dag_run_to_running(self):
- date = self.execution_dates[0]
+ @pytest.mark.parametrize(
+ "dag_run_alter_function,state",
+ [(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
+ )
+ def test_set_failed_dag_run_to_activate_state(self, dag_run_alter_function: Callable, state: State):
+ date = self.execution_dates[0] # type: ignore
dr = self._create_test_dag_run(State.SUCCESS, date)
middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)
- altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)
+ altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True) # type: ignore
# None of the tasks should be altered, since we've only altered the DAG itself
assert len(altered) == 0
- self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+ self._verify_dag_run_state(self.dag1, date, state) # type: ignore
self._verify_task_instance_states_remain_default(dr)
- self._verify_dag_run_dates(self.dag1, date, State.RUNNING, middle_time)
+ self._verify_dag_run_dates(self.dag1, date, state, middle_time) # type: ignore
def test_set_state_without_commit(self):
date = self.execution_dates[0]
@@ -632,6 +646,13 @@ def test_set_state_without_commit(self):
self._verify_dag_run_state(self.dag1, date, State.RUNNING)
self._verify_task_instance_states_remain_default(dr)
+ will_be_altered = set_dag_run_state_to_queued(dag=self.dag1, run_id=dr.run_id, commit=False)
+
+ # None of the tasks will be altered.
+ assert len(will_be_altered) == 0
+ self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+ self._verify_task_instance_states_remain_default(dr)
+
will_be_altered = set_dag_run_state_to_failed(dag=self.dag1, run_id=dr.run_id, commit=False)
# Only the running task should be altered.
@@ -695,6 +716,8 @@ def test_set_dag_run_state_edge_cases(self):
assert len(altered) == 0
altered = set_dag_run_state_to_running(dag=None, execution_date=self.execution_dates[0])
assert len(altered) == 0
+ altered = set_dag_run_state_to_queued(dag=None, execution_date=self.execution_dates[0])
+ assert len(altered) == 0
# No dag_run_id
altered = set_dag_run_state_to_success(dag=self.dag1, run_id=None)
@@ -703,6 +726,8 @@ def test_set_dag_run_state_edge_cases(self):
assert len(altered) == 0
altered = set_dag_run_state_to_running(dag=self.dag1, run_id=None)
assert len(altered) == 0
+ altered = set_dag_run_state_to_queued(dag=self.dag1, run_id=None)
+ assert len(altered) == 0
# This will throw ValueError since dag.last_dagrun is None
# need to be 0 does not exist.