Skip to content

Commit

Permalink
[apps] Correctly update aliyun timestamp (airbnb#978)
Browse files Browse the repository at this point in the history
  • Loading branch information
chunyong-lin authored Aug 22, 2019
1 parent a078b4a commit bf6244b
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 8 deletions.
53 changes: 51 additions & 2 deletions stream_alert/apps/_apps/aliyun.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class AliyunApp(AppIntegration):
https://www.alibabacloud.com/help/doc-detail/28849.htm
"""

# The maximum number of results to be returned. Valid values: 0 to 50.
_MAX_RESULTS = 50

def __init__(self, event, context):
Expand Down Expand Up @@ -76,16 +77,64 @@ def date_formatter(cls):
return '%Y-%m-%dT%H:%M:%SZ'

def _gather_logs(self):

"""Fetch ActionTrail events and return a list of events
Example response from do_action_with_exception method
{
'EndTime': '2019-08-22T04:41:32Z',
'NextToken': '2',
'RequestId': '562D9C08-E766-4038-B49F-B0D2BE1980FE',
'StartTime': '2019-08-01T04:31:52Z',
'Events': [{
'eventId': '60.152_1566447558068_1247',
'eventVersion': '1',
'acsRegion': 'cn-hangzhou',
'additionalEventData': {
'mfaChecked': 'true',
'callbackUrl': 'https://home.console.aliyun.com/'
},
'eventType': 'ConsoleSignin',
'errorMessage': 'success',
'eventTime': '2019-08-22T04:19:18Z',
'eventName': 'ConsoleSignin',
'userIdentity': {
'userName': 'dead_joke',
'type': 'ram-user',
'principalId': '222222222222222222',
'accountId': '1111111111111111'
},
'eventSource': 'signin.aliyun.com',
'requestId': '60.152_1566447558068_1247',
'userAgent': 'some browser version',
'sourceIpAddress': '1.1.1.1',
'serviceName': 'AasSub'
}, {
'eventId': '029B39F0-5E23-4931-B4C9-BA72C7261ADF',
...
'eventTime': '2019-08-21T22:26:09Z',
...
}]
}
"""
try:
response = self.client.do_action_with_exception(self.request)
json_response = json.loads(response)

# Note: ActionTrail API return ActionTrail events in sorted order, and
# it is latest events first. There still has a small chance that it may not get
# all the logs when there are still more logs to pull when lambda function
# timeout reached, and remaining logs will be lost because the last_timestamp
# is updated to "EndTime" during the first lambda function call.
#
# To lower the data loss possibility, suggest to have longer timeout for lambda
# function (aliyun app) and set app schedule more frequently, e.g. every 10 mins
self._last_timestamp = json_response['EndTime']
if 'NextToken' in json_response:
self._more_to_poll = True
self.request.set_NextToken(json_response['NextToken'])
else:
self._more_to_poll = False
self._last_timestamp = json_response['EndTime']

return json_response['Events']

Expand Down
2 changes: 1 addition & 1 deletion stream_alert/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def _initialize(self):
"""Method for performing any startup steps, like setting state to running"""
# Perform another safety check to make sure this is not being invoked already
if self._config.is_running:
LOGGER.error('[%s] App already running', self)
LOGGER.warning('[%s] App already running', self)
return False

# Check if this is an invocation spawned from a previous partial execution
Expand Down
78 changes: 74 additions & 4 deletions tests/unit/stream_alert_apps/test_apps/test_aliyun.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
import os

from mock import patch
from moto import mock_ssm
from nose.tools import assert_equal, assert_false, assert_items_equal
from nose.tools import assert_equal, assert_false, assert_items_equal, assert_true

from aliyunsdkcore.acs_exception.exceptions import ServerException

Expand All @@ -39,7 +40,7 @@ def setup(self):
self._test_app_name = 'aliyun'
put_mock_params(self._test_app_name)
self._event = get_event(self._test_app_name)
self._context = get_mock_lambda_context(self._test_app_name)
self._context = get_mock_lambda_context(self._test_app_name, milliseconds=100000)
self._app = AliyunApp(self._event, self._context)

def test_sleep_seconds(self):
Expand Down Expand Up @@ -94,10 +95,79 @@ def test_gather_logs_entries(self, client_mock):
"""AliyunApp - Gather Logs with some entries"""
client_mock.return_value = '{"NextToken":"20","RequestId":'\
'"B1DE97F8-5450-4593-AB38-FB61B799E91D",' \
'"Events":["filler data","filler data"],' \
'"Events":[{"eventTime":"123"},{"eventTime":"123"}],' \
'"EndTime":"2018-07-23T19:28:00Z",' \
'"StartTime":"2018-06-23T19:28:30Z"}'
logs = self._app._gather_logs()
assert_equal(2, len(logs))
assert self._app._more_to_poll # nosec
assert_true(self._app._more_to_poll)
assert_equal(self._app.request.get_NextToken(), "20")

@patch('stream_alert.apps.app_base.AppIntegration._invoke_successive_app')
@patch('stream_alert.apps.batcher.Batcher._send_logs_to_lambda')
@patch('stream_alert.apps._apps.aliyun.AliyunApp._sleep_seconds')
@patch('aliyunsdkcore.client.AcsClient.do_action_with_exception')
def test_gather_logs_last_timestamp(self, client_mock, sleep_mock, batcher_mock, _):
"""AliyunApp - Test last_timestamp"""
# mock 3 responses
mock_resps = [
{
'NextToken': '50',
'RequestId': 'AAAAAAAA',
'Events': [
{
'eventTime': '2018-06-23T19:29:00Z'
},
{
'eventTime': '2018-06-23T19:28:00Z'
}
],
'EndTime': '2018-07-23T19:28:00Z',
'StartTime': '2018-06-23T19:28:30Z'
},
{
'NextToken': '100',
'RequestId': 'BBBBBBBBB',
'Events': [
{
'eventTime': '2018-06-24T19:29:00Z'
},
{
'eventTime': '2018-06-24T19:28:00Z'
}
],
'EndTime': '2018-07-23T19:28:00Z',
'StartTime': '2018-06-23T19:28:30Z'
},
{
'NextToken': '150',
'RequestId': 'CCCCCCCC',
'Events': [
{
'eventTime': '2018-06-25T19:29:00Z'
},
{
'eventTime': '2018-06-25T19:28:00Z'
}
],
'EndTime': '2018-07-23T19:28:00Z',
'StartTime': '2018-06-23T19:28:30Z'
}
]
client_mock.side_effect = [json.dumps(r, separators=(',', ':')) for r in mock_resps]

# Mock remaining time. _sleep_seconds() methods will be called twice when
# make a call to gather logs via Aliyun API. Set sleep second to a large number
# to mimic corner case that there are still more logs to pull while lambda function
# timeout is reached. In this case, the _last_timestamp stamp should be updated
# correctly.
sleep_mock.side_effect = [0, 0, 0, 0, 1000000, 0]

# Mock 3 batcher call to invoke successive lambda function since there are more logs
batcher_mock.side_effect = [True, True, True]

self._app.gather()
assert_equal(self._app._poll_count, 3)
assert_true(self._app._more_to_poll)
assert_equal(self._app.request.get_NextToken(), "150")
assert_equal(self._app._last_timestamp, '2018-07-23T19:28:00Z')
2 changes: 1 addition & 1 deletion tests/unit/stream_alert_apps/test_apps/test_app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_initialize(self):
"""App Integration - Initialize, Valid"""
assert_true(self._app._initialize())

@patch('logging.Logger.error')
@patch('logging.Logger.warning')
def test_initialize_running(self, log_mock):
"""App Integration - Initialize, Already Running"""
self._app._config.current_state = 'running'
Expand Down

0 comments on commit bf6244b

Please sign in to comment.