Skip to content

Commit

Permalink
Remove log groups in the lambda system test (apache#25626)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck authored Aug 9, 2022
1 parent d5f40d7 commit ed9bab0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
21 changes: 16 additions & 5 deletions tests/system/providers/amazon/aws/example_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import zipfile
from datetime import datetime
from typing import List, Optional, Tuple

import boto3

Expand All @@ -26,7 +27,7 @@
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs

DAG_ID = 'example_lambda'

Expand All @@ -42,7 +43,7 @@ def test(*args):


# Create a zip file containing one file "lambda_function.py" to deploy to the lambda function
def create_zip(content):
def create_zip(content: str):
zip_output = io.BytesIO()
with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file:
info = zipfile.ZipInfo("lambda_function.py")
Expand All @@ -53,7 +54,7 @@ def create_zip(content):


@task
def create_lambda(function_name, role_arn):
def create_lambda(function_name: str, role_arn: str):
client = boto3.client('lambda')
client.create_function(
FunctionName=function_name,
Expand All @@ -68,20 +69,29 @@ def create_lambda(function_name, role_arn):


@task
def await_lambda(function_name):
def await_lambda(function_name: str):
client = boto3.client('lambda')
waiter = client.get_waiter('function_active_v2')
waiter.wait(FunctionName=function_name)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_lambda(function_name):
def delete_lambda(function_name: str):
client = boto3.client('lambda')
client.delete_function(
FunctionName=function_name,
)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_logs(function_name: str) -> None:
generated_log_groups: List[Tuple[str, Optional[str]]] = [
(f'/aws/lambda/{function_name}', None),
]

purge_logs(test_logs=generated_log_groups, force_delete=True, retry=True)


with models.DAG(
DAG_ID,
schedule_interval='@once',
Expand Down Expand Up @@ -110,6 +120,7 @@ def delete_lambda(function_name):
invoke_lambda_function,
# TEST TEARDOWN
delete_lambda(lambda_function_name),
delete_logs(lambda_function_name),
)

from tests.system.utils.watcher import watcher
Expand Down
57 changes: 42 additions & 15 deletions tests/system/providers/amazon/aws/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import logging
import os
from os.path import basename, splitext
from time import sleep
from typing import List, Optional, Tuple
from uuid import uuid4

import boto3
from botocore.client import BaseClient
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import ClientError, NoCredentialsError

from airflow.decorators import task

Expand All @@ -34,6 +35,7 @@
DEFAULT_ENV_ID_PREFIX: str = 'env'
DEFAULT_ENV_ID_LEN: int = 8
DEFAULT_ENV_ID: str = f'{DEFAULT_ENV_ID_PREFIX}{str(uuid4())[:DEFAULT_ENV_ID_LEN]}'
PURGE_LOGS_INTERVAL_PERIOD = 5

# All test file names will contain this string.
TEST_FILE_IDENTIFIER: str = 'example'
Expand Down Expand Up @@ -172,6 +174,7 @@ def fetch_variable(key: str, default_value: Optional[str] = None, test_name: Opt
:param key: The name of the Parameter to fetch a value for.
:param default_value: The default value to use if no value can be found.
:param test_name: The system test name.
:return: The value of the parameter.
"""

Expand Down Expand Up @@ -199,30 +202,54 @@ def set_env_id() -> str:
return env_id


def purge_logs(test_logs: List[Tuple[str, Optional[str]]]) -> None:
def purge_logs(
test_logs: List[Tuple[str, Optional[str]]],
force_delete: bool = False,
retry: bool = False,
retry_times: int = 3,
) -> None:
"""
Accepts a tuple in the format: ('log group name', 'log stream prefix').
For each log group, it will delete any log streams matching the provided
prefix then if the log group is empty, delete the group. If the group
prefix then if the log group is empty, delete the group. If the group
is not empty that indicates there are logs not generated by the test and
those are left intact.
those are left intact. If `check_log_streams` is True, it will simply delete the log group regardless
of log streams within that log group.
:param test_logs: A list of log_group/stream_prefix tuples to delete.
:param force_delete: Whether to check log streams within the log group before removal. If True,
removes the log group and all its log streams inside it
:param retry: Whether to retry if the log group/stream was not found. In some cases, the log group/stream
is created seconds after the main resource has been created. By default, it retries for 3 times
with a 5s waiting period
:param retry_times: Number of retries
"""
client: BaseClient = boto3.client('logs')

for group, prefix in test_logs:
if prefix:
log_streams = client.describe_log_streams(
logGroupName=group,
logStreamNamePrefix=prefix,
)['logStreams']

for stream_name in [stream['logStreamName'] for stream in log_streams]:
client.delete_log_stream(logGroupName=group, logStreamName=stream_name)

if not client.describe_log_streams(logGroupName=group)['logStreams']:
client.delete_log_group(logGroupName=group)
try:
if prefix:
log_streams = client.describe_log_streams(
logGroupName=group,
logStreamNamePrefix=prefix,
)['logStreams']

for stream_name in [stream['logStreamName'] for stream in log_streams]:
client.delete_log_stream(logGroupName=group, logStreamName=stream_name)

if force_delete or not client.describe_log_streams(logGroupName=group)['logStreams']:
client.delete_log_group(logGroupName=group)
except ClientError as e:
if not retry or retry_times == 0 or e.response['Error']['Code'] != 'ResourceNotFoundException':
raise e

sleep(PURGE_LOGS_INTERVAL_PERIOD)
purge_logs(
test_logs=test_logs,
force_delete=force_delete,
retry=retry,
retry_times=retry_times - 1,
)


@task
Expand Down

0 comments on commit ed9bab0

Please sign in to comment.