Fork alerts to a Dynamo table (airbnb#612)
austinbyers authored Mar 1, 2018
1 parent 282d893 commit 0f6a8f9
Showing 18 changed files with 520 additions and 250 deletions.
4 changes: 4 additions & 0 deletions conf/global.json
@@ -6,6 +6,10 @@
"region": "us-east-1"
},
"infrastructure": {
"alerts_table": {
"read_capacity": 5,
"write_capacity": 5
},
"monitoring": {
"create_sns_topic": true
}
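The new alerts_table block only sets provisioned throughput; the table itself is presumably provisioned by the tf_stream_alert_globals Terraform module wired up in generate.py further down in this commit. As a rough, hypothetical illustration of what the two settings control (this boto3 call is not part of the commit; the table name is a placeholder and the RuleName/Timestamp key schema is inferred from the items written in alert_forward.py below):

```python
# Illustrative sketch only -- the real table is provisioned via Terraform.
# Table name is a placeholder; key schema inferred from alert_forward.py.
import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')
dynamodb.create_table(
    TableName='<prefix>_streamalert_alerts',  # placeholder name
    KeySchema=[
        {'AttributeName': 'RuleName', 'KeyType': 'HASH'},
        {'AttributeName': 'Timestamp', 'KeyType': 'RANGE'},
    ],
    AttributeDefinitions=[
        {'AttributeName': 'RuleName', 'AttributeType': 'S'},
        {'AttributeName': 'Timestamp', 'AttributeType': 'S'},
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,    # alerts_table.read_capacity
        'WriteCapacityUnits': 5,   # alerts_table.write_capacity
    },
)
```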
200 changes: 200 additions & 0 deletions stream_alert/rule_processor/alert_forward.py
@@ -0,0 +1,200 @@
"""
Copyright 2017-present, Airbnb Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from datetime import datetime
import json
import os
import time

import backoff
import boto3
from botocore.exceptions import ClientError

from stream_alert.shared import backoff_handlers
from stream_alert.shared.metrics import MetricLogger
from stream_alert.rule_processor import FUNCTION_NAME, LOGGER


class AlertForwarder(object):
"""Sends alerts to the Alert Processor and the alerts Dynamo table."""
# TODO: Do not send to Alert Processor after Alert Merger is implemented
BACKOFF_MAX_RETRIES = 6

def __init__(self, env):
"""Initialize the Forwarder with the boto3 clients and resource names.
Args:
env (dict): loaded dictionary containing environment information
"""
self.env = env
self.client_dynamo = boto3.client('dynamodb', region_name=self.env['lambda_region'])
self.client_lambda = boto3.client('lambda', region_name=self.env['lambda_region'])
self.function = os.environ['ALERT_PROCESSOR']
self.table = os.environ['ALERT_TABLE']

# Keep track of unprocessed items when retrying batch_write_item()
self.unprocessed_items = None

def _send_to_lambda(self, alerts):
"""Invoke Alert Processor directly
Sends a message to the alert processor with the following JSON format:
{
'record': record,
'rule_name': rule.rule_name,
'rule_description': rule.rule_function.__doc__ or DEFAULT_RULE_DESCRIPTION,
'log_source': str(payload.log_source),
'log_type': payload.type,
'outputs': rule.outputs,
'source_service': payload.service(),
'source_entity': payload.entity,
'context': rule.context
}
"""
for alert in alerts:
try:
data = json.dumps(alert, default=lambda o: o.__dict__)
except AttributeError as err:
LOGGER.error('An error occurred while dumping alert to JSON: %s '
'Alert: %s',
err.message,
alert)
continue

try:
response = self.client_lambda.invoke(
FunctionName=self.function,
InvocationType='Event',
Payload=data,
Qualifier='production'
)

except ClientError as err:
LOGGER.exception('An error occurred while sending alert to '
'\'%s:production\'. Error is: %s. Alert: %s',
self.function,
err.response,
data)
continue

if response['ResponseMetadata']['HTTPStatusCode'] != 202:
LOGGER.error('Failed to send alert to \'%s\': %s',
self.function, data)
continue

if self.env['lambda_alias'] != 'development':
LOGGER.info('Sent alert to \'%s\' with Lambda request ID \'%s\'',
self.function,
response['ResponseMetadata']['RequestId'])

def _alert_batches(self, alerts, batch_size=25):
"""Group alerts into batches of 25, the maximum allowed by Dynamo batch_write_item.
Yields:
(dict) The constructed request for batch_write_item, containing <= 25 alerts.
Maps table name to a list of requests.
"""
for i in range(0, len(alerts), batch_size):
batch = alerts[i:i+batch_size]
yield {
self.table: [
{
'PutRequest': {
'Item': {
'RuleName': {'S': alert['rule_name']},
'Timestamp': {
# ISO 8601 datetime format, and is unique for each alert
'S': datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
},
'Cluster': {'S': os.environ['CLUSTER']},
'RuleDescription': {'S': alert['rule_description']},
'Outputs': {'SS': alert['outputs']},
# Compact JSON encoding (no extra spaces)
'Record': {'S': json.dumps(alert['record'], separators=(',', ':'))},
# TODO: Remove TTL after alert merger is implemented
'TTL': {'N': str(int(time.time()) + 7200)} # 2 hour TTL
}
}
}
for alert in batch
]
}

def _batch_write(self):
"""Write a batch of alerts to Dynamo, retrying with exponential backoff for failed items.
Returns:
(bool) True if *all* items were written successfully, False otherwise.
"""
@backoff.on_predicate(backoff.expo,
max_tries=self.BACKOFF_MAX_RETRIES, jitter=backoff.full_jitter,
on_backoff=backoff_handlers.backoff_handler,
on_success=backoff_handlers.success_handler,
on_giveup=backoff_handlers.giveup_handler)
@backoff.on_exception(backoff.expo, ClientError,
max_tries=self.BACKOFF_MAX_RETRIES, jitter=backoff.full_jitter,
on_backoff=backoff_handlers.backoff_handler,
on_success=backoff_handlers.success_handler,
on_giveup=backoff_handlers.giveup_handler)
def decorated_batch_write(forwarder):
"""batch_write_item with the unprocessed_items from the AlertForwarder instance.
There are 2 different errors to handle here:
(1) If Dynamo is unresponsive, a boto ClientError will be raised.
(2) The batch_write_item operation can fail halfway through, in which case the
unprocessed items are returned in the response. In this case, unprocessed items
are stored in the class instance, and we return False.
The backoff.on_predicate will automatically retry with any Falsey value, and
batch_write will run again, but only with the remaining unprocessed items.
Args:
forwarder (AlertForwarder): Instance of the AlertForwarder
Returns:
(bool) True if the batch write succeeded, False if there were UnprocessedItems.
"""
response = forwarder.client_dynamo.batch_write_item(
RequestItems=forwarder.unprocessed_items)
forwarder.unprocessed_items = response['UnprocessedItems']
return len(forwarder.unprocessed_items) == 0

return decorated_batch_write(self)

def _send_to_dynamo(self, alerts):
"""Write alerts in batches to Dynamo."""
for batch_num, batch in enumerate(self._alert_batches(alerts), start=1):
LOGGER.info('Sending batch %d to Dynamo with %d alert(s)',
batch_num, len(batch[self.table]))
self.unprocessed_items = batch
if not self._batch_write():
LOGGER.error('Unable to save alert batch; unprocessed items remain: %s',
json.dumps(self.unprocessed_items))
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.UNPROCESSED_ALERTS,
len(self.unprocessed_items[self.table]))

def send_alerts(self, alerts):
"""Send alerts to the Alert Processor and to the alerts Dynamo table.
Args:
alerts (list): A list of dictionaries representing json alerts.
"""
self._send_to_lambda(alerts)

# For now, don't blow up the rule processor if there is a problem sending to Dynamo.
# TODO: Remove/refine broad exception handling once tested.
try:
self._send_to_dynamo(alerts)
except Exception: # pylint: disable=broad-except
LOGGER.exception('Error saving alerts to Dynamo')
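For orientation, here is a minimal, hypothetical sketch of how the forwarder is driven; it is not part of this commit. The environment-variable values and alert contents are placeholders (only the dictionary keys come from the _send_to_lambda docstring above). send_alerts() invokes the alert processor asynchronously and batch-writes one Dynamo item per alert, retrying any UnprocessedItems with exponential backoff.

```python
# Hypothetical usage sketch -- not part of this commit. In production these
# environment variables are set by the Lambda deployment, and `env` comes
# from load_env(context) in the rule processor handler.
import os

from stream_alert.rule_processor.alert_forward import AlertForwarder

os.environ.setdefault('ALERT_PROCESSOR', 'example_streamalert_alert_processor')  # placeholder
os.environ.setdefault('ALERT_TABLE', 'example_streamalert_alerts')               # placeholder
os.environ.setdefault('CLUSTER', 'prod')                                         # placeholder

env = {'lambda_region': 'us-east-1', 'lambda_alias': 'production'}
forwarder = AlertForwarder(env)

# Keys mirror the JSON format documented in _send_to_lambda(); values are made up.
alerts = [{
    'record': {'username': 'alice', 'action': 'delete_user'},
    'rule_name': 'example_rule',
    'rule_description': 'Example rule docstring',
    'log_source': 'example_logs:json',
    'log_type': 'json',
    'outputs': ['aws-s3:example-bucket'],
    'source_service': 's3',
    'source_entity': 'example-entity',
    'context': {},
}]

forwarder.send_alerts(alerts)
```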
10 changes: 5 additions & 5 deletions stream_alert/rule_processor/handler.py
@@ -17,12 +17,12 @@
import json

from stream_alert.rule_processor import FUNCTION_NAME, LOGGER
from stream_alert.rule_processor.alert_forward import AlertForwarder
from stream_alert.rule_processor.classifier import StreamClassifier
from stream_alert.rule_processor.config import load_config, load_env
from stream_alert.rule_processor.firehose import StreamAlertFirehose
from stream_alert.rule_processor.payload import load_stream_payload
from stream_alert.rule_processor.rules_engine import StreamRules
from stream_alert.rule_processor.sink import StreamSink
from stream_alert.shared.metrics import MetricLogger


@@ -47,9 +47,9 @@ def __init__(self, context, enable_alert_processor=True):
# Load the environment from the context arn
self.env = load_env(context)

# Instantiate the sink here to handle sending the triggered alerts to the
# Instantiate the alert forwarder here to handle sending the triggered alerts to the
# alert processor
self.sinker = StreamSink(self.env)
self.alert_forwarder = AlertForwarder(self.env)

# Instantiate a classifier that is used for this run
self.classifier = StreamClassifier(config=self.config)
@@ -125,7 +125,7 @@ def run(self, event):
record_alerts = self._rule_engine.threat_intel_match(payload_with_normalized_records)
self._alerts.extend(record_alerts)
if record_alerts and self.enable_alert_processor:
self.sinker.sink(record_alerts)
self.alert_forwarder.send_alerts(record_alerts)

MetricLogger.log_metric(FUNCTION_NAME,
MetricLogger.TOTAL_RECORDS,
@@ -215,6 +215,6 @@ def _process_alerts(self, payload):
self._alerts.extend(record_alerts)

if self.enable_alert_processor:
self.sinker.sink(record_alerts)
self.alert_forwarder.send_alerts(record_alerts)

return payload_with_normalized_records
2 changes: 1 addition & 1 deletion stream_alert/rule_processor/rules_engine.py
@@ -65,7 +65,7 @@ def rule(cls, **opts):
A rule maps events (by `logs`) to a function that accepts an event
and returns a boolean. If the function returns `True`, then the event is
passed on to the sink(s). If the function returns `False`, the event is
passed on to the alert forwarder. If the function returns `False`, the event is
dropped.
"""
def decorator(rule):
94 changes: 0 additions & 94 deletions stream_alert/rule_processor/sink.py

This file was deleted.

2 changes: 2 additions & 0 deletions stream_alert/shared/metrics.py
@@ -55,6 +55,7 @@ class MetricLogger(object):
TOTAL_S3_RECORDS = 'TotalS3Records'
TOTAL_STREAM_ALERT_APP_RECORDS = 'TotalStreamAlertAppRecords'
TRIGGERED_ALERTS = 'TriggeredAlerts'
UNPROCESSED_ALERTS = 'UnprocessedAlerts'
FIREHOSE_RECORDS_SENT = 'FirehoseRecordsSent'
FIREHOSE_FAILED_RECORDS = 'FirehoseFailedRecords'

@@ -83,6 +84,7 @@ class MetricLogger(object):
_default_value_lookup),
TRIGGERED_ALERTS: (_default_filter.format(TRIGGERED_ALERTS),
_default_value_lookup),
UNPROCESSED_ALERTS: (_default_filter.format(UNPROCESSED_ALERTS), _default_value_lookup),
FIREHOSE_RECORDS_SENT: (_default_filter.format(FIREHOSE_RECORDS_SENT),
_default_value_lookup),
FIREHOSE_FAILED_RECORDS: (_default_filter.format(FIREHOSE_FAILED_RECORDS),
8 changes: 6 additions & 2 deletions stream_alert_cli/terraform/generate.py
@@ -163,12 +163,16 @@ def generate_main(**kwargs):
# Setup Firehose Delivery Streams
generate_firehose(config, main_dict, logging_bucket)

# Configure global resources like Firehose alert delivery
# Configure global resources like Firehose alert delivery and alerts table
main_dict['module']['globals'] = {
'source': 'modules/tf_stream_alert_globals',
'account_id': config['global']['account']['aws_account_id'],
'region': config['global']['account']['region'],
'prefix': config['global']['account']['prefix']
'prefix': config['global']['account']['prefix'],
'alerts_table_read_capacity': (
config['global']['infrastructure']['alerts_table']['read_capacity']),
'alerts_table_write_capacity': (
config['global']['infrastructure']['alerts_table']['write_capacity'])
}

# KMS Key and Alias creation
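Taken together with the conf/global.json change at the top of this commit, the globals module entry that generate_main() adds to main_dict (and later serializes to Terraform JSON) would resolve to roughly the following; the account values here are placeholders, while the two capacity keys come straight from conf/global.json.

```python
# Rough shape of the generated entry after config lookups (placeholder
# account values); not part of this commit, shown only for illustration.
main_dict['module']['globals'] = {
    'source': 'modules/tf_stream_alert_globals',
    'account_id': '123456789012',
    'region': 'us-east-1',
    'prefix': 'example',
    'alerts_table_read_capacity': 5,
    'alerts_table_write_capacity': 5
}
```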
(The remaining changed files are not shown here.)