From e34e3bcb327320143452e2f8d117b51861465be0 Mon Sep 17 00:00:00 2001 From: tek Date: Mon, 27 Jun 2022 19:57:45 +0200 Subject: [PATCH] Fixes flake8 issues --- harpoon/commands/robtex.py | 6 ------ harpoon/commands/subdomains.py | 13 ++++++------- harpoon/lib/archiveis.py | 11 +++++++---- harpoon/lib/archiveorg.py | 7 ++++--- harpoon/lib/bird.py | 1 + harpoon/lib/certspotter.py | 9 ++++++--- harpoon/lib/hibp.py | 6 +++--- harpoon/lib/memento.py | 5 +++-- harpoon/lib/pgp.py | 2 +- harpoon/lib/pulsedive.py | 6 ++---- harpoon/lib/urlhaus.py | 7 +++---- harpoon/lib/xforce.py | 5 ----- tests/command/test_circl.py | 2 -- tests/command/test_crtsh.py | 2 -- tests/command/test_dnsdb.py | 2 -- tests/command/test_hashlookup.py | 2 -- tests/command/test_hybrid.py | 2 -- tests/command/test_otx.py | 2 -- tests/command/test_quad9.py | 2 -- tests/command/test_safebrowsing.py | 2 -- tests/test_utils.py | 20 ++++++++++++++++++++ tests/tests_utils.py | 23 ----------------------- 22 files changed, 56 insertions(+), 81 deletions(-) create mode 100644 tests/test_utils.py delete mode 100644 tests/tests_utils.py diff --git a/harpoon/commands/robtex.py b/harpoon/commands/robtex.py index 4789cbf..db94de4 100644 --- a/harpoon/commands/robtex.py +++ b/harpoon/commands/robtex.py @@ -1,13 +1,9 @@ #! /usr/bin/env python -import sys -import operator import json import pytz -from dateutil.parser import parse from harpoon.commands.base import Command from harpoon.lib.robtex import Robtex, RobtexError from harpoon.lib.utils import json_serial, unbracket -from datetime import date, datetime class CommandRobtex(Command): @@ -143,5 +139,3 @@ def intel(self, type, query, data): ) except RobtexError: print("Robtex query failed") - - diff --git a/harpoon/commands/subdomains.py b/harpoon/commands/subdomains.py index 1e01e78..d247dac 100644 --- a/harpoon/commands/subdomains.py +++ b/harpoon/commands/subdomains.py @@ -1,7 +1,6 @@ #! /usr/bin/env python from harpoon.commands.base import Command from harpoon.commands.censyscmd import CommandCensys -from censys.asm import AsmClient from harpoon.lib.utils import unbracket from passivetotal.libs.enrichment import EnrichmentRequest from virus_total_apis import PublicApi, PrivateApi @@ -78,16 +77,16 @@ def prepare_data(self, subdomains=dict, data=dict, source=str): def intel(self, type, query, data): if type == "domain": try: - #subdomains = self.censys_certs(unbracket(query), self._confif_data, True) - #self.prepare_data(subdomains, data, "Censys") + # subdomains = self.censys_certs(unbracket(query), self._confif_data, True) + # self.prepare_data(subdomains, data, "Censys") pass except CensysRateLimitExceededException: print('Censys quota exceeded!') - #subdomains = self.pt(unbracket(query), conf, True) - #self.prepare_data(subdomains, data, "PassiveTotal") - #subdomains = self.vt(unbracket(query), conf, True) - #self.prepare_data(subdomains, data, "VirusTotal") + # subdomains = self.pt(unbracket(query), conf, True) + # self.prepare_data(subdomains, data, "PassiveTotal") + # subdomains = self.vt(unbracket(query), conf, True) + # self.prepare_data(subdomains, data, "VirusTotal") else: pass diff --git a/harpoon/lib/archiveis.py b/harpoon/lib/archiveis.py index f6c7c5b..f557440 100644 --- a/harpoon/lib/archiveis.py +++ b/harpoon/lib/archiveis.py @@ -24,12 +24,15 @@ def download_cache(cache_url): r = requests.get(cache_url) data = r.text t1 = data.find('\n\n\n\n\n\n') - t2 = data.find('
') - #t3 = data.find('
') + # t3 = data.find('') diff --git a/harpoon/lib/bird.py b/harpoon/lib/bird.py index 896707c..5aaf97b 100644 --- a/harpoon/lib/bird.py +++ b/harpoon/lib/bird.py @@ -1,5 +1,6 @@ import tweepy + class Bird(object): def __init__(self, config): self.consumer_key = config['consumer_key'] diff --git a/harpoon/lib/certspotter.py b/harpoon/lib/certspotter.py index ccd77d3..e980c16 100644 --- a/harpoon/lib/certspotter.py +++ b/harpoon/lib/certspotter.py @@ -1,6 +1,5 @@ import requests from requests.auth import HTTPBasicAuth -import json class CertSpotterError(Exception): @@ -35,6 +34,10 @@ def _get(self, query, params={}): else: raise CertSpotterError("Invalid HTTP status code %i" % r.status_code) - def search(self, domain, include_subdomains=False): - return self._get("v1/issuances", params = {'domain': domain, "expand": ["dns_names", "issuer", "cert"], "include_subdomains": include_subdomains}) + return self._get( + "v1/issuances", + params={ + "domain": domain, + "expand": ["dns_names", "issuer", "cert"], + "include_subdomains": include_subdomains}) diff --git a/harpoon/lib/hibp.py b/harpoon/lib/hibp.py index 4317fef..5e1d685 100644 --- a/harpoon/lib/hibp.py +++ b/harpoon/lib/hibp.py @@ -1,5 +1,4 @@ import requests -import json class HibpError(Exception): @@ -26,7 +25,9 @@ def _request(self, query): """ Request an url """ - r = requests.get('https://haveibeenpwned.com' + query, headers= {'User-Agent': self.ua, 'hibp-api-key': self.key}) + r = requests.get( + "https://haveibeenpwned.com" + query, + headers={'User-Agent': self.ua, 'hibp-api-key': self.key}) if r.status_code != 200: if r.status_code == 404: raise HibpNotFound() @@ -52,4 +53,3 @@ def get_pastes(self, account): def check_pwd(self, pwd): return self._request('/api/v3/pwnedpassword/%s' % pwd).json() - diff --git a/harpoon/lib/memento.py b/harpoon/lib/memento.py index 959b072..df3aff7 100644 --- a/harpoon/lib/memento.py +++ b/harpoon/lib/memento.py @@ -3,13 +3,14 @@ from urllib.parse import urljoin, quote from dateutil.parser import parse + class MementoClient(object): """ Implement Memento Protocol """ def __init__(self, base_url='http://archive.is/'): self.base_url = base_url - self.linkre = re.compile('^<(?P[^>]+)>; rel="(?P[a-z ]+)"(; datetime="(?P[^"]+)"|,|; type="application/link-format"; from="(?P[^"]+)"; until="(?P[^"]+)")') + self.linkre = re.compile(r'^<(?P[^>]+)>; rel="(?P[a-z ]+)"(; datetime="(?P[^"]+)"|,|; type="application/link-format"; from="(?P[^"]+)"; until="(?P[^"]+)")') # noqa: E501 def _parselinks(self, data): """ @@ -56,7 +57,7 @@ def snapshots(self, url): if d['type'] in ['memento', 'first last memento']: snapshots.append({ 'url': original, - 'date':d['date'], + 'date': d['date'], 'archive': d['url'] }) return snapshots diff --git a/harpoon/lib/pgp.py b/harpoon/lib/pgp.py index 89cfd38..9323845 100644 --- a/harpoon/lib/pgp.py +++ b/harpoon/lib/pgp.py @@ -14,7 +14,7 @@ def search(search): return [] res = [] soup = BeautifulSoup(r.text, 'lxml') - emailsearch = re.compile("([\w\(\) ]+) <([\w@\.]+)>") + emailsearch = re.compile(r"([\w\(\) ]+) <([\w@\.]+)>") for pre in soup.find_all('pre')[1:]: key = {} t = str(pre) diff --git a/harpoon/lib/pulsedive.py b/harpoon/lib/pulsedive.py index 5778e55..582ab6d 100644 --- a/harpoon/lib/pulsedive.py +++ b/harpoon/lib/pulsedive.py @@ -6,6 +6,7 @@ def __init__(self, message): Exception.__init__(self, message) self.message = message + class PulseDiveNotFound(PulseDiveError): pass @@ -37,7 +38,7 @@ def indicators_by_value(self, val, historical=False): information is retrieved using the indicator value. Querying links and properties by indicator value won't work. """ - params = {'indicator': val} + params = {'indicator': val} if historical: params['historical'] = "1" return self._query("info.php", params) @@ -53,6 +54,3 @@ def threat(self, threat): def threat_indicators(self, tid): return self._query("info.php", {'tid': tid, 'get': 'links'}) - - - diff --git a/harpoon/lib/urlhaus.py b/harpoon/lib/urlhaus.py index 40e9f67..8b9d823 100644 --- a/harpoon/lib/urlhaus.py +++ b/harpoon/lib/urlhaus.py @@ -1,7 +1,6 @@ #! /usr/bin/env python3 import re - import requests @@ -57,7 +56,7 @@ def download_sample(self, url, query): if r.ok: try: return r.content - except: + except: # noqa: E722 raise UrlHausError() else: return None @@ -85,9 +84,9 @@ def get_payload(self, query): """ url = "{}payload/".format(self.base_url) # check if the hash is md5 or sha256 - if re.match("[A-Fa-f0-9]{64}", query.strip()): + if re.match(r"[A-Fa-f0-9]{64}", query.strip()): return self.launch_post_query(url, "sha256_hash", query.strip()) - elif re.match("[a-fA-F\d]{32}", query.strip()): + elif re.match(r"[a-fA-F\d]{32}", query.strip()): return self.launch_post_query(url, "md5_hash", query.strip()) else: raise UrlHausError() diff --git a/harpoon/lib/xforce.py b/harpoon/lib/xforce.py index 09e9847..2bd3394 100644 --- a/harpoon/lib/xforce.py +++ b/harpoon/lib/xforce.py @@ -1,6 +1,5 @@ import requests import base64 -import json class XforceExchangeFailed(Exception): @@ -99,7 +98,3 @@ def whois(self, domain): Returns a JSON object containing information about the given host address. """ return self._request('/whois/' + domain) - - - - diff --git a/tests/command/test_circl.py b/tests/command/test_circl.py index 2449fea..29d8ddd 100644 --- a/tests/command/test_circl.py +++ b/tests/command/test_circl.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.circl import CommandCircl from ..utils import launch_plugin diff --git a/tests/command/test_crtsh.py b/tests/command/test_crtsh.py index 171bfe1..83bedb3 100644 --- a/tests/command/test_crtsh.py +++ b/tests/command/test_crtsh.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.crtsh import CommandCertsh from ..utils import launch_plugin diff --git a/tests/command/test_dnsdb.py b/tests/command/test_dnsdb.py index b3e2f73..9a82bcb 100644 --- a/tests/command/test_dnsdb.py +++ b/tests/command/test_dnsdb.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.dnsdbcmd import DnsDbTotal from ..utils import launch_plugin diff --git a/tests/command/test_hashlookup.py b/tests/command/test_hashlookup.py index 52027c5..3bf08c0 100644 --- a/tests/command/test_hashlookup.py +++ b/tests/command/test_hashlookup.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.hashlookup import CommandHashLookup from ..utils import launch_plugin diff --git a/tests/command/test_hybrid.py b/tests/command/test_hybrid.py index 80aa1f3..3836c4e 100644 --- a/tests/command/test_hybrid.py +++ b/tests/command/test_hybrid.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.hybrid import CommandHybridAnalysis from ..utils import launch_plugin diff --git a/tests/command/test_otx.py b/tests/command/test_otx.py index 796efe6..13dcaaf 100644 --- a/tests/command/test_otx.py +++ b/tests/command/test_otx.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.otx import CommandOtx from ..utils import launch_plugin diff --git a/tests/command/test_quad9.py b/tests/command/test_quad9.py index 72b51d1..eea018c 100644 --- a/tests/command/test_quad9.py +++ b/tests/command/test_quad9.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.quad9 import CommandQuad9 from ..utils import launch_plugin diff --git a/tests/command/test_safebrowsing.py b/tests/command/test_safebrowsing.py index 92885c9..716f7c7 100644 --- a/tests/command/test_safebrowsing.py +++ b/tests/command/test_safebrowsing.py @@ -1,5 +1,3 @@ -import pytest - from harpoon.commands.safebrowsing import CommandSafeBrowsing from ..utils import launch_plugin diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..9293d5d --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,20 @@ +from harpoon.lib.utils import unbracket, bracket, same_url, typeguess + + +class TestUtils: + def test_bracket(self): + assert unbracket("domain[.]com") == "domain.com" + assert bracket("domain.com") == "domain[.]com" + + def test_same_url(self): + assert same_url("http://example.org", "https://example.org") + assert same_url("http://example.org", "http://www.example.org") + assert same_url("http://exemple.org", "http://example.org") is False + + def test_typeguess(self): + assert typeguess("44c13fc77ee90ef4040a0c99a9be999e") == "md5" + assert typeguess("d82b0fffdda6d7120dd8c14da32208278e2a287f") == "sha1" + assert typeguess("fef75dbbf6297c151a7112cb5f98884e4928716f0725826c42086e6c21a9894d") == "sha256" + assert typeguess("10.4.4.4") == "IPv4" + assert typeguess("fce8::1") == "IPv6" + assert typeguess("domain.com") == "domain" diff --git a/tests/tests_utils.py b/tests/tests_utils.py deleted file mode 100644 index 3b8bf59..0000000 --- a/tests/tests_utils.py +++ /dev/null @@ -1,23 +0,0 @@ -import unittest -from harpoon.lib.utils import unbracket, bracket, same_url, typeguess - -class TestUtils(unittest.TestCase): - def test_bracket(self): - self.assertEqual(unbracket("domain[.]com"), "domain.com") - self.assertEqual(bracket("domain.com"), "domain[.]com") - - def test_same_url(self): - self.assertTrue(same_url("http://example.org", "https://example.org")) - self.assertTrue(same_url("http://example.org", "http://www.example.org")) - self.assertFalse(same_url("http://exemple.org", "http://example.org")) - - def test_typeguess(self): - self.assertEqual(typeguess("44c13fc77ee90ef4040a0c99a9be999e"), "md5") - self.assertEqual(typeguess("d82b0fffdda6d7120dd8c14da32208278e2a287f"), "sha1") - self.assertEqual(typeguess("fef75dbbf6297c151a7112cb5f98884e4928716f0725826c42086e6c21a9894d"), "sha256") - self.assertEqual(typeguess("10.4.4.4"), "IPv4") - self.assertEqual(typeguess("fce8::1"), "IPv6") - self.assertEqual(typeguess("domain.com"), "domain") - -if __name__ == '__main__': - unittest.main()