Skip to content

Commit

Permalink
Merge pull request airbnb#598 from airbnb/fix_sqs_delete_type_error
Browse files Browse the repository at this point in the history
[athena] fix TypeError while delete messages from SQS
  • Loading branch information
jacknagz authored Feb 14, 2018
2 parents 954b70b + ecab817 commit d508896
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
17 changes: 7 additions & 10 deletions stream_alert/athena_partition_refresh/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ class StreamAlertSQSClient(object):
"""
QUEUENAME = 'streamalert_athena_data_bucket_notifications'
MAX_SQS_GET_MESSAGE_COUNT = 10
SQS_BACKOFF_MAX_RETRIES = 10

def __init__(self, config):
"""Initialize the StreamAlertSQS Client
Expand Down Expand Up @@ -478,10 +479,10 @@ def delete_messages(self):
if not self.processed_messages:
LOGGER.error('No processed messages to delete')
return

@backoff.on_predicate(backoff.fibo,
lambda len_messages: len_messages > 0,
max_value=10,
max_tries=self.SQS_BACKOFF_MAX_RETRIES,
jitter=backoff.full_jitter,
on_backoff=_backoff_handler,
on_success=_success_handler)
Expand All @@ -492,10 +493,6 @@ def _delete_messages_from_queue():
# Pop processed records from the list to be deleted
message_batch = [self.processed_messages.pop() for _ in range(batch)]

# This debug info should be removed when Issue #590 is fixed.
# https://github.com/airbnb/streamalert/issues/590
LOGGER.debug('The messages to be deleted: \n%s', message_batch)

# Try to delete the batch
resp = self.sqs_client.delete_message_batch(
QueueUrl=self.athena_sqs_url,
Expand All @@ -513,11 +510,11 @@ def _delete_messages_from_queue():
len(resp['Failed']), json.dumps(resp['Failed']))
# Add the failed messages back to the processed_messages attribute
# to be retried via backoff
self.processed_messages.extend([[message
for message
in message_batch
if message['MessageId'] == failed_message['Id']]
for failed_message in resp['Failed']])
failed_message_ids = [message['Id'] for message in resp['Failed']]
push_bach_messages = [message for message in message_batch
if message['MessageId'] in failed_message_ids]

self.processed_messages.extend(push_bach_messages)

return len(self.processed_messages)

Expand Down
19 changes: 19 additions & 0 deletions tests/unit/helpers/aws_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,22 @@ def get_query_execution(self, **kwargs):
def get_query_results(self, **kwargs): # pylint: disable=unused-argument
"""Get the results of a executed query"""
return {'ResultSet': {'Rows': [{'Data': self.results}] if self.results else []}}


class MockSqsClient(object):
"""Mock SQS client"""

def __init__(self, **kwargs):
self.region = kwargs.get('region')
self.failed = kwargs.get('failed')

def delete_message_batch(self, **kwargs): # pylint: disable=unused-argument
"""Mock error handling in SQS delete_message_batch method"""
if self.failed:
return {'Failed': [{'Id': '1'}]}

return {'Successful': [{'foo': 'bar'}]}

def list_queues(self, **kwargs): # pylint: disable=unused-argument,no-self-use
"""Mock list_queues method"""
return {'QueueUrls': ['url_foo', 'url_bar']}
19 changes: 18 additions & 1 deletion tests/unit/stream_alert_athena_partition_refresh/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from nose.tools import (
assert_equal,
assert_false,
assert_is_instance,
assert_is_none,
assert_true,
raises,
Expand All @@ -43,7 +44,7 @@
StreamAlertAthenaClient,
StreamAlertSQSClient,
)
from tests.unit.helpers.aws_mocks import MockAthenaClient
from tests.unit.helpers.aws_mocks import MockAthenaClient, MockSqsClient
from tests.unit.helpers.base import mock_open

GLOBAL_FILE = 'conf/global.json'
Expand Down Expand Up @@ -308,6 +309,22 @@ def test_delete_messages_failure(self, mock_logging, mock_sqs_client):

assert_true(mock_logging.error.called)

@patch.object(StreamAlertSQSClient, 'SQS_BACKOFF_MAX_RETRIES', 1)
@patch('stream_alert.athena_partition_refresh.main.LOGGER')
@patch('boto3.client')
def test_delete_messages_failure_retries(self, mock_sqs_client, mock_logging): #pylint: disable=no-self-use
"""Athena SQS - Delete Messages - Failure Response and push back messages to queue"""
mock_sqs_client.return_value = MockSqsClient(failed=True)

client = StreamAlertSQSClient(CONFIG_DATA)
client.processed_messages = [{'MessageId': '1', 'ReceiptHandle': 'handle1'},
{'MessageId': '2', 'ReceiptHandle': 'handle2'}]
client.delete_messages()
for message in client.processed_messages:
assert_is_instance(message, dict)

assert_true(mock_logging.error.called_with('Failed to delete the messages with following'))

@patch('stream_alert.athena_partition_refresh.main.LOGGER')
def test_delete_messages_none_processed(self, mock_logging):
"""Athena SQS - Delete Messages - No Processed Messages"""
Expand Down

0 comments on commit d508896

Please sign in to comment.