AIP-39: DagRun.data_interval_start|end (apache#16352)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
uranusjr and ashb authored Aug 12, 2021
1 parent 951006e · commit 4556828
Showing 32 changed files with 848 additions and 385 deletions.
4 changes: 3 additions & 1 deletion airflow/api/common/experimental/mark_tasks.py
@@ -257,7 +257,9 @@ def get_execution_dates(dag, execution_date, future, past):
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
else:
-        dates = dag.get_run_dates(start_date, end_date, align=False)
+        dates = [
+            info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+        ]
return dates
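
The swap above is mechanical: the removed `get_run_dates()` returned plain datetimes, while `iter_dagrun_infos_between()` yields `DagRunInfo` objects, so callers take `.logical_date` to recover the old values. A minimal sketch of the new call, assuming a `dag` loaded from a DagBag on this branch (the example DAG id is only illustrative):

    from airflow.models.dagbag import DagBag
    from airflow.utils import timezone

    dag = DagBag().get_dag("example_bash_operator")
    start = timezone.datetime(2021, 6, 1)
    end = timezone.datetime(2021, 6, 4)

    # Old (removed in this commit):
    #     dates = dag.get_run_dates(start, end, align=False)
    # New: each DagRunInfo carries a logical date plus its data interval.
    dates = [info.logical_date for info in dag.iter_dagrun_infos_between(start, end, align=False)]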


19 changes: 14 additions & 5 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -31,6 +31,7 @@
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.session import provide_session
+from airflow.utils.state import State
from airflow.utils.types import DagRunType


@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
except ValidationError as err:
raise BadRequest(detail=str(err))

+    execution_date = post_body["execution_date"]
+    run_id = post_body["run_id"]
dagrun_instance = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == execution_date),
)
.first()
)
if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
+        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.QUEUED,
+            conf=post_body.get("conf"),
+            external_trigger=True,
+            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+        )
return dagrun_schema.dump(dag_run)

-    if dagrun_instance.execution_date == post_body["execution_date"]:
+    if dagrun_instance.execution_date == execution_date:
raise AlreadyExists(
detail=f"DAGRun with DAG ID: '{dag_id}' and "
f"DAGRun ExecutionDate: '{post_body['execution_date']}' already exists"
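Creating the run through `DAG.create_dagrun()` rather than a raw `DagRun(**post_body)` insert means the manual run starts in the QUEUED state with the serialized DAG's hash recorded, so the scheduler can pick it up and detect stale serializations. A hedged sketch of exercising this endpoint over the stable REST API, assuming a local webserver with the basic-auth backend and the admin/admin credentials of a dev install:

    import requests

    # POST /api/v1/dags/{dag_id}/dagRuns creates a manual run; field names
    # follow the stable-API schema (dag_run_id / execution_date / conf).
    resp = requests.post(
        "http://localhost:8080/api/v1/dags/example_bash_operator/dagRuns",
        json={
            "dag_run_id": "manual__2021-06-01",  # hypothetical run id
            "execution_date": "2021-06-01T00:00:00Z",
            "conf": {},
        },
        auth=("admin", "admin"),  # assumption: basic-auth dev setup
    )
    print(resp.status_code, resp.json())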
47 changes: 29 additions & 18 deletions airflow/jobs/backfill_job.py
@@ -19,9 +19,9 @@

import time
from collections import OrderedDict
-from datetime import datetime
from typing import Optional, Set

+import pendulum
from sqlalchemy import and_
from sqlalchemy.orm.session import Session, make_transient
from tabulate import tabulate
@@ -42,6 +42,7 @@
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
+from airflow.timetables.base import DagRunInfo
from airflow.utils import helpers, timezone
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.session import provide_session
@@ -283,17 +284,19 @@ def _manage_executor_state(self, running):
ti.handle_failure_with_callback(error=msg)

@provide_session
-    def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):
+    def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = None):
"""
        Return an existing dag run for the given run date if one exists;
        otherwise create a new dag run. If the max_active_runs limit is
        reached, this function returns None.
-        :param run_date: the execution date for the dag run
+        :param dagrun_info: Schedule information for the dag run
:param dag: DAG
:param session: the database session object
:return: a DagRun in state RUNNING or None
"""
+        run_date = dagrun_info.logical_date

# consider max_active_runs but ignore when running subdags
respect_dag_max_active_limit = bool(dag.schedule_interval and not dag.is_subdag)

@@ -317,6 +320,7 @@ def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = None):

run = run or dag.create_dagrun(
execution_date=run_date,
+            data_interval=dagrun_info.data_interval,
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False,
@@ -690,14 +694,14 @@ def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
return err

@provide_session
-    def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id, start_date, session=None):
+    def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
"""
Computes the dag runs and their respective task instances for
the given run dates and executes the task instances.
Returns a list of execution dates of the dag runs that were executed.
-        :param run_dates: Execution dates for dag runs
-        :type run_dates: list
+        :param dagrun_infos: Schedule information for dag runs
+        :type dagrun_infos: list[DagRunInfo]
        :param ti_status: internal BackfillJob status structure used to track task instance progress
:type ti_status: BackfillJob._DagRunTaskStatus
:param executor: the executor to use, it must be previously started
@@ -709,9 +713,9 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id, start_date, session=None):
:param session: the current session object
:type session: sqlalchemy.orm.session.Session
"""
-        for next_run_date in run_dates:
+        for dagrun_info in dagrun_infos:
for dag in [self.dag] + self.dag.subdags:
-                dag_run = self._get_dag_run(next_run_date, dag, session=session)
+                dag_run = self._get_dag_run(dagrun_info, dag, session=session)
tis_map = self._task_instances_for_dag_run(dag_run, session=session)
if dag_run is None:
continue
@@ -755,8 +759,13 @@ def _execute(self, session=None):

start_date = self.bf_start_date

-        # Get intervals between the start/end dates, which will turn into dag runs
-        run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True)
+        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
+        dagrun_start_date = timezone.coerce_datetime(start_date)
+        if self.bf_end_date is None:
+            dagrun_end_date = pendulum.now(timezone.utc)
+        else:
+            dagrun_end_date = pendulum.instance(self.bf_end_date)
+        dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
if self.run_backwards:
tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
if tasks_that_depend_on_past:
@@ -765,9 +774,10 @@
",".join(tasks_that_depend_on_past)
)
)
-            run_dates = run_dates[::-1]
+            dagrun_infos = dagrun_infos[::-1]

-        if len(run_dates) == 0:
+        dagrun_info_count = len(dagrun_infos)
+        if dagrun_info_count == 0:
self.log.info("No run dates were found for the given dates and dag interval.")
return

@@ -788,17 +798,18 @@
executor.job_id = "backfill"
executor.start()

-        ti_status.total_runs = len(run_dates)  # total dag runs in backfill
+        ti_status.total_runs = dagrun_info_count  # total dag runs in backfill

try:
remaining_dates = ti_status.total_runs
while remaining_dates > 0:
-                dates_to_process = [
-                    run_date for run_date in run_dates if run_date not in ti_status.executed_dag_run_dates
+                dagrun_infos_to_process = [
+                    dagrun_info
+                    for dagrun_info in dagrun_infos
+                    if dagrun_info.logical_date not in ti_status.executed_dag_run_dates
]

-                self._execute_for_run_dates(
-                    run_dates=dates_to_process,
+                self._execute_dagruns(
+                    dagrun_infos=dagrun_infos_to_process,
ti_status=ti_status,
executor=executor,
pickle_id=pickle_id,
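Throughout the backfill changes above, bare run dates are replaced by `DagRunInfo` values from `airflow.timetables.base`. A `DagRunInfo` pairs the moment a run may be created (`run_after`) with the `DataInterval` it covers, and derives `logical_date` from the interval, which is why `_get_dag_run()` can recover both the execution date and the data interval from a single argument. A minimal sketch of that shape, assuming this branch's timetables module:

    import pendulum
    from airflow.timetables.base import DagRunInfo

    start = pendulum.datetime(2021, 6, 1, tz="UTC")
    end = pendulum.datetime(2021, 6, 2, tz="UTC")

    info = DagRunInfo.interval(start=start, end=end)
    assert info.logical_date == info.data_interval.start == start
    assert info.data_interval.end == end
    assert info.run_after == end  # schedulable once the interval has passed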
5 changes: 2 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -953,14 +953,13 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
state=State.QUEUED,
+                    data_interval=dag_model.next_dagrun_data_interval,
external_trigger=False,
session=session,
dag_hash=dag_hash,
creating_job_id=self.id,
)
-                dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
-                    dag_model.next_dagrun
-                )
+                dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun, 0)

# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()
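The removed lines updated only `next_dagrun` and `next_dagrun_create_after` by hand; `calculate_dagrun_date_fields()` also fills the new `next_dagrun_data_interval_start|end` columns added by this commit's migration. A hedged sketch of reading those fields back, assuming the models on this branch and a configured metadata database:

    from airflow.models import DagModel
    from airflow.utils.session import create_session

    with create_session() as session:
        dag_model = session.query(DagModel).filter(DagModel.dag_id == "example_bash_operator").one()
        # next_dagrun_data_interval is the value the scheduler passes to
        # create_dagrun() above; it is assumed to combine the two new columns.
        print(dag_model.next_dagrun, dag_model.next_dagrun_data_interval)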
@@ -0,0 +1,77 @@ (new file: Alembic migration, revision 142555e44c17)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add data_interval_[start|end] to DagModel and DagRun.

Revision ID: 142555e44c17
Revises: 54bebd308c5f
Create Date: 2021-06-09 08:28:02.089817
"""

from alembic import op
from sqlalchemy import TIMESTAMP, Column
from sqlalchemy.dialects import mssql, mysql

# Revision identifiers, used by Alembic.
revision = "142555e44c17"
down_revision = "54bebd308c5f"
branch_labels = None
depends_on = None


def _use_date_time2(conn):
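    # DATETIME2 requires SQL Server 2008 or newer; versions 2000/2005
    # (product versions 8.x/9.x) only support the older DATETIME type.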
result = conn.execute(
"""SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion'))
like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
).fetchone()
mssql_version = result[0]
return mssql_version not in ("2000", "2005")


def _get_timestamp(conn):
dialect_name = conn.dialect.name
if dialect_name == "mysql":
return mysql.TIMESTAMP(fsp=6, timezone=True)
if dialect_name != "mssql":
return TIMESTAMP(timezone=True)
if _use_date_time2(conn):
return mssql.DATETIME2(precision=6)
return mssql.DATETIME


def upgrade():
"""Apply data_interval fields to DagModel and DagRun."""
column_type = _get_timestamp(op.get_bind())
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(Column("data_interval_start", column_type))
batch_op.add_column(Column("data_interval_end", column_type))
with op.batch_alter_table("dag") as batch_op:
batch_op.add_column(Column("next_dagrun_data_interval_start", column_type))
batch_op.add_column(Column("next_dagrun_data_interval_end", column_type))


def downgrade():
    """Remove data_interval fields from DagModel and DagRun."""
with op.batch_alter_table("dag_run") as batch_op:
batch_op.drop_column("data_interval_start")
batch_op.drop_column("data_interval_end")
with op.batch_alter_table("dag") as batch_op:
batch_op.drop_column("next_dagrun_data_interval_start")
batch_op.drop_column("next_dagrun_data_interval_end")
7 changes: 4 additions & 3 deletions airflow/models/baseoperator.py
@@ -1259,10 +1259,11 @@ def run(
start_date = start_date or self.start_date
end_date = end_date or self.end_date or timezone.utcnow()

-        for execution_date in self.dag.get_run_dates(start_date, end_date, align=False):
-            TaskInstance(self, execution_date).run(
+        for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
+            ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
+            TaskInstance(self, info.logical_date).run(
mark_success=mark_success,
-                ignore_depends_on_past=(execution_date == start_date and ignore_first_depends_on_past),
+                ignore_depends_on_past=ignore_depends_on_past,
ignore_ti_state=ignore_ti_state,
)
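
`BaseOperator.run()` keeps its public signature; only the iteration underneath changed, so each `TaskInstance` is now built from a `DagRunInfo.logical_date`. A minimal usage sketch, assuming the example DAG and task ids (illustrative only):

    from airflow.models.dagbag import DagBag
    from airflow.utils import timezone

    task = DagBag().get_dag("example_bash_operator").get_task("run_this_last")
    # Runs this one task for every dag run scheduled in the window.
    task.run(
        start_date=timezone.datetime(2021, 6, 1),
        end_date=timezone.datetime(2021, 6, 3),
        ignore_first_depends_on_past=True,
    )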

