Skip to content

Commit

Permalink
Fix flaky test_get_context_in_old_style_context_task test (apache#28585)
Browse files Browse the repository at this point in the history
The test was flaky because it was passing schedule "@once" as
default_args (wrongly) - as the result this test was set to
the default "daily" and since start date was set to 1st Jan 2022
it executed longer and longer every day (running the Python
Operator for every day between 1st Jan and now.

This PR cahnges the test to correctly pass "@once" schedule via the
`schedule' parameter.
  • Loading branch information
potiuk authored Dec 25, 2022
1 parent de4ea29 commit ba2c790
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,21 +924,9 @@ def test_virtualenv_serializable_context_fields(self, create_task_instance):
*PythonVirtualenvOperator.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS,
*intentionally_excluded_context_keys,
}

assert set(context) == declared_keys


DEFAULT_ARGS = {
"owner": "test",
"depends_on_past": True,
"start_date": timezone.datetime(2022, 1, 1),
"end_date": datetime.today(),
"schedule_interval": "@once",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}


class TestCurrentContext:
def test_current_context_no_context_raise(self):
with pytest.raises(AirflowException):
Expand Down Expand Up @@ -1000,14 +988,24 @@ def clear_db():
clear_db_runs()


DEFAULT_ARGS = {
"owner": "test",
"depends_on_past": True,
"start_date": datetime(2022, 1, 1),
"end_date": datetime.today(),
"retries": 1,
"retry_delay": timedelta(minutes=1),
}


@pytest.mark.usefixtures("clear_db")
class TestCurrentContextRuntime:
def test_context_in_task(self):
with DAG(dag_id="assert_context_dag", default_args=DEFAULT_ARGS):
with DAG(dag_id="assert_context_dag", default_args=DEFAULT_ARGS, schedule="@once"):
op = MyContextAssertOperator(task_id="assert_context")
op.run(ignore_first_depends_on_past=True, ignore_ti_state=True)

def test_get_context_in_old_style_context_task(self):
with DAG(dag_id="edge_case_context_dag", default_args=DEFAULT_ARGS):
with DAG(dag_id="edge_case_context_dag", default_args=DEFAULT_ARGS, schedule="@once"):
op = PythonOperator(python_callable=get_all_the_context, task_id="get_all_the_context")
op.run(ignore_first_depends_on_past=True, ignore_ti_state=True)

0 comments on commit ba2c790

Please sign in to comment.