Skip to content

Commit

Permalink
Allow custom timetable as a DAG argument (apache#17414)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored Aug 31, 2021
1 parent 9028a78 commit 3fc57de
Show file tree
Hide file tree
Showing 23 changed files with 626 additions and 374 deletions.
8 changes: 4 additions & 4 deletions airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ def get_execution_dates(dag, execution_date, future, past):
else:
start_date = execution_date
start_date = execution_date if not past else start_date
if dag.schedule_interval == '@once':
dates = [start_date]
elif not dag.schedule_interval:
# If schedule_interval is None, need to look at existing DagRun if the user wants future or
if not dag.timetable.can_run:
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
elif not dag.timetable.periodic:
dates = [start_date]
else:
dates = [
info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non
run_date = dagrun_info.logical_date

# consider max_active_runs but ignore when running subdags
respect_dag_max_active_limit = bool(dag.schedule_interval and not dag.is_subdag)
respect_dag_max_active_limit = bool(dag.timetable.can_run and not dag.is_subdag)

current_active_dag_count = dag.get_num_active_runs(external_trigger=False)

Expand Down
92 changes: 59 additions & 33 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import traceback
import warnings
from collections import OrderedDict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, tzinfo
from inspect import signature
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -72,7 +72,6 @@
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.schedules import Schedule
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.typing_compat import Literal, RePatternType
from airflow.utils import timezone
Expand All @@ -92,11 +91,34 @@

log = logging.getLogger(__name__)

ScheduleInterval = Union[str, timedelta, relativedelta]
DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']

ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})

DagStateChangeCallback = Callable[[Context], None]
ScheduleInterval = Union[str, timedelta, relativedelta]
ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]


# Backward compatibility: If neither schedule_interval nor timetable is
# *provided by the user*, default to a one-day interval.
DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)


def create_timetable(interval: ScheduleIntervalArg, timezone: tzinfo) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is ScheduleIntervalArgNotSet:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
if interval is None:
return NullTimetable()
if interval == "@once":
return OnceTimetable()
if isinstance(interval, (timedelta, relativedelta)):
return DeltaDataIntervalTimetable(interval)
if isinstance(interval, str):
return CronDataIntervalTimetable(interval, timezone)
raise ValueError(f"{interval!r} is not a valid schedule_interval.")


def get_last_dagrun(dag_id, session, include_externally_triggered=False):
Expand Down Expand Up @@ -256,7 +278,8 @@ def __init__(
self,
dag_id: str,
description: Optional[str] = None,
schedule_interval: Optional[ScheduleInterval] = timedelta(days=1),
schedule_interval: ScheduleIntervalArg = ScheduleIntervalArgNotSet,
timetable: Optional[Timetable] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
full_filepath: Optional[str] = None,
Expand Down Expand Up @@ -349,7 +372,18 @@ def __init__(
if 'end_date' in self.default_args:
self.default_args['end_date'] = timezone.convert_to_utc(self.default_args['end_date'])

self.schedule_interval = schedule_interval
# Calculate the DAG's timetable.
if timetable is None:
self.timetable = create_timetable(schedule_interval, self.timezone)
if schedule_interval is ScheduleIntervalArgNotSet:
schedule_interval = DEFAULT_SCHEDULE_INTERVAL
self.schedule_interval: ScheduleInterval = schedule_interval
elif schedule_interval is ScheduleIntervalArgNotSet:
self.timetable = timetable
self.schedule_interval = self.timetable.summary
else:
raise TypeError("cannot specify both 'schedule_interval' and 'timetable'")

if isinstance(template_searchpath, str):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
Expand Down Expand Up @@ -494,7 +528,7 @@ def is_fixed_time_schedule(self):
stacklevel=2,
)
try:
return not self.timetable._schedule._should_fix_dst
return not self.timetable._should_fix_dst
except AttributeError:
return True

Expand All @@ -505,24 +539,25 @@ def following_schedule(self, dttm):
:param dttm: utc datetime
:return: utc datetime
"""
current = pendulum.instance(dttm)
between = TimeRestriction(earliest=None, latest=None, catchup=True)
next_info = self.timetable.next_dagrun_info(current, between)
next_info = self.timetable.next_dagrun_info(
last_automated_dagrun=pendulum.instance(dttm),
restriction=TimeRestriction(earliest=None, latest=None, catchup=True),
)
if next_info is None:
return None
return next_info.data_interval.start

def previous_schedule(self, dttm):
from airflow.timetables.interval import _DataIntervalTimetable

warnings.warn(
"`DAG.previous_schedule()` is deprecated.",
category=DeprecationWarning,
stacklevel=2,
)
try:
schedule: Schedule = self.timetable._schedule
except AttributeError:
if not isinstance(self.timetable, _DataIntervalTimetable):
return None
return schedule.get_prev(pendulum.instance(dttm))
return self.timetable._get_prev(pendulum.instance(dttm))

def next_dagrun_info(
self,
Expand Down Expand Up @@ -551,8 +586,8 @@ def next_dagrun_info(
# and someone is passing datetime.datetime into this function. We should
# fix whatever is doing that.
return self.timetable.next_dagrun_info(
timezone.coerce_datetime(date_last_automated_dagrun),
self._time_restriction,
last_automated_dagrun=timezone.coerce_datetime(date_last_automated_dagrun),
restriction=self._time_restriction,
)

def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
Expand Down Expand Up @@ -584,20 +619,6 @@ def _time_restriction(self) -> TimeRestriction:
latest = None
return TimeRestriction(earliest, latest, self.catchup)

@cached_property
def timetable(self) -> Timetable:
interval = self.schedule_interval
if interval is None:
return NullTimetable()
if interval == "@once":
return OnceTimetable()
if isinstance(interval, (timedelta, relativedelta)):
return DeltaDataIntervalTimetable(interval)
if isinstance(interval, str):
return CronDataIntervalTimetable(interval, self.timezone)
type_name = type(interval).__name__
raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")

def iter_dagrun_infos_between(
self,
earliest: Optional[pendulum.DateTime],
Expand Down Expand Up @@ -637,7 +658,7 @@ def iter_dagrun_infos_between(
if self.is_subdag:
align = False

info = self.timetable.next_dagrun_info(None, restriction)
info = self.timetable.next_dagrun_info(last_automated_dagrun=None, restriction=restriction)
if info is None:
# No runs to be scheduled between the user-supplied timeframe. But
# if align=False, "invent" a data interval for the timeframe itself.
Expand All @@ -653,7 +674,10 @@ def iter_dagrun_infos_between(
# Generate naturally according to schedule.
while info is not None:
yield info
info = self.timetable.next_dagrun_info(info.logical_date, restriction)
info = self.timetable.next_dagrun_info(
last_automated_dagrun=info.logical_date,
restriction=restriction,
)

def get_run_dates(self, start_date, end_date=None):
"""
Expand Down Expand Up @@ -844,7 +868,7 @@ def owner(self) -> str:

@property
def allow_future_exec_dates(self) -> bool:
return settings.ALLOW_FUTURE_EXEC_DATES and self.schedule_interval is None
return settings.ALLOW_FUTURE_EXEC_DATES and not self.timetable.can_run

@provide_session
def get_concurrency_reached(self, session=None) -> bool:
Expand Down Expand Up @@ -2112,7 +2136,9 @@ def create_dagrun(
)

if run_type == DagRunType.MANUAL and data_interval is None and execution_date is not None:
data_interval = self.timetable.infer_data_interval(timezone.coerce_datetime(execution_date))
data_interval = self.timetable.infer_data_interval(
run_after=timezone.coerce_datetime(execution_date),
)

run = DagRun(
dag_id=self.dag_id,
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
This method will be used in the update_state method when the state of the DagRun
is updated to a completed status (either success or failure). The method will find the first
started task within the DAG and calculate the expected DagRun start time (based on
dag.execution_date & dag.schedule_interval), and minus these two values to get the delay.
dag.execution_date & dag.timetable), and minus these two values to get the delay.
The emitted data may contains outlier (e.g. when the first task was cleared, so
the second task's start_date will be used), but we can get rid of the outliers
on the stats side through the dashboards tooling built.
Expand All @@ -598,7 +598,7 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
try:
dag = self.get_dag()

if not self.dag.schedule_interval or self.dag.schedule_interval == "@once":
if not self.dag.timetable.periodic:
# We can't emit this metric if there is no following schedule to calculate from!
return

Expand Down
7 changes: 4 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,10 +880,11 @@ def get_previous_dagrun(

dr.dag = dag

# We always ignore schedule in dagrun lookup when `state` is given or `schedule_interval is None`.
# For legacy reasons, when `catchup=True`, we use `get_previous_scheduled_dagrun` unless
# We always ignore schedule in dagrun lookup when `state` is given
# or the DAG is never scheduled. For legacy reasons, when
# `catchup=True`, we use `get_previous_scheduled_dagrun` unless
# `ignore_schedule` is `True`.
ignore_schedule = state is not None or dag.schedule_interval is None
ignore_schedule = state is not None or not dag.timetable.can_run
if dag.catchup is True and not ignore_schedule:
last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
else:
Expand Down
12 changes: 11 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
}
},
"timezone": {
"type": "string"
"anyOf": [
{ "type": "string" },
{ "type": "integer" }
]
},
"dict": {
"description": "A python dictionary containing values of any type",
Expand Down Expand Up @@ -87,6 +90,13 @@
{ "$ref": "#/definitions/typed_relativedelta" }
]
},
"timetable": {
"type": "object",
"properties": {
"type": { "type": "string" },
"value": { "$ref": "#/definitions/dict" }
}
},
"catchup": { "type": "boolean" },
"is_subdag": { "type": "boolean" },
"fileloc": { "type" : "string"},
Expand Down
Loading

0 comments on commit 3fc57de

Please sign in to comment.