[AIRFLOW-1486] Unexpected S3 writing log error
Removed the unexpected S3 log-writing error and added tests for S3 logging.

Closes apache#2499 from skudriashev/airflow-1486
skudriashev authored and aoen committed Aug 7, 2017
1 parent f5c8457 commit d9109d6
Showing 2 changed files with 117 additions and 30 deletions.
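
Before the file-by-file diff, a minimal sketch of the behavioural change to S3Log.read, reconstructed from the hunks below. The helper names _read_fallback_before and _read_fallback_after are hypothetical and exist only for this illustration; only the error-handling tail of the method is shown.

import logging


def _read_fallback_before(remote_log_location, return_error=False):
    # Old behaviour (the removed lines below): an error was logged on every
    # failed read, even when the caller never asked for the error text.
    err = 'Could not read logs from {}'.format(remote_log_location)
    logging.error(err)
    return err if return_error else ''


def _read_fallback_after(remote_log_location, return_error=False):
    # New behaviour (the added lines below): the error is logged and returned
    # only when return_error=True; otherwise an empty string is returned
    # silently.
    if return_error:
        msg = 'Could not read logs from {}'.format(remote_log_location)
        logging.error(msg)
        return msg
    return ''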
40 changes: 20 additions & 20 deletions airflow/utils/logging.py
@@ -91,10 +91,13 @@ def read(self, remote_log_location, return_error=False):
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''
# return error if needed
if return_error:
msg = 'Could not read logs from {}'.format(remote_log_location)
logging.error(msg)
return msg

return ''

def write(self, log, remote_log_location, append=True):
"""
@@ -108,25 +111,21 @@ def write(self, log, remote_log_location, append=True):
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool
"""
if self.hook:

if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log
log = '\n'.join([old_log, log])

try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'))
return
encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
)
except:
pass

# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))
logging.error('Could not write logs to {}'.format(remote_log_location))


class GCSLog(object):
@@ -183,10 +182,13 @@ def read(self, remote_log_location, return_error=False):
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''
# return error if needed
if return_error:
msg = 'Could not read logs from {}'.format(remote_log_location)
logging.error(msg)
return msg

return ''

def write(self, log, remote_log_location, append=True):
"""
@@ -200,12 +202,11 @@ def write(self, log, remote_log_location, append=True):
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool
"""
if self.hook:
if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log
log = '\n'.join([old_log, log])

try:
bkt, blob = self.parse_gcs_url(remote_log_location)
@@ -218,7 +219,6 @@ def write(self, log, remote_log_location, append=True):
tmpfile.flush()
self.hook.upload(bkt, blob, tmpfile.name)
except:
# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))

def parse_gcs_url(self, gsurl):
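
One small stylistic change shared by both write() hunks above is that the concatenation of the old and new log text moves from the + operator to str.join. For two str values the forms are equivalent; a standalone check with made-up sample values:

# Hypothetical sample values; only the equivalence of the two expressions matters.
old_log = 'line 1\nline 2'
log = 'line 3'

assert old_log + '\n' + log == '\n'.join([old_log, log])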
107 changes: 97 additions & 10 deletions tests/utils/test_logging.py
@@ -14,16 +14,103 @@

import unittest

from airflow.exceptions import AirflowException
from airflow.utils import logging as logging_utils
from datetime import datetime, timedelta
import mock

class Logging(unittest.TestCase):
from airflow.utils import logging
from datetime import datetime


class TestLogging(unittest.TestCase):

def test_get_log_filename(self):
dag_id = 'dag_id'
task_id = 'task_id'
execution_date = datetime(2017, 1, 1, 0, 0, 0)
try_number = 0
filename = logging_utils.get_log_filename(dag_id, task_id, execution_date, try_number)
self.assertEqual(filename, 'dag_id/task_id/2017-01-01T00:00:00/1.log')
self.assertEqual(
logging.get_log_filename(
dag_id='dag_id',
task_id='task_id',
execution_date=datetime(2017, 1, 1, 0, 0, 0),
try_number=0,
),
'dag_id/task_id/2017-01-01T00:00:00/1.log',
)


class TestS3Log(unittest.TestCase):

def setUp(self):
super(TestS3Log, self).setUp()
self.remote_log_location = 'remote/log/location'
self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
self.hook_mock = self.hook_patcher.start()
self.hook_inst_mock = self.hook_mock.return_value
self.hook_key_mock = self.hook_inst_mock.get_key.return_value
self.hook_key_mock.get_contents_as_string.return_value.decode.\
return_value = 'content'
self.logging_patcher = mock.patch("airflow.utils.logging.logging")
self.logging_mock = self.logging_patcher.start()

def tearDown(self):
self.logging_patcher.stop()
self.hook_patcher.stop()
super(TestS3Log, self).tearDown()

def test_init(self):
logging.S3Log()
self.hook_mock.assert_called_once_with('')

def test_init_raises(self):
self.hook_mock.side_effect = Exception('Failed to connect')
logging.S3Log()
self.logging_mock.error.assert_called_once_with(
'Could not create an S3Hook with connection id "". Please make '
'sure that airflow[s3] is installed and the S3 connection exists.'
)

def test_log_exists(self):
self.assertTrue(logging.S3Log().log_exists(self.remote_log_location))

def test_log_exists_none(self):
self.hook_inst_mock.get_key.return_value = None
self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))

def test_log_exists_raises(self):
self.hook_inst_mock.get_key.side_effect = Exception('error')
self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))

def test_log_exists_no_hook(self):
self.hook_mock.side_effect = Exception('Failed to connect')
self.assertFalse(logging.S3Log().log_exists(self.remote_log_location))

def test_read(self):
self.assertEqual(logging.S3Log().read(self.remote_log_location),
'content')

def test_read_key_empty(self):
self.hook_inst_mock.get_key.return_value = None
self.assertEqual(logging.S3Log().read(self.remote_log_location), '')

def test_read_raises(self):
self.hook_inst_mock.get_key.side_effect = Exception('error')
self.assertEqual(logging.S3Log().read(self.remote_log_location), '')

def test_read_raises_return_error(self):
self.hook_inst_mock.get_key.side_effect = Exception('error')
result = logging.S3Log().read(self.remote_log_location,
return_error=True)
msg = 'Could not read logs from %s' % self.remote_log_location
self.assertEqual(result, msg)
self.logging_mock.error.assert_called_once_with(msg)

def test_write(self):
logging.S3Log().write('text', self.remote_log_location)
self.hook_inst_mock.load_string.assert_called_once_with(
'content\ntext',
key=self.remote_log_location,
replace=True,
encrypt=False,
)

def test_write_raises(self):
self.hook_inst_mock.load_string.side_effect = Exception('error')
logging.S3Log().write('text', self.remote_log_location)
msg = 'Could not write logs to %s' % self.remote_log_location
self.logging_mock.error.assert_called_once_with(msg)

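The setUp() wiring above mirrors the chain of calls S3Log.read is expected to make against the mocked S3Hook; that chain is inferred from the mocks rather than shown in this diff. A standalone sketch of what the stubs emulate:

import mock

# Build a stand-in hook the same way TestS3Log.setUp() does: get_key() returns
# a key object whose get_contents_as_string().decode() yields the stored text.
hook = mock.Mock()
hook.get_key.return_value.get_contents_as_string.return_value.decode.return_value = 'content'

key = hook.get_key('remote/log/location')
if key is not None:  # test_read_key_empty covers the None case
    assert key.get_contents_as_string().decode() == 'content'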