Skip to content

Commit

Permalink
Glue Job Driver logging (apache#25142)
Browse files Browse the repository at this point in the history
Adds an optional `verbose` boolean to Glue job operators and sensors which defaults to False.  If set true, then Glue job logs will be passed through to the Airflow task logs.
  • Loading branch information
ferruzzi authored Jul 19, 2022
1 parent f0c9ac9 commit 5a77c46
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 38 deletions.
97 changes: 83 additions & 14 deletions airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@
import warnings
from typing import Dict, List, Optional

import boto3

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

DEFAULT_LOG_SUFFIX = 'output'
FAILURE_LOG_SUFFIX = 'error'
# A filter value of ' ' translates to "match all".
# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
DEFAULT_LOG_FILTER = ' '
FAILURE_LOG_FILTER = '?ERROR ?Exception'


class GlueJobHook(AwsBaseHook):
"""
Expand Down Expand Up @@ -136,32 +145,92 @@ def get_job_state(self, job_name: str, run_id: str) -> str:
job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
return job_run['JobRun']['JobRunState']

def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
def print_job_logs(
self,
job_name: str,
run_id: str,
job_failed: bool = False,
next_token: Optional[str] = None,
) -> Optional[str]:
"""Prints the batch of logs to the Airflow task log and returns nextToken."""
log_client = boto3.client('logs')
response = {}

filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
log_group_name = f'{log_group_prefix}/{log_group_suffix}'

try:
if next_token:
response = log_client.filter_log_events(
logGroupName=log_group_name,
logStreamNames=[run_id],
filterPattern=filter_pattern,
nextToken=next_token,
)
else:
response = log_client.filter_log_events(
logGroupName=log_group_name,
logStreamNames=[run_id],
filterPattern=filter_pattern,
)
if len(response['events']):
messages = '\t'.join([event['message'] for event in response['events']])
self.log.info('Glue Job Run Logs:\n\t%s', messages)

except log_client.exceptions.ResourceNotFoundException:
self.log.warning(
'No new Glue driver logs found. This might be because there are no new logs, '
'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
)

# If no new log events are available, filter_log_events will return None.
# In that case, check the same token again next pass.
return response.get('nextToken') or next_token

def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
"""
Waits until Glue job with job_name completes or
fails and return final state if finished.
Raises AirflowException when the job failed
:param job_name: unique job name per AWS account
:param run_id: The job-run ID of the predecessor job run
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
:return: Dict of JobRunState and JobRunId
"""
failed_states = ['FAILED', 'TIMEOUT']
finished_states = ['SUCCEEDED', 'STOPPED']
next_log_token = None
job_failed = False

while True:
job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
return {'JobRunState': job_run_state, 'JobRunId': run_id}
if job_run_state in failed_states:
job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
"Polling for AWS Glue Job %s current run state with status %s", job_name, job_run_state
)
time.sleep(self.JOB_POLL_INTERVAL)
try:
job_run_state = self.get_job_state(job_name, run_id)
if job_run_state in finished_states:
self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
return {'JobRunState': job_run_state, 'JobRunId': run_id}
if job_run_state in failed_states:
job_failed = True
job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
self.log.info(
'Polling for AWS Glue Job %s current run state with status %s',
job_name,
job_run_state,
)
time.sleep(self.JOB_POLL_INTERVAL)
finally:
if verbose:
next_log_token = self.print_job_logs(
job_name=job_name,
run_id=run_id,
job_failed=job_failed,
next_token=next_log_token,
)

def get_or_create_glue_job(self) -> str:
"""
Expand Down
5 changes: 4 additions & 1 deletion airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class GlueJobOperator(BaseOperator):
:param create_job_kwargs: Extra arguments for Glue Job Creation
:param run_job_kwargs: Extra arguments for Glue Job Run
:param wait_for_completion: Whether or not wait for job run completion. (default: True)
:param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -84,6 +85,7 @@ def __init__(
create_job_kwargs: Optional[dict] = None,
run_job_kwargs: Optional[dict] = None,
wait_for_completion: bool = True,
verbose: bool = False,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -103,6 +105,7 @@ def __init__(
self.create_job_kwargs = create_job_kwargs
self.run_job_kwargs = run_job_kwargs or {}
self.wait_for_completion = wait_for_completion
self.verbose = verbose

def execute(self, context: 'Context'):
"""
Expand Down Expand Up @@ -141,7 +144,7 @@ def execute(self, context: 'Context'):
)
glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
if self.wait_for_completion:
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'], self.verbose)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
self.job_name,
Expand Down
50 changes: 37 additions & 13 deletions airflow/providers/amazon/aws/sensors/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import warnings
from typing import TYPE_CHECKING, Sequence
from typing import TYPE_CHECKING, List, Optional, Sequence

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
Expand All @@ -37,30 +37,54 @@ class GlueJobSensor(BaseSensorOperator):
:param job_name: The AWS Glue Job unique name
:param run_id: The AWS Glue current running job identifier
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""

template_fields: Sequence[str] = ('job_name', 'run_id')

def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
def __init__(
self,
*,
job_name: str,
run_id: str,
verbose: bool = False,
aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
self.job_name = job_name
self.run_id = run_id
self.verbose = verbose
self.aws_conn_id = aws_conn_id
self.success_states = ['SUCCEEDED']
self.errored_states = ['FAILED', 'STOPPED', 'TIMEOUT']
self.success_states: List[str] = ['SUCCEEDED']
self.errored_states: List[str] = ['FAILED', 'STOPPED', 'TIMEOUT']
self.next_log_token: Optional[str] = None

def poke(self, context: 'Context'):
hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
self.log.info('Poking for job run status :for Glue Job %s and ID %s', self.job_name, self.run_id)
job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
if job_state in self.success_states:
self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
return True
elif job_state in self.errored_states:
job_error_message = f"Exiting Job {self.run_id} Run State: {job_state}"
raise AirflowException(job_error_message)
else:
return False
job_failed = False

try:
if job_state in self.success_states:
self.log.info('Exiting Job %s Run State: %s', self.run_id, job_state)
return True
elif job_state in self.errored_states:
job_failed = True
job_error_message = 'Exiting Job %s Run State: %s', self.run_id, job_state
self.log.info(job_error_message)
raise AirflowException(job_error_message)
else:
return False
finally:
if self.verbose:
self.next_log_token = hook.print_job_logs(
job_name=self.job_name,
run_id=self.run_id,
job_failed=job_failed,
next_token=self.next_log_token,
)


class AwsGlueJobSensor(GlueJobSensor):
Expand Down
54 changes: 50 additions & 4 deletions tests/providers/amazon/aws/operators/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from parameterized import parameterized

from airflow import configuration
from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
Expand All @@ -29,7 +29,7 @@
class TestGlueJobOperator(unittest.TestCase):
@mock.patch('airflow.providers.amazon.aws.hooks.glue.GlueJobHook')
def setUp(self, glue_hook_mock):
configuration.load_test_config()
conf.load_test_config()

self.glue_hook_mock = glue_hook_mock

Expand All @@ -39,12 +39,19 @@ def setUp(self, glue_hook_mock):
"/glue-examples/glue-scripts/sample_aws_glue_job.py",
]
)
@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_job_state')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_failure(
self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
self,
script_location,
mock_load_file,
mock_get_conn,
mock_initialize_job,
mock_get_job_state,
mock_print_job_logs,
):
glue = GlueJobOperator(
task_id='test_glue_operator',
Expand All @@ -57,16 +64,52 @@ def test_execute_without_failure(
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
mock_get_job_state.return_value = 'SUCCEEDED'

glue.execute({})

mock_initialize_job.assert_called_once_with({}, {})
mock_print_job_logs.assert_not_called()
assert glue.job_name == 'my_test_job'

@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_job_state')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_with_verbose_logging(
self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs
):
job_name = 'test_job_name'
job_run_id = '11111'
glue = GlueJobOperator(
task_id='test_glue_operator',
job_name=job_name,
script_location='s3_uri',
s3_bucket='bucket_name',
iam_role_name='role_arn',
verbose=True,
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': job_run_id}
mock_get_job_state.return_value = 'SUCCEEDED'

glue.execute({})

mock_initialize_job.assert_called_once_with({}, {})
mock_print_job_logs.assert_called_once_with(
job_name=job_name,
run_id=job_run_id,
job_failed=False,
next_token=None,
)
assert glue.job_name == job_name

@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'job_completion')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_waiting_for_completion(
self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion
self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion, mock_print_job_logs
):
glue = GlueJobOperator(
task_id='test_glue_operator',
Expand All @@ -79,8 +122,11 @@ def test_execute_without_waiting_for_completion(
wait_for_completion=False,
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}

job_run_id = glue.execute({})

mock_initialize_job.assert_called_once_with({}, {})
mock_job_completion.assert_not_called()
mock_print_job_logs.assert_not_called()
assert glue.job_name == 'my_test_job'
assert job_run_id == '11111'
Loading

0 comments on commit 5a77c46

Please sign in to comment.