From ed9bab09f34fbfd36e4b597f0a891ffd0f83cebc Mon Sep 17 00:00:00 2001 From: Vincent <97131062+vincbeck@users.noreply.github.com> Date: Tue, 9 Aug 2022 17:49:54 -0400 Subject: [PATCH] Remove log groups in the lambda system test (#25626) --- .../providers/amazon/aws/example_lambda.py | 21 +++++-- .../providers/amazon/aws/utils/__init__.py | 57 ++++++++++++++----- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py index a9e5e37b6c88a..0b8f2805f0e0c 100644 --- a/tests/system/providers/amazon/aws/example_lambda.py +++ b/tests/system/providers/amazon/aws/example_lambda.py @@ -18,6 +18,7 @@ import json import zipfile from datetime import datetime +from typing import List, Optional, Tuple import boto3 @@ -26,7 +27,7 @@ from airflow.models.baseoperator import chain from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator from airflow.utils.trigger_rule import TriggerRule -from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder +from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs DAG_ID = 'example_lambda' @@ -42,7 +43,7 @@ def test(*args): # Create a zip file containing one file "lambda_function.py" to deploy to the lambda function -def create_zip(content): +def create_zip(content: str): zip_output = io.BytesIO() with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file: info = zipfile.ZipInfo("lambda_function.py") @@ -53,7 +54,7 @@ def create_zip(content): @task -def create_lambda(function_name, role_arn): +def create_lambda(function_name: str, role_arn: str): client = boto3.client('lambda') client.create_function( FunctionName=function_name, @@ -68,20 +69,29 @@ def create_lambda(function_name, role_arn): @task -def await_lambda(function_name): +def await_lambda(function_name: str): client = boto3.client('lambda') waiter 
= client.get_waiter('function_active_v2') waiter.wait(FunctionName=function_name) @task(trigger_rule=TriggerRule.ALL_DONE) -def delete_lambda(function_name): +def delete_lambda(function_name: str): client = boto3.client('lambda') client.delete_function( FunctionName=function_name, ) +@task(trigger_rule=TriggerRule.ALL_DONE) +def delete_logs(function_name: str) -> None: + generated_log_groups: List[Tuple[str, Optional[str]]] = [ + (f'/aws/lambda/{function_name}', None), + ] + + purge_logs(test_logs=generated_log_groups, force_delete=True, retry=True) + + with models.DAG( DAG_ID, schedule_interval='@once', @@ -110,6 +120,7 @@ def delete_lambda(function_name): invoke_lambda_function, # TEST TEARDOWN delete_lambda(lambda_function_name), + delete_logs(lambda_function_name), ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py index a8492ea09bba6..b193c8cac3320 100644 --- a/tests/system/providers/amazon/aws/utils/__init__.py +++ b/tests/system/providers/amazon/aws/utils/__init__.py @@ -20,12 +20,13 @@ import logging import os from os.path import basename, splitext +from time import sleep from typing import List, Optional, Tuple from uuid import uuid4 import boto3 from botocore.client import BaseClient -from botocore.exceptions import NoCredentialsError +from botocore.exceptions import ClientError, NoCredentialsError from airflow.decorators import task @@ -34,6 +35,7 @@ DEFAULT_ENV_ID_PREFIX: str = 'env' DEFAULT_ENV_ID_LEN: int = 8 DEFAULT_ENV_ID: str = f'{DEFAULT_ENV_ID_PREFIX}{str(uuid4())[:DEFAULT_ENV_ID_LEN]}' +PURGE_LOGS_INTERVAL_PERIOD = 5 # All test file names will contain this string. TEST_FILE_IDENTIFIER: str = 'example' @@ -172,6 +174,7 @@ def fetch_variable(key: str, default_value: Optional[str] = None, test_name: Opt :param key: The name of the Parameter to fetch a value for. :param default_value: The default value to use if no value can be found. 
+    :param test_name: The system test name.
 
     :return: The value of the parameter.
     """
@@ -199,30 +202,54 @@ def set_env_id() -> str:
     return env_id
 
 
-def purge_logs(test_logs: List[Tuple[str, Optional[str]]]) -> None:
+def purge_logs(
+    test_logs: List[Tuple[str, Optional[str]]],
+    force_delete: bool = False,
+    retry: bool = False,
+    retry_times: int = 3,
+) -> None:
     """
     Accepts a tuple in the format: ('log group name', 'log stream prefix').
     For each log group, it will delete any log streams matching the provided
-    prefix then if the log group is empty, delete the group.  If the group
+    prefix then if the log group is empty, delete the group. If the group
     is not empty that indicates there are logs not generated by the test and
-    those are left intact.
+    those are left intact. If `force_delete` is True, it will simply delete the log group regardless
+    of log streams within that log group.
 
     :param test_logs: A list of log_group/stream_prefix tuples to delete.
+    :param force_delete: If True, delete the log group and all log streams inside it without
+        first checking for non-test log streams
+    :param retry: Whether to retry if the log group/stream was not found. In some cases, the log group/stream
+        is created seconds after the main resource has been created.
By default, it retries for 3 times + with a 5s waiting period + :param retry_times: Number of retries """ client: BaseClient = boto3.client('logs') for group, prefix in test_logs: - if prefix: - log_streams = client.describe_log_streams( - logGroupName=group, - logStreamNamePrefix=prefix, - )['logStreams'] - - for stream_name in [stream['logStreamName'] for stream in log_streams]: - client.delete_log_stream(logGroupName=group, logStreamName=stream_name) - - if not client.describe_log_streams(logGroupName=group)['logStreams']: - client.delete_log_group(logGroupName=group) + try: + if prefix: + log_streams = client.describe_log_streams( + logGroupName=group, + logStreamNamePrefix=prefix, + )['logStreams'] + + for stream_name in [stream['logStreamName'] for stream in log_streams]: + client.delete_log_stream(logGroupName=group, logStreamName=stream_name) + + if force_delete or not client.describe_log_streams(logGroupName=group)['logStreams']: + client.delete_log_group(logGroupName=group) + except ClientError as e: + if not retry or retry_times == 0 or e.response['Error']['Code'] != 'ResourceNotFoundException': + raise e + + sleep(PURGE_LOGS_INTERVAL_PERIOD) + purge_logs( + test_logs=test_logs, + force_delete=force_delete, + retry=retry, + retry_times=retry_times - 1, + ) @task