Skip to content

Commit

Permalink
Resolve aws emr deprecations in tests (apache#40020)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirrao authored Jun 3, 2024
1 parent b5bb039 commit 19c145c
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 74 deletions.
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,8 @@ def execute(self, context: Context) -> str | None:
trigger=EmrCreateJobFlowTrigger(
job_flow_id=self._job_flow_id,
aws_conn_id=self.aws_conn_id,
poll_interval=self.waiter_delay,
max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the timeout does not restart
Expand Down
19 changes: 0 additions & 19 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,25 +230,6 @@
- tests/providers/amazon/aws/operators/test_ecs.py::TestEcsRunTaskOperator::test_wait_end_tasks
- tests/providers/amazon/aws/operators/test_ecs.py::TestEcsRunTaskOperator::test_with_defer
- tests/providers/amazon/aws/operators/test_eks.py::TestEksPodOperator::test_on_finish_action_handler
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_create_job_flow_deferrable
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_execute_returns_job_id
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_execute_with_wait
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_init
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_render_template
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_render_template_from_file
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_http_code_fail
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_no_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion_fail_state
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion_multiple_attempts
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion_fail_state
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion_multiple_attempts
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_waiter_config
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessCreateApplicationOperator::test_create_application_waiter_params
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessDeleteOperator::test_delete_application_waiter_params
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessStartJobOperator::test_start_job_waiter_params
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode_extra_words_added
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_connection_broken_field_mode_extra_allows_nested_json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,22 @@ def test_stop_notebook_execution_wait_for_completion_multiple_attempts(self, moc
@mock.patch.object(EmrHook, "conn")
def test_stop_notebook_execution_waiter_config(self, mock_conn, mock_waiter, _):
test_execution_id = "test-execution-id"
countdown = 400
waiter_max_attempts = 35
delay = 12

op = EmrStopNotebookExecutionOperator(
task_id="test-id",
notebook_execution_id=test_execution_id,
wait_for_completion=True,
waiter_countdown=countdown,
waiter_check_interval_seconds=delay,
waiter_max_attempts=waiter_max_attempts,
waiter_delay=delay,
)

op.execute(None)
mock_conn.stop_notebook_execution.assert_called_once_with(NotebookExecutionId=test_execution_id)
mock_waiter.assert_called_once_with(
mock.ANY,
NotebookExecutionId=test_execution_id,
WaiterConfig={"Delay": delay, "MaxAttempts": countdown // delay},
WaiterConfig={"Delay": delay, "MaxAttempts": waiter_max_attempts},
)
assert_expected_waiter_type(mock_waiter, "notebook_stopped")
164 changes: 115 additions & 49 deletions tests/providers/amazon/aws/operators/test_emr_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pytest
from botocore.exceptions import WaiterError

from airflow.exceptions import AirflowException, TaskDeferred
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred
from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import (
EmrServerlessCloudWatchLogsLink,
Expand Down Expand Up @@ -336,28 +336,51 @@ def test_application_in_failure_state(self, mock_conn, mock_get_waiter):
)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_create_application_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down Expand Up @@ -755,28 +778,51 @@ def test_cancel_job_run(self, mock_conn):
)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_start_job_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down Expand Up @@ -1209,25 +1255,45 @@ def test_delete_application_failed_deletion(self, mock_conn, mock_get_waiter):
mock_conn.delete_application.assert_called_once_with(applicationId=application_id_delete_operator)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_delete_application_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down

0 comments on commit 19c145c

Please sign in to comment.