Skip to content

Commit

Permalink
Glue system test cleanup (apache#25966)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi authored Aug 26, 2022
1 parent 392471f commit 389c823
Showing 1 changed file with 37 additions and 55 deletions.
92 changes: 37 additions & 55 deletions tests/system/providers/amazon/aws/example_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.operators.s3 import (
Expand Down Expand Up @@ -66,14 +65,19 @@
'''


@task
def get_role_name(arn):
return arn.split('/')[-1]


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_logs(job_id: str, glue_crawler_name: str) -> None:
def delete_logs(job_id: str, crawler_name: str) -> None:
"""
Glue generates four Cloudwatch log groups and multiple log streams and leaves them.
"""
generated_log_groups: List[Tuple[str, Optional[str]]] = [
# Format: ('log group name', 'log stream prefix')
('/aws-glue/crawlers', glue_crawler_name),
('/aws-glue/crawlers', crawler_name),
('/aws-glue/jobs/logs-v2', job_id),
('/aws-glue/jobs/error', job_id),
('/aws-glue/jobs/output', job_id),
Expand All @@ -83,22 +87,30 @@ def delete_logs(job_id: str, glue_crawler_name: str) -> None:


@task(trigger_rule=TriggerRule.ALL_DONE)
def glue_cleanup(glue_crawler_name: str, glue_job_name: str, glue_db_name: str) -> None:
def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None:
client: BaseClient = boto3.client('glue')

client.delete_crawler(Name=glue_crawler_name)
client.delete_job(JobName=glue_job_name)
client.delete_database(Name=glue_db_name)
client.delete_crawler(Name=crawler_name)
client.delete_job(JobName=job_name)
client.delete_database(Name=db_name)


@task
def set_up(env_id, role_arn):
with DAG(
dag_id=DAG_ID,
schedule='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
test_context = sys_test_context_task()

env_id = test_context[ENV_ID_KEY]
role_arn = test_context[ROLE_ARN_KEY]
glue_crawler_name = f'{env_id}_crawler'
glue_db_name = f'{env_id}_glue_db'
glue_job_name = f'{env_id}_glue_job'
bucket_name = f'{env_id}-bucket'

role_name = role_arn.split('/')[-1]
role_name = get_role_name(role_arn)

glue_crawler_config = {
'Name': glue_crawler_name,
Expand All @@ -107,54 +119,31 @@ def set_up(env_id, role_arn):
'Targets': {'S3Targets': [{'Path': f'{bucket_name}/input'}]},
}

ti = get_current_context()['ti']
ti.xcom_push(key='bucket_name', value=bucket_name)
ti.xcom_push(key='glue_db_name', value=glue_db_name)
ti.xcom_push(key='glue_crawler_config', value=glue_crawler_config)
ti.xcom_push(key='glue_crawler_name', value=glue_crawler_name)
ti.xcom_push(key='glue_job_name', value=glue_job_name)
ti.xcom_push(key='role_name', value=role_name)


with DAG(
dag_id=DAG_ID,
schedule='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
test_context = sys_test_context_task()

test_setup = set_up(
env_id=test_context[ENV_ID_KEY],
role_arn=test_context[ROLE_ARN_KEY],
)

create_bucket = S3CreateBucketOperator(
task_id='create_bucket',
bucket_name=test_setup['bucket_name'],
bucket_name=bucket_name,
)

upload_csv = S3CreateObjectOperator(
task_id='upload_csv',
s3_bucket=test_setup['bucket_name'],
s3_bucket=bucket_name,
s3_key='input/input.csv',
data=EXAMPLE_CSV,
replace=True,
)

upload_script = S3CreateObjectOperator(
task_id='upload_script',
s3_bucket=test_setup['bucket_name'],
s3_bucket=bucket_name,
s3_key='etl_script.py',
data=EXAMPLE_SCRIPT.format(db_name=test_setup['glue_db_name'], bucket_name=test_setup['bucket_name']),
data=EXAMPLE_SCRIPT.format(db_name=glue_db_name, bucket_name=bucket_name),
replace=True,
)

# [START howto_operator_glue_crawler]
crawl_s3 = GlueCrawlerOperator(
task_id='crawl_s3',
config=test_setup['glue_crawler_config'],
config=glue_crawler_config,
# Waits by default, set False to test the Sensor below
wait_for_completion=False,
)
Expand All @@ -163,17 +152,17 @@ def set_up(env_id, role_arn):
# [START howto_sensor_glue_crawler]
wait_for_crawl = GlueCrawlerSensor(
task_id='wait_for_crawl',
crawler_name=test_setup['glue_crawler_name'],
crawler_name=glue_crawler_name,
)
# [END howto_sensor_glue_crawler]

# [START howto_operator_glue]
submit_glue_job = GlueJobOperator(
task_id='submit_glue_job',
job_name=test_setup['glue_job_name'],
script_location=f's3://{test_setup["bucket_name"]}/etl_script.py',
s3_bucket=test_setup['bucket_name'],
iam_role_name=test_setup['role_name'],
job_name=glue_job_name,
script_location=f's3://{bucket_name}/etl_script.py',
s3_bucket=bucket_name,
iam_role_name=role_name,
create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
# Waits by default, set False to test the Sensor below
wait_for_completion=False,
Expand All @@ -183,7 +172,7 @@ def set_up(env_id, role_arn):
# [START howto_sensor_glue]
wait_for_job = GlueJobSensor(
task_id='wait_for_job',
job_name=test_setup['glue_job_name'],
job_name=glue_job_name,
# Job ID extracted from previous Glue Job Operator task
run_id=submit_glue_job.output,
)
Expand All @@ -192,20 +181,13 @@ def set_up(env_id, role_arn):
delete_bucket = S3DeleteBucketOperator(
task_id='delete_bucket',
trigger_rule=TriggerRule.ALL_DONE,
bucket_name=test_setup['bucket_name'],
bucket_name=bucket_name,
force_delete=True,
)

clean_up = glue_cleanup(
test_setup['glue_crawler_name'],
test_setup['glue_job_name'],
test_setup['glue_db_name'],
)

chain(
# TEST SETUP
test_context,
test_setup,
create_bucket,
upload_csv,
upload_script,
Expand All @@ -215,9 +197,9 @@ def set_up(env_id, role_arn):
submit_glue_job,
wait_for_job,
# TEST TEARDOWN
clean_up,
glue_cleanup(glue_crawler_name, glue_job_name, glue_db_name),
delete_bucket,
delete_logs(submit_glue_job.output, test_setup['glue_crawler_name']),
delete_logs(submit_glue_job.output, glue_crawler_name),
)

from tests.system.utils.watcher import watcher
Expand Down

0 comments on commit 389c823

Please sign in to comment.