Skip to content

Commit

Permalink
Added pattern searching functionality for OpenDNS Investigate wrapper…
Browse files Browse the repository at this point in the history
… and tests
  • Loading branch information
Leeren Chang committed Mar 26, 2018
1 parent 82e1c9b commit b202895
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
45 changes: 29 additions & 16 deletions tests/opendns_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,118 +156,131 @@ def test_categorization_response_error(self):
with T.assert_raises(ResponseError):
i.categorization(domains)

def _test_api_call_get(self, call, endpoint, request, expected_query_params, api_response, expected_result):
def _test_api_call_get(self, call, endpoint, request, expected_url_params,
api_response, expected_result, expected_query_params=None):
"""
Tests a OpenDNS call by mocking out the HTTP GET request.
Args:
call: function in OpenDNSApi to call.
endpoint: endpoint of OpenDNS API that is hit (appended to base url)
request: call arguments
expected_query_params: query parameters that should be passed to API
expected_url_params: URL parameters that should be passed to API
api_response: the expected response by the API
expected_result: what call should return (given the api response provided)
expected_query_params: query parameters that should be passed to API
"""
with patch.object(self.opendns, '_requests') as request_mock:
request_mock.multi_get.return_value = api_response
result = call(request)

url = self.opendns._to_url(endpoint.format(expected_query_params))
request_mock.multi_get.assert_called_with([url])
url = self.opendns._to_url(endpoint.format(expected_url_params))
request_mock.multi_get.assert_called_with([url], expected_query_params)
T.assert_equal(result, expected_result)

def test_security(self):
self._test_api_call_get(call=self.opendns.security,
endpoint=u'security/name/{0}.json',
request=['domain'],
expected_query_params='domain',
expected_url_params='domain',
api_response={},
expected_result={})

def test_whois_emails(self):
self._test_api_call_get(call=self.opendns.whois_emails,
endpoint=u'whois/emails/{0}',
request=['[email protected]'],
expected_query_params='[email protected]',
expected_url_params='[email protected]',
api_response={},
expected_result={})

def test_whois_nameservers(self):
self._test_api_call_get(call=self.opendns.whois_nameservers,
endpoint=u'whois/nameservers/{0}',
request=['ns.dns.com'],
expected_query_params='ns.dns.com',
expected_url_params='ns.dns.com',
api_response={},
expected_result={})

def test_whois_domains(self):
self._test_api_call_get(call=self.opendns.whois_domains,
endpoint=u'whois/{0}',
request=['google.com'],
expected_query_params='google.com',
expected_url_params='google.com',
api_response={},
expected_result={})

def test_whois_domains_history(self):
self._test_api_call_get(call=self.opendns.whois_domains_history,
endpoint=u'whois/{0}/history',
request=['5esb.biz'],
expected_query_params='5esb.biz',
expected_url_params='5esb.biz',
api_response={},
expected_result={})

def test_coocurrences(self):
self._test_api_call_get(call=self.opendns.cooccurrences,
endpoint=u'recommendations/name/{0}.json',
request=['domain'],
expected_query_params='domain',
expected_url_params='domain',
api_response={},
expected_result={})

def test_rr_history(self):
self._test_api_call_get(call=self.opendns.rr_history,
endpoint=u'dnsdb/ip/a/{0}.json',
request=['8.8.8.8'],
expected_query_params='8.8.8.8',
expected_url_params='8.8.8.8',
api_response={},
expected_result={})

def test_latest_malicious(self):
self._test_api_call_get(call=self.opendns.latest_malicious,
endpoint=u'ips/{0}/latest_domains',
request=['8.8.8.8'],
expected_query_params='8.8.8.8',
expected_url_params='8.8.8.8',
api_response={},
expected_result={})

def test_domain_tag(self):
self._test_api_call_get(call=self.opendns.domain_tag,
endpoint=u'domains/{0}/latest_tags',
request=['domain'],
expected_query_params='domain',
expected_url_params='domain',
api_response={},
expected_result={})

def test_dns_rr(self):
self._test_api_call_get(call=self.opendns.dns_rr,
endpoint=u'dnsdb/name/a/{0}.json',
request=['domain'],
expected_query_params='domain',
expected_url_params='domain',
api_response={},
expected_result={})

def test_related_domains(self):
self._test_api_call_get(call=self.opendns.related_domains,
endpoint=u'links/name/{0}.json',
request=['domain'],
expected_query_params='domain',
expected_url_params='domain',
api_response={},
expected_result={})

def test_sample(self):
self._test_api_call_get(call=self.opendns.sample,
endpoint=u'sample/{0}',
request=['0492d93195451e41f568f68e7704eb0812bc2b19'],
expected_query_params='0492d93195451e41f568f68e7704eb0812bc2b19',
expected_url_params='0492d93195451e41f568f68e7704eb0812bc2b19',
api_response={},
expected_result={})

def test_search(self):
self._test_api_call_get(call=self.opendns.search,
endpoint=u'search/{}',
request=['pattern'],
expected_url_params='pattern',
api_response={},
expected_result={},
expected_query_params={'start': '-30days',
'includecategory': 'false',
'limit': 1000})
25 changes: 23 additions & 2 deletions threat_intel/opendns.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ def domain_score(self, domains):
return self._multi_post(url_path, domains)

@MultiRequest.error_handling
def _multi_get(self, cache_api_name, fmt_url_path, url_params):
def _multi_get(self, cache_api_name, fmt_url_path, url_params, query_params=None):
"""Makes multiple GETs to an OpenDNS endpoint.
Args:
cache_api_name: string api_name for caching
fmt_url_path: format string for building URL paths
url_params: An enumerable of strings used in building URLs
query_params - None / dict / list of dicts containing query params
Returns:
A dict of {url_param: api_result}
"""
Expand All @@ -140,7 +141,7 @@ def _multi_get(self, cache_api_name, fmt_url_path, url_params):

if len(url_params):
urls = self._to_urls(fmt_url_path, url_params)
responses = self._requests.multi_get(urls)
responses = self._requests.multi_get(urls, query_params)
for url_param, response in zip(url_params, responses):
if self._cache:
self._cache.cache_value(cache_api_name, url_param, response)
Expand Down Expand Up @@ -294,6 +295,26 @@ def sample(self, hashes):
fmt_url_path = u'sample/{0}'
return self._multi_get(api_name, fmt_url_path, hashes)

def search(self, patterns, start=30, limit=1000, include_category=False):
"""Performs pattern searches against the Investigate database.
Args:
patterns: An enumerable of RegEx domain patterns to search for
start: How far back results extend from in days (max is 30)
limit: Number of results to show (max is 1000)
include_category: Include OpenDNS security categories
Returns:
An enumerable of matching domain strings
"""
api_name = 'opendns-patterns'
fmt_url_path = u'search/{0}'
start = '-{0}days'.format(start)
include_category = str(include_category).lower()
query_params = {'start': start,
'limit': limit,
'includecategory': include_category}
return self._multi_get(api_name, fmt_url_path, patterns, query_params)


class ResponseError(Exception):

Expand Down

0 comments on commit b202895

Please sign in to comment.