Terraform 0.11.X, Cloudwatch Log Output, Firehose Bug Fixes (airbnb#724)
* [outputs] cloudwatch-log-output

* [rule processor] resolve airbnb#708 - firehose record size limit bug

* [tf] upgrade tf and provider to 0.11.7/1.17.0

* [firehose] reset firehose client on backoff, add tests for functionality
jacknagz authored May 3, 2018
1 parent ac28895 commit 170a4a5
Showing 8 changed files with 107 additions and 17 deletions.
33 changes: 29 additions & 4 deletions stream_alert/alert_processor/outputs/aws.py
@@ -137,10 +137,8 @@ def _firehose_request_wrapper(json_alert, delivery_stream)
LOGGER.info('Sending %s to aws-firehose:%s', alert, delivery_stream)

resp = _firehose_request_wrapper(json_alert, delivery_stream)

if resp.get('RecordId'):
LOGGER.info('%s successfully sent to aws-firehose:%s with RecordId:%s',
alert, delivery_stream, resp['RecordId'])
LOGGER.info('%s successfully sent to aws-firehose:%s',
alert, delivery_stream)

return self._log_status(resp, descriptor)

@@ -369,3 +367,30 @@ def dispatch(self, alert, descriptor):

response = queue.send_message(MessageBody=json.dumps(alert.record, separators=(',', ':')))
return self._log_status(response, descriptor)


@StreamAlertOutput
class CloudwatchLogOutput(AWSOutput):
"""Print alerts to the Cloudwatch Logger"""
__service__ = 'aws-cloudwatch-log'

@classmethod
def get_user_defined_properties(cls):
"""Get properties that must be asssigned by the user when configuring a new Lambda
Returns:
OrderedDict: Contains various OutputProperty items
"""
return OrderedDict([
('descriptor',
OutputProperty(description='a short and unique descriptor for the cloudwatch log')),
])

def dispatch(self, alert, descriptor):
"""Send alert to Cloudwatch Logger for Lambda
Args:
alert (Alert): Alert instance which triggered a rule
descriptor (str): Output descriptor
"""
LOGGER.info('New Alert:\n%s', json.dumps(alert.output_dict(), indent=4))
return self._log_status(True, descriptor)
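
Below is a minimal, runnable sketch (toy objects, not the StreamAlert classes) of what the new aws-cloudwatch-log output does: dispatch only logs the serialized alert and reports success, relying on the Lambda runtime to ship everything the alert processor logs into its own CloudWatch Logs group.

import json
import logging

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger('stream_alert')


class ToyAlert(object):
    """Stand-in for the real Alert class; only output_dict() is needed here."""

    def __init__(self, rule_name, record):
        self.rule_name = rule_name
        self.record = record

    def output_dict(self):
        return {'rule_name': self.rule_name, 'record': self.record}


def dispatch(alert, descriptor):
    # Mirrors CloudwatchLogOutput.dispatch: log the alert, then report success
    LOGGER.info('New Alert:\n%s', json.dumps(alert.output_dict(), indent=4))
    return True  # the real output wraps this in self._log_status(True, descriptor)


dispatch(ToyAlert('example_rule', {'user': 'alice'}), 'sample')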
22 changes: 18 additions & 4 deletions stream_alert/rule_processor/firehose.py
@@ -47,7 +47,8 @@ class StreamAlertFirehose(object):
MAX_RECORD_SIZE = 1000 * 1000 - 2

def __init__(self, region, firehose_config, log_sources):
self._firehose_client = boto3.client('firehose', region_name=region)
self._region = region
self._firehose_client = boto3.client('firehose', region_name=self._region)
# Expand enabled logs into specific subtypes
self._enabled_logs = self._load_enabled_log_sources(firehose_config, log_sources)

@@ -64,6 +65,10 @@ def enabled_logs(self):
list: casts the set of enabled logs into a list for JSON serialization"""
return list(self._enabled_logs)

def _reset_firehose_client(self):
"""When errors are thrown, reset the Firehose client"""
self._firehose_client = boto3.client('firehose', region_name=self._region)

def _segment_records_by_size(self, record_batch):
"""Segment record groups by size
@@ -103,7 +108,8 @@ def _limit_record_size(cls, batch):
Args:
batch (list): Record batch to iterate on
"""
for index, record in enumerate(batch):
for index in reversed(xrange(len(batch))):
record = batch[index]
if len(json.dumps(record, separators=(",", ":"))) > cls.MAX_RECORD_SIZE:
# Show the first 1k bytes in order to not overload CloudWatch logs
LOGGER.error('The following record is too large'
@@ -138,6 +144,14 @@ def sanitize_keys(cls, record):

return new_record

def _backoff_handler_firehose_reset(self, details):
"""Custom backoff handler to re-instantiate the Firehose Client"""
LOGGER.info('[Backoff]: Calling \'%s\' again in %f seconds with %d tries so far',
details['target'].__name__,
details['wait'],
details['tries'])
self._reset_firehose_client()

def _firehose_request_helper(self, stream_name, record_batch):
"""Send record batches to Firehose
@@ -154,14 +168,14 @@ def _firehose_request_helper(self, stream_name, record_batch):
max_tries=self.MAX_BACKOFF_ATTEMPTS,
max_value=self.MAX_BACKOFF_FIBO_VALUE,
jitter=backoff.full_jitter,
on_backoff=backoff_handler(),
on_backoff=backoff_handler(debug_only=False),
on_success=success_handler(),
on_giveup=giveup_handler())
@backoff.on_exception(backoff.fibo,
exceptions_to_backoff,
max_tries=self.MAX_BACKOFF_ATTEMPTS,
jitter=backoff.full_jitter,
on_backoff=backoff_handler(),
on_backoff=self._backoff_handler_firehose_reset,
on_success=success_handler(),
on_giveup=giveup_handler())
def firehose_request_wrapper(data):
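
The decorated firehose_request_wrapper above is cut off in this view; what follows is a runnable sketch (toy client and exception rather than boto3/botocore) of the retry pattern it now uses. The backoff library passes each on_backoff handler a details dict containing 'target', 'wait' and 'tries', and the handler rebuilds the client so a stale connection is not reused on the next attempt.

import backoff


class TransientError(Exception):
    """Stand-in for the botocore exceptions the real code backs off on."""


class ToyFirehoseClient(object):
    attempts = 0  # class-level so the counter survives client resets

    def put_record_batch(self):
        ToyFirehoseClient.attempts += 1
        if ToyFirehoseClient.attempts < 3:  # fail twice, then succeed
            raise TransientError('connection timed out')
        return {'FailedPutCount': 0}


client = ToyFirehoseClient()


def reset_client(details):
    """Mirror of _backoff_handler_firehose_reset: log the retry, rebuild the client."""
    global client
    print('[Backoff]: calling %s again in %f seconds with %d tries so far'
          % (details['target'].__name__, details['wait'], details['tries']))
    client = ToyFirehoseClient()


@backoff.on_exception(backoff.fibo, TransientError, max_tries=5, on_backoff=reset_client)
def firehose_request_wrapper():
    return client.put_record_batch()


print(firehose_request_wrapper())  # succeeds on the third try, after two client resets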
4 changes: 2 additions & 2 deletions stream_alert_cli/terraform/generate.py
@@ -45,7 +45,7 @@
from stream_alert_cli.terraform.threat_intel_downloader import generate_threat_intel_downloader

RESTRICTED_CLUSTER_NAMES = ('main', 'athena')
TERRAFORM_VERSIONS = {'application': '~> 0.10.6', 'provider': {'aws': '~> 1.11.0'}}
TERRAFORM_VERSIONS = {'application': '~> 0.11.7', 'provider': {'aws': '~> 1.17.0'}}


def generate_s3_bucket(bucket, logging, **kwargs):
@@ -411,7 +411,7 @@ def generate_global_lambda_settings(config, config_name, generate_func, tf_tmp_f
message (str): Message will be logged by LOGGER.
"""
if not config['lambda'].get(config_name):
LOGGER_CLI.info('Config for \'%s\' not in lambda.json', config_name)
LOGGER_CLI.warning('Config for \'%s\' not in lambda.json', config_name)
remove_temp_terraform_file(tf_tmp_file, message)
return

8 changes: 4 additions & 4 deletions terraform/modules/tf_lambda/output.tf
@@ -1,17 +1,17 @@
// Defined only if the Lambda is in a VPC
output "function_vpc_arn" {
value = "${aws_lambda_function.function_vpc.arn}"
value = "${join(" ", aws_lambda_function.function_vpc.*.arn)}"
}

// Defined only if the Lambda is NOT in a VPC
output "function_no_vpc_arn" {
value = "${aws_lambda_function.function_no_vpc.arn}"
value = "${join(" ", aws_lambda_function.function_no_vpc.*.arn)}"
}

output "role_arn" {
value = "${aws_iam_role.role.arn}"
value = "${aws_iam_role.role.0.arn}"
}

output "role_id" {
value = "${aws_iam_role.role.id}"
value = "${aws_iam_role.role.0.id}"
}
3 changes: 3 additions & 0 deletions tests/unit/conf/outputs.json
@@ -1,4 +1,7 @@
{
"aws-cloudwatch-log": [
"unit_test_default"
],
"aws-firehose": {
"unit_test_delivery_stream": "unit_test_delivery_stream"
},
23 changes: 22 additions & 1 deletion tests/unit/stream_alert_alert_processor/test_outputs/test_aws.py
@@ -31,7 +31,8 @@
LambdaOutput,
S3Output,
SNSOutput,
SQSOutput
SQSOutput,
CloudwatchLogOutput
)
from stream_alert_cli.helpers import create_lambda_function
from tests.unit.stream_alert_alert_processor import (
@@ -211,3 +212,23 @@ def test_dispatch(self, log_mock):

log_mock.assert_called_with('Successfully sent alert to %s:%s',
self.SERVICE, self.DESCRIPTOR)


class TestCloudwatchLogOutput(object):
"""Test class for CloudwatchLogOutput"""
DESCRIPTOR = 'unit_test_default'
SERVICE = 'aws-cloudwatch-log'

def setup(self):
"""Create the Cloudwatch dispatcher"""
self._dispatcher = CloudwatchLogOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG)

@patch('logging.Logger.info')
def test_dispatch(self, log_mock):
"""Cloudwatch - Dispatch"""
alert = get_alert()

assert_true(self._dispatcher.dispatch(alert, self.DESCRIPTOR))
assert_equal(log_mock.call_count, 2)
log_mock.assert_called_with('Successfully sent alert to %s:%s',
self.SERVICE, self.DESCRIPTOR)
@@ -100,6 +100,7 @@ def test_output_loading():
'aws-s3',
'aws-sns',
'aws-sqs',
'aws-cloudwatch-log',
'carbonblack',
'github',
'jira',
30 changes: 28 additions & 2 deletions tests/unit/stream_alert_rule_processor/test_firehose.py
@@ -16,7 +16,7 @@
# pylint: disable=protected-access,no-self-use
from mock import patch
from moto import mock_kinesis
from nose.tools import (assert_equal, assert_false, assert_true)
from nose.tools import (assert_equal, assert_false, assert_not_equal, assert_true)

from stream_alert.rule_processor.firehose import StreamAlertFirehose
from stream_alert.shared.config import load_config
@@ -240,6 +240,24 @@ def test_load_enabled_sources_invalid_log(self, mock_logging):
assert_equal(len(sa_firehose._enabled_logs), 0)
assert_true(mock_logging.error.called)

@patch('stream_alert.rule_processor.firehose.LOGGER')
@mock_kinesis
def test_firehose_reset(self, mock_logging):
"""StreamAlertFirehose - Test Reset Firehose Client"""
def test_func():
pass
sa_firehose = StreamAlertFirehose(region='us-east-1', firehose_config={}, log_sources={})

id_1 = id(sa_firehose._firehose_client)
sa_firehose._backoff_handler_firehose_reset({
'target': test_func,
'wait': '0.134315135',
'tries': 3})
id_2 = id(sa_firehose._firehose_client)

assert_true(mock_logging.info.called)
assert_not_equal(id_1, id_2)

def test_segment_records_by_size(self):
"""StreamAlertFirehose - Segment Large Records"""
sa_firehose = StreamAlertFirehose(region='us-east-1', firehose_config={}, log_sources={})
@@ -314,10 +332,18 @@ def test_limit_record_size(self, mock_logging):
'data': {
'super': 'secret'
}
},
# add another unit_test_sample_log to verify in a different position
{
'unit_key_01': 1,
'unit_key_02': 'test' * 250001 # is 4 bytes higher than max
},
{
'test': 1
}
]

StreamAlertFirehose._limit_record_size(test_events)

assert_true(len(test_events), 2)
assert_true(len(test_events), 3)
assert_true(mock_logging.error.called)
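
The extra records above check that oversized events are dropped regardless of where they sit in the batch. A short self-contained sketch (plain strings rather than log records) of the airbnb#708 fix this test covers: popping by index while iterating forward shifts later elements into already-visited slots, so a record that slides into the freed position is never size-checked, whereas walking the indexes in reverse leaves the still-unvisited positions untouched.

MAX_SIZE = 5
records = ['ok-1', 'x' * 10, 'x' * 10, 'ok-2']  # two oversized entries back to back

# Old (buggy) pattern: enumerate() keeps advancing after a pop, so the second
# oversized entry shifts into index 1 and is skipped.
buggy = list(records)
for index, record in enumerate(buggy):
    if len(record) > MAX_SIZE:
        buggy.pop(index)

# Pattern from this commit: visit indexes from the end toward the front, so
# removals never disturb the positions still waiting to be checked.
fixed = list(records)
for index in reversed(range(len(fixed))):
    if len(fixed[index]) > MAX_SIZE:
        fixed.pop(index)

assert buggy == ['ok-1', 'x' * 10, 'ok-2']  # one oversized entry slipped through
assert fixed == ['ok-1', 'ok-2']            # both oversized entries removed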
