Skip to content

Commit

Permalink
Replace freezegun with time-machine (apache#28193)
Browse files Browse the repository at this point in the history
The primary driver for this was a niggle that the durations output for
one test was reporting over 52 years:

> 1670340356.40s call     tests/jobs/test_base_job.py::TestBaseJob::test_heartbeat

It turns out this was caused by freezegun, but time_machine fixes this.
It also might be a bit faster, but that isn't a noticeable difference for
us. (No runtime difference for the changed files, but it does make
collection quicker: from 10s to 8s)
  • Loading branch information
ashb authored Dec 12, 2022
1 parent fb5182b commit 4d0fd8e
Show file tree
Hide file tree
Showing 32 changed files with 194 additions and 208 deletions.
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve
"types-croniter",
"types-Deprecated",
"types-docutils",
"types-freezegun",
"types-paramiko",
"types-protobuf",
"types-python-dateutil",
Expand Down Expand Up @@ -373,7 +372,6 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve
"flake8-colors",
"flake8-implicit-str-concat",
"flaky",
"freezegun",
"gitpython",
"ipdb",
"isort",
Expand Down Expand Up @@ -408,6 +406,7 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve
"requests_mock",
"rich-click>=1.5",
"semver",
"time-machine",
"towncrier",
"twine",
"wheel",
Expand Down
4 changes: 2 additions & 2 deletions tests/api/client/test_local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import pendulum
import pytest
from freezegun import freeze_time
import time_machine

from airflow.api.client.local_client import Client
from airflow.example_dags import example_bash_operator
Expand Down Expand Up @@ -72,7 +72,7 @@ def test_trigger_dag(self, mock):
run_after=pendulum.instance(EXECDATE_NOFRACTIONS)
)

with freeze_time(EXECDATE):
with time_machine.travel(EXECDATE, tick=False):
# no execution date, execution date should be set automatically

self.client.trigger_dag(dag_id=test_dag_id)
Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from unittest import mock

import pytest
from freezegun import freeze_time
import time_machine
from parameterized import parameterized

from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
Expand Down Expand Up @@ -1335,7 +1335,7 @@ def test_should_respond_200(self, state, run_type, dag_maker, session):
}

@pytest.mark.parametrize("invalid_state", ["running"])
@freeze_time(TestDagRunEndpoint.default_time)
@time_machine.travel(TestDagRunEndpoint.default_time)
def test_should_response_400_for_non_existing_dag_run_state(self, invalid_state, dag_maker):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/schemas/test_dataset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from freezegun import freeze_time
import time_machine

from airflow.api_connexion.schemas.dataset_schema import (
DatasetCollection,
Expand All @@ -37,7 +37,7 @@ def setup_method(self) -> None:
clear_db_dags()
clear_db_datasets()
self.timestamp = "2022-06-10T12:02:44+00:00"
self.freezer = freeze_time(self.timestamp)
self.freezer = time_machine.travel(self.timestamp, tick=False)
self.freezer.start()

def teardown_method(self) -> None:
Expand Down
20 changes: 10 additions & 10 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from contextlib import ExitStack, suppress
from datetime import datetime, timedelta

import freezegun
import pytest
import time_machine

# We should set these before loading _any_ of the rest of airflow so that the
# unit test mode config is set as early as possible.
Expand Down Expand Up @@ -396,7 +396,7 @@ def pytest_runtest_setup(item):
@pytest.fixture
def frozen_sleep(monkeypatch):
"""
Use freezegun to "stub" sleep, so that it takes no time, but that
Use time-machine to "stub" sleep, so that it takes no time, but that
``datetime.now()`` appears to move forwards
If your module under test does ``import time`` and then ``time.sleep``::
Expand All @@ -412,21 +412,21 @@ def test_something(frozen_sleep, monkeypatch):
monkeypatch.setattr('my_mod.sleep', frozen_sleep)
my_mod.fn_under_test()
"""
freezegun_control = None
traveller = None

def fake_sleep(seconds):
nonlocal freezegun_control
nonlocal traveller
utcnow = datetime.utcnow()
if freezegun_control is not None:
freezegun_control.stop()
freezegun_control = freezegun.freeze_time(utcnow + timedelta(seconds=seconds))
freezegun_control.start()
if traveller is not None:
traveller.stop()
traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))
traveller.start()

monkeypatch.setattr("time.sleep", fake_sleep)
yield fake_sleep

if freezegun_control is not None:
freezegun_control.stop()
if traveller is not None:
traveller.stop()


@pytest.fixture(scope="session")
Expand Down
4 changes: 2 additions & 2 deletions tests/core/test_sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from unittest import mock

import pytest
from freezegun import freeze_time
import time_machine
from sentry_sdk import configure_scope

from airflow.operators.python import PythonOperator
Expand Down Expand Up @@ -129,7 +129,7 @@ def test_add_tagging(self, sentry, task_instance):
for key, value in scope._tags.items():
assert TEST_SCOPE[key] == value

@freeze_time(CRUMB_DATE.isoformat())
@time_machine.travel(CRUMB_DATE)
def test_add_breadcrumbs(self, sentry, task_instance):
"""
Test adding breadcrumbs.
Expand Down
6 changes: 3 additions & 3 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from unittest.mock import MagicMock, PropertyMock

import pytest
from freezegun import freeze_time
import time_machine
from sqlalchemy import func

from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest
Expand Down Expand Up @@ -470,7 +470,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
manager._file_stats = {
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
}
with freeze_time(freezed_base_time):
with time_machine.travel(freezed_base_time):
manager.set_file_paths(dag_files)
assert manager._file_path_queue == collections.deque()
# File Path Queue will be empty as the "modified time" < "last finish time"
Expand All @@ -481,7 +481,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
# than the last_parse_time but still less than now - min_file_process_interval
file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
file_1_new_mtime_ts = file_1_new_mtime.timestamp()
with freeze_time(freezed_base_time):
with time_machine.travel(freezed_base_time):
manager.set_file_paths(dag_files)
assert manager._file_path_queue == collections.deque()
# File Path Queue will be empty as the "modified time" < "last finish time"
Expand Down
6 changes: 3 additions & 3 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
# leave this it is used by the test worker
import celery.contrib.testing.tasks # noqa: F401
import pytest
import time_machine
from celery import Celery
from celery.result import AsyncResult
from freezegun import freeze_time
from kombu.asynchronous import set_event_loop
from parameterized import parameterized

Expand Down Expand Up @@ -162,7 +162,7 @@ def test_try_adopt_task_instances_none(self):
assert executor.try_adopt_task_instances(tis) == tis

@pytest.mark.backend("mysql", "postgres")
@freeze_time("2020-01-01")
@time_machine.travel("2020-01-01", tick=False)
def test_try_adopt_task_instances(self):
start_date = timezone.utcnow() - timedelta(days=2)

Expand Down Expand Up @@ -270,7 +270,7 @@ def test_check_for_stalled_tasks(self, create_dummy_dag, dag_maker, session, moc
assert ti.external_executor_id is None

@pytest.mark.backend("mysql", "postgres")
@freeze_time("2020-01-01")
@time_machine.travel("2020-01-01", tick=False)
def test_pending_tasks_timeout_with_appropriate_config_setting(self):
start_date = timezone.utcnow() - timedelta(days=2)

Expand Down
4 changes: 2 additions & 2 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import psutil
import pytest
from freezegun import freeze_time
import time_machine
from sqlalchemy import func

import airflow.example_dags
Expand Down Expand Up @@ -3290,7 +3290,7 @@ def dict_from_obj(obj):

assert dag3.get_last_dagrun().creating_job_id == self.scheduler_job.id

@freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
@mock.patch("airflow.jobs.scheduler_job.Stats.timing")
def test_start_dagruns(self, stats_timing, dag_maker):
"""
Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import jinja2
import pendulum
import pytest
import time_machine
from dateutil.relativedelta import relativedelta
from freezegun import freeze_time
from sqlalchemy import inspect

import airflow
Expand Down Expand Up @@ -2002,7 +2002,7 @@ def make_dag(dag_id, schedule, start_date, catchup):
# The DR should be scheduled in the last 2 hours, not 6 hours ago
assert next_date == six_hours_ago_to_the_hour

@freeze_time(timezone.datetime(2020, 1, 5))
@time_machine.travel(timezone.datetime(2020, 1, 5), tick=False)
def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self):
"""
Test that the dag file processor does not create multiple dagruns
Expand All @@ -2022,7 +2022,7 @@ def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self):
next_info = dag.next_dagrun_info(next_info.data_interval)
assert next_info and next_info.logical_date == timezone.datetime(2020, 1, 5)

@freeze_time(timezone.datetime(2020, 5, 4))
@time_machine.travel(timezone.datetime(2020, 5, 4))
def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self):
"""
Test that the dag file processor creates multiple dagruns
Expand Down
14 changes: 7 additions & 7 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from unittest.mock import patch

import pytest
from freezegun import freeze_time
import time_machine
from sqlalchemy import func
from sqlalchemy.exc import OperationalError

Expand Down Expand Up @@ -879,7 +879,7 @@ def test_sync_to_db_syncs_dag_specific_perms_on_update(self):
"""
db_clean_up()
session = settings.Session()
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)) as frozen_time:
with time_machine.travel(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False) as frozen_time:
dagbag = DagBag(
dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"),
include_examples=False,
Expand All @@ -889,7 +889,7 @@ def test_sync_to_db_syncs_dag_specific_perms_on_update(self):

def _sync_to_db():
mock_sync_perm_for_dag.reset_mock()
frozen_time.tick(20)
frozen_time.shift(20)
dagbag.sync_to_db(session=session)

dag = dagbag.dags["test_example_bash_operator"]
Expand Down Expand Up @@ -950,7 +950,7 @@ def test_get_dag_with_dag_serialization(self):
Serialized DAG table after 'min_serialized_dag_fetch_interval' seconds are passed.
"""

with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)), tick=False):
example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
SerializedDagModel.write_dag(dag=example_bash_op_dag)

Expand All @@ -962,18 +962,18 @@ def test_get_dag_with_dag_serialization(self):

# Check that if min_serialized_dag_fetch_interval has not passed we do not fetch the DAG
# from DB
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 4)), tick=False):
with assert_queries_count(0):
assert dag_bag.get_dag("example_bash_operator").tags == ["example", "example2"]

# Make a change in the DAG and write Serialized DAG to the DB
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 6)), tick=False):
example_bash_op_dag.tags += ["new_tag"]
SerializedDagModel.write_dag(dag=example_bash_op_dag)

# Since min_serialized_dag_fetch_interval is passed verify that calling 'dag_bag.get_dag'
# fetches the Serialized DAG from DB
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 8)), tick=False):
with assert_queries_count(2):
updated_ser_dag_1 = dag_bag.get_dag("example_bash_operator")
updated_ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
Expand Down
18 changes: 9 additions & 9 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import pendulum
import pytest
from freezegun import freeze_time
import time_machine

from airflow import models, settings
from airflow.decorators import task, task_group
Expand Down Expand Up @@ -539,11 +539,11 @@ def _raise_if_exception():
assert ti.next_kwargs is None
assert ti.state == state

@freeze_time("2021-09-19 04:56:35", as_kwarg="frozen_time")
def test_retry_delay(self, dag_maker, frozen_time=None):
def test_retry_delay(self, dag_maker, time_machine):
"""
Test that retry delays are respected
"""
time_machine.move_to("2021-09-19 04:56:35", tick=False)
with dag_maker(dag_id="test_retry_handling"):
task = BashOperator(
task_id="test_retry_handling_op",
Expand All @@ -568,12 +568,12 @@ def run_with_error(ti):
assert ti.try_number == 2

# second run -- still up for retry because retry_delay hasn't expired
frozen_time.tick(delta=datetime.timedelta(seconds=3))
time_machine.coordinates.shift(3)
run_with_error(ti)
assert ti.state == State.UP_FOR_RETRY

# third run -- failed
frozen_time.tick(delta=datetime.datetime.resolution)
time_machine.coordinates.shift(datetime.datetime.resolution)
run_with_error(ti)
assert ti.state == State.FAILED

Expand Down Expand Up @@ -731,7 +731,7 @@ def run_ti_and_assert(
expected_try_number,
expected_task_reschedule_count,
):
with freeze_time(run_date):
with time_machine.travel(run_date, tick=False):
try:
ti.run()
except AirflowException:
Expand Down Expand Up @@ -831,7 +831,7 @@ def run_ti_and_assert(
expected_task_reschedule_count,
):
ti.refresh_from_task(task)
with freeze_time(run_date):
with time_machine.travel(run_date, tick=False):
try:
ti.run()
except AirflowException:
Expand Down Expand Up @@ -930,7 +930,7 @@ def run_ti_and_assert(
expected_task_reschedule_count,
):
ti.refresh_from_task(task)
with freeze_time(run_date):
with time_machine.travel(run_date, tick=False):
try:
ti.run()
except AirflowException:
Expand Down Expand Up @@ -998,7 +998,7 @@ def run_ti_and_assert(
expected_try_number,
expected_task_reschedule_count,
):
with freeze_time(run_date):
with time_machine.travel(run_date, tick=False):
try:
ti.run()
except AirflowException:
Expand Down
Loading

0 comments on commit 4d0fd8e

Please sign in to comment.