Skip to content

Commit

Permalink
Add SNS and SQS outputs (airbnb#620)
Browse files Browse the repository at this point in the history
  • Loading branch information
austinbyers authored and Austin Byers committed Mar 6, 2018
1 parent 69f8a13 commit 9362906
Show file tree
Hide file tree
Showing 32 changed files with 480 additions and 238 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ valid-metaclass-classmethod-first-arg=mcs
[DESIGN]

# Maximum number of arguments for function / method
max-args=5
max-args=8

# Maximum number of attributes for a class (see R0902).
max-attributes=15
Expand Down
4 changes: 0 additions & 4 deletions conf/lambda.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
"throttles_alarm_period_secs": 120,
"throttles_alarm_threshold": 0
},
"outputs": {
"aws-lambda": [],
"aws-s3": []
},
"source_bucket": "PREFIX_GOES_HERE.streamalert.source",
"source_current_hash": "<auto_generated>",
"source_object_key": "<auto_generated>",
Expand Down
6 changes: 6 additions & 0 deletions conf/outputs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
"aws-s3": {
"sample-bucket": "sample.bucket.name"
},
"aws-sns": {
"sample-topic": "sample-topic-name"
},
"aws-sqs": {
"sample-queue": "sample-queue-name"
},
"komand": [
"sample-integration"
],
Expand Down
2 changes: 2 additions & 0 deletions docs/source/outputs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Out of the box, StreamAlert supports:

* **AWS Lambda**
* **AWS S3**
* **AWS SNS**
* **AWS SQS**
* **Komand**
* **PagerDuty**
* **Phantom**
Expand Down
21 changes: 18 additions & 3 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,25 @@ def _add_output_subparser(subparsers):
Examples:
manage.py output new --service <service_name>
manage.py output new --service aws-s3
manage.py output new --service pagerduty
manage.py output new --service slack
The following outputs are supported:
aws-firehose
aws-lambda
aws-s3
aws-sns
aws-sqs
github
jira
komand
pagerduty
pagerduty-incident
pagerduty-v2
phantom
slack
""".format(version))
output_parser = subparsers.add_parser(
'output',
Expand All @@ -108,8 +122,9 @@ def _add_output_subparser(subparsers):
output_parser.add_argument(
'--service',
choices=[
'aws-firehose', 'aws-lambda', 'aws-s3', 'jira', 'komand', 'pagerduty', 'pagerduty-v2',
'pagerduty-incident', 'phantom', 'slack', 'github'
'aws-firehose', 'aws-lambda', 'aws-s3', 'aws-sns', 'aws-sqs',
'github', 'jira', 'komand', 'pagerduty', 'pagerduty-incident', 'pagerduty-v2',
'phantom', 'slack',
],
required=True,
help=ARGPARSE_SUPPRESS)
Expand Down
12 changes: 8 additions & 4 deletions stream_alert/alert_processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ def handler(event, context):
if not config:
return

region = context.invoked_function_arn.split(':')[3]
split_arn = context.invoked_function_arn.split(':')
region = split_arn[3]
account_id = split_arn[4]
function_name = context.function_name

# Return the current list of statuses back to the caller
return list(run(event, region, function_name, config))
return list(run(event, region, account_id, function_name, config))


def run(alert, region, function_name, config):
def run(alert, region, account_id, function_name, config):
"""Send an Alert to its described outputs.
Args:
Expand All @@ -70,6 +72,7 @@ def run(alert, region, function_name, config):
}
region (str): The AWS region of the currently executing Lambda function
account_id (str): The 12-digit AWS account ID of the currently executing Lambda function
function_name (str): The name of the lambda function
config (dict): The loaded configuration for outputs from conf/outputs.json
Expand Down Expand Up @@ -101,7 +104,8 @@ def run(alert, region, function_name, config):
continue

# Retrieve the proper class to handle dispatching the alerts of this services
dispatcher = StreamAlertOutput.create_dispatcher(service, region, function_name, config)
dispatcher = StreamAlertOutput.create_dispatcher(
service, region, account_id, function_name, config)

if not dispatcher:
continue
Expand Down
195 changes: 127 additions & 68 deletions stream_alert/alert_processor/outputs/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class KinesisFirehoseOutput(AWSOutput):

@classmethod
def get_user_defined_properties(cls):
"""Properties asssigned by the user when configuring a new Firehose output
"""Properties assigned by the user when configuring a new Firehose output
Every output should return a dict that contains a 'descriptor' with a description of the
integration being configured.
Expand Down Expand Up @@ -146,7 +146,90 @@ def _firehose_request_wrapper(json_alert, delivery_stream):
delivery_stream,
resp['RecordId'])

return self._log_status(resp)
return self._log_status(resp, kwargs['descriptor'])


@StreamAlertOutput
class LambdaOutput(AWSOutput):
"""LambdaOutput handles all alert dispatching to AWS Lambda"""
__service__ = 'aws-lambda'

@classmethod
def get_user_defined_properties(cls):
"""Get properties that must be assigned by the user when configuring a new Lambda
output. This should be sensitive or unique information for this use-case that needs
to come from the user.
Every output should return a dict that contains a 'descriptor' with a description of the
integration being configured.
Sending to Lambda also requires a user provided Lambda function name and optional qualifier
(if applicable for the user's use case). A fully-qualified AWS ARN is also acceptable for
this value. This value should not be masked during input and is not a credential requirement
that needs encrypted.
Returns:
OrderedDict: Contains various OutputProperty items
"""
return OrderedDict([
('descriptor',
OutputProperty(description='a short and unique descriptor for this Lambda function '
'configuration (ie: abbreviated name)')),
('aws_value',
OutputProperty(description='the AWS Lambda function name, with the optional '
'qualifier (aka \'alias\'), to use for this '
'configuration (ie: output_function:qualifier)',
input_restrictions={' '})),
])

def dispatch(self, **kwargs):
"""Send alert to a Lambda function
The alert gets dumped to a JSON string to be sent to the Lambda function
Args:
**kwargs: consists of any combination of the following items:
descriptor (str): Service descriptor (ie: slack channel, pd integration)
rule_name (str): Name of the triggered rule
alert (dict): Alert relevant to the triggered rule
"""
alert = kwargs['alert']
alert_string = json.dumps(alert['record'])
function_name = self.config[self.__service__][kwargs['descriptor']]

# Check to see if there is an optional qualifier included here
# Acceptable values for the output configuration are the full ARN,
# a function name followed by a qualifier, or just a function name:
# 'arn:aws:lambda:aws-region:acct-id:function:function-name:prod'
# 'function-name:prod'
# 'function-name'
# Checking the length of the list for 2 or 8 should account for all
# times a qualifier is provided.
parts = function_name.split(':')
if len(parts) == 2 or len(parts) == 8:
function = parts[-2]
qualifier = parts[-1]
else:
function = parts[-1]
qualifier = None

LOGGER.debug('Sending alert to Lambda function %s', function_name)

client = boto3.client('lambda', region_name=self.region)
# Use the qualifier if it's available. Passing an empty qualifier in
# with `Qualifier=''` or `Qualifier=None` does not work and thus we
# have to perform different calls to client.invoke().
if qualifier:
resp = client.invoke(FunctionName=function,
InvocationType='Event',
Payload=alert_string,
Qualifier=qualifier)
else:
resp = client.invoke(FunctionName=function,
InvocationType='Event',
Payload=alert_string)

return self._log_status(resp, kwargs['descriptor'])


@StreamAlertOutput
Expand All @@ -156,7 +239,7 @@ class S3Output(AWSOutput):

@classmethod
def get_user_defined_properties(cls):
"""Get properties that must be asssigned by the user when configuring a new S3
"""Get properties that must be assigned by the user when configuring a new S3
output. This should be sensitive or unique information for this use-case that needs
to come from the user.
Expand Down Expand Up @@ -225,87 +308,63 @@ def dispatch(self, **kwargs):
Bucket=bucket,
Key=key)

return self._log_status(resp)
return self._log_status(resp, kwargs['descriptor'])


@StreamAlertOutput
class LambdaOutput(AWSOutput):
"""LambdaOutput handles all alert dispatching to AWS Lambda"""
__service__ = 'aws-lambda'
class SNSOutput(AWSOutput):
"""Handle all alert dispatching for AWS SNS"""
__service__ = 'aws-sns'

@classmethod
def get_user_defined_properties(cls):
"""Get properties that must be asssigned by the user when configuring a new Lambda
output. This should be sensitive or unique information for this use-case that needs
to come from the user.
Every output should return a dict that contains a 'descriptor' with a description of the
integration being configured.
Sending to Lambda also requires a user provided Lambda function name and optional qualifier
(if applicabale for the user's use case). A fully-qualified AWS ARN is also acceptable for
this value. This value should not be masked during input and is not a credential requirement
that needs encrypted.
"""Properties assigned by the user when configuring a new SNS output.
Returns:
OrderedDict: Contains various OutputProperty items
OrderedDict: With 'descriptor' and 'aws_value' OutputProperty tuples
"""
return OrderedDict([
('descriptor',
OutputProperty(description='a short and unique descriptor for this Lambda function '
'configuration (ie: abbreviated name)')),
('aws_value',
OutputProperty(description='the AWS Lambda function name, with the optional '
'qualifier (aka \'alias\'), to use for this '
'configuration (ie: output_function:qualifier)',
input_restrictions={' '})),
('descriptor', OutputProperty(
description='a short and unique descriptor for this SNS topic')),
('aws_value', OutputProperty(description='SNS topic name'))
])

def dispatch(self, **kwargs):
"""Send alert to a Lambda function
"""Send alert to an SNS topic"""
# SNS topics can only be accessed via their ARN
topic_name = self.config[self.__service__][kwargs['descriptor']]
topic_arn = 'arn:aws:sns:{}:{}:{}'.format(self.region, self.account_id, topic_name)
topic = boto3.resource('sns', region_name=self.region).Topic(topic_arn)

The alert gets dumped to a JSON string to be sent to the Lambda function
response = topic.publish(Message=json.dumps(kwargs['alert'], indent=2))
return self._log_status(response, kwargs['descriptor'])

Args:
**kwargs: consists of any combination of the following items:
descriptor (str): Service descriptor (ie: slack channel, pd integration)
rule_name (str): Name of the triggered rule
alert (dict): Alert relevant to the triggered rule
"""
alert = kwargs['alert']
alert_string = json.dumps(alert['record'])
function_name = self.config[self.__service__][kwargs['descriptor']]

# Check to see if there is an optional qualifier included here
# Acceptable values for the output configuration are the full ARN,
# a function name followed by a qualifier, or just a function name:
# 'arn:aws:lambda:aws-region:acct-id:function:function-name:prod'
# 'function-name:prod'
# 'function-name'
# Checking the length of the list for 2 or 8 should account for all
# times a qualifier is provided.
parts = function_name.split(':')
if len(parts) == 2 or len(parts) == 8:
function = parts[-2]
qualifier = parts[-1]
else:
function = parts[-1]
qualifier = None
@StreamAlertOutput
class SQSOutput(AWSOutput):
"""Handle all alert dispatching for AWS SQS"""
__service__ = 'aws-sqs'

LOGGER.debug('Sending alert to Lambda function %s', function_name)
@classmethod
def get_user_defined_properties(cls):
"""Properties assigned by the user when configuring a new SQS output.
client = boto3.client('lambda', region_name=self.region)
# Use the qualifier if it's available. Passing an empty qualifier in
# with `Qualifier=''` or `Qualifier=None` does not work and thus we
# have to perform different calls to client.invoke().
if qualifier:
resp = client.invoke(FunctionName=function,
InvocationType='Event',
Payload=alert_string,
Qualifier=qualifier)
else:
resp = client.invoke(FunctionName=function,
InvocationType='Event',
Payload=alert_string)
Returns:
OrderedDict: With 'descriptor' and 'aws_value' OutputProperty tuples
"""
return OrderedDict([
('descriptor', OutputProperty(
description='a short and unique descriptor for this SQS queue')),
('aws_value', OutputProperty(description='SQS queue name'))
])

return self._log_status(resp)
def dispatch(self, **kwargs):
"""Send alert to an SQS queue"""
# SQS queues can only be accessed via their URL
queue_name = self.config[self.__service__][kwargs['descriptor']]
queue_url = 'https://sqs.{}.amazonaws.com/{}/{}'.format(
self.region, self.account_id, queue_name)
queue = boto3.resource('sqs', region_name=self.region).Queue(queue_url)

response = queue.send_message(MessageBody=json.dumps(kwargs['alert']))
return self._log_status(response, kwargs['descriptor'])
4 changes: 2 additions & 2 deletions stream_alert/alert_processor/outputs/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def dispatch(self, **kwargs):
"""
credentials = self._load_creds(kwargs['descriptor'])
if not credentials:
return self._log_status(False)
return self._log_status(False, kwargs['descriptor'])

username_password = "{}:{}".format(credentials['username'],
credentials['access_token'])
Expand All @@ -102,4 +102,4 @@ def dispatch(self, **kwargs):
except OutputRequestFailure:
success = False

return self._log_status(success)
return self._log_status(success, kwargs['descriptor'])
Loading

0 comments on commit 9362906

Please sign in to comment.