Skip to content

Commit

Permalink
Working IP lookup yay
Browse files Browse the repository at this point in the history
  • Loading branch information
pirxthepilot committed Sep 2, 2022
1 parent 8d3441e commit 8ffbcd1
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 269 deletions.
38 changes: 19 additions & 19 deletions tests/test_ui_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
HistoricalWhois,
Resolutions,
)
from wtfis.ui.view import View
from wtfis.ui.view import DomainView


def mock_get_ipwhois(ip, pool) -> IpWhois:
Expand All @@ -40,9 +40,9 @@ def view01(test_data):
ipwhois_client.get_ipwhois = MagicMock(side_effect=lambda ip: mock_get_ipwhois(ip, ipwhois_pool))
ip_enrich = ipwhois_client.bulk_get_ipwhois(resolutions, 3)

return View(
return DomainView(
console=Console(),
domain=Domain.parse_obj(json.loads(test_data("vt_domain_gist.json"))),
entity=Domain.parse_obj(json.loads(test_data("vt_domain_gist.json"))),
resolutions=resolutions,
whois=Whois.parse_obj(json.loads(test_data("pt_whois_gist.json"))),
ip_enrich=ip_enrich,
Expand All @@ -55,9 +55,9 @@ def view02(test_data):
gist.github.com VT whois. Resolution and whois tests only. Test empty enrichment
and max_resolutions=1
"""
return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=Resolutions.parse_obj(json.loads(test_data("vt_resolutions_gist.json"))),
whois=HistoricalWhois.parse_obj(json.loads(test_data("vt_whois_gist.json"))),
ip_enrich=IpWhoisMap(__root__={}),
Expand All @@ -68,9 +68,9 @@ def view02(test_data):
@pytest.fixture()
def view03(test_data):
""" bbc.co.uk VT whois. Whois panel test only. Test whois with no domain field. """
return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=MagicMock(),
whois=HistoricalWhois.parse_obj(json.loads(test_data("vt_whois_bbc.json"))),
ip_enrich=[],
Expand All @@ -83,9 +83,9 @@ def view04(test_data):
google.com domain. Domain and resolution test only. Test domain with 1 malicious
analysis point, and empty resolutions.
"""
return View(
return DomainView(
console=Console(),
domain=Domain.parse_obj(json.loads(test_data("vt_domain_google.json"))),
entity=Domain.parse_obj(json.loads(test_data("vt_domain_google.json"))),
resolutions=None,
whois=MagicMock(),
ip_enrich=[],
Expand All @@ -95,9 +95,9 @@ def view04(test_data):
@pytest.fixture()
def view05(test_data):
""" tucows.com domain. Domain test only. Test domain with negative reputation and no popularity."""
return View(
return DomainView(
console=Console(),
domain=Domain.parse_obj(json.loads(test_data("vt_domain_tucows.json"))),
entity=Domain.parse_obj(json.loads(test_data("vt_domain_tucows.json"))),
resolutions=MagicMock(),
whois=MagicMock(),
ip_enrich=[],
Expand All @@ -107,9 +107,9 @@ def view05(test_data):
@pytest.fixture()
def view06(test_data):
""" exmple.com VT whois. Whois test only. Test empty whois_map."""
return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=MagicMock(),
whois=HistoricalWhois.parse_obj(json.loads(test_data("vt_whois_example_2.json"))),
ip_enrich=[],
Expand All @@ -126,9 +126,9 @@ def view07(test_data):
shodan_client.get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool))
ip_enrich = shodan_client.bulk_get_ip(resolutions, 3)

return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=resolutions,
whois=MagicMock(),
ip_enrich=ip_enrich,
Expand All @@ -145,9 +145,9 @@ def view08(test_data):
shodan_client.get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool))
ip_enrich = shodan_client.bulk_get_ip(resolutions, 1)

return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=resolutions,
whois=MagicMock(),
ip_enrich=ip_enrich,
Expand All @@ -165,9 +165,9 @@ def view09(test_data):
shodan_client.get_ip = MagicMock(side_effect=lambda ip: mock_shodan_get_ip(ip, shodan_pool))
ip_enrich = shodan_client.bulk_get_ip(resolutions, 1)

return View(
return DomainView(
console=Console(),
domain=MagicMock(),
entity=MagicMock(),
resolutions=resolutions,
whois=MagicMock(),
ip_enrich=ip_enrich,
Expand Down
7 changes: 7 additions & 0 deletions wtfis/clients/ipwhois.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@ def bulk_get_ipwhois(
if ipwhois:
ipwhois_map[ipwhois.ip] = ipwhois
return IpWhoisMap(__root__=ipwhois_map)

def single_get_ipwhois(self, ip: str) -> IpWhoisMap:
ipwhois_map = {}
ipwhois = self.get_ipwhois(ip)
if ipwhois:
ipwhois_map[ip] = ipwhois
return IpWhoisMap(__root__=ipwhois_map)
26 changes: 17 additions & 9 deletions wtfis/clients/shodan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from shodan import Shodan
from shodan.exception import APIError
from typing import Optional

from wtfis.models.shodan import ShodanIp, ShodanIpMap
from wtfis.models.virustotal import Resolutions
Expand All @@ -12,8 +13,14 @@ class ShodanClient:
def __init__(self, api_key: str) -> None:
self.s = Shodan(api_key)

def get_ip(self, ip: str) -> ShodanIp:
return ShodanIp.parse_obj(self.s.host(ip, minify=False))
def get_ip(self, ip: str) -> Optional[ShodanIp]:
try:
return ShodanIp.parse_obj(self.s.host(ip, minify=False))
except APIError as e:
if str(e) == "Invalid API key":
raise APIError("Invalid Shodan API key")
else:
return None

def bulk_get_ip(
self,
Expand All @@ -24,13 +31,14 @@ def bulk_get_ip(
for idx, ip in enumerate(resolutions.data):
if idx == max_ips_to_enrich:
break
try:
ip_data = self.get_ip(ip.attributes.ip_address)
except APIError as e:
if str(e) == "Invalid API key":
raise APIError("Invalid Shodan API key")
else:
ip_data = None
ip_data = self.get_ip(ip.attributes.ip_address)
if ip_data:
shodan_map[ip_data.ip_str] = ip_data
return ShodanIpMap(__root__=shodan_map)

def single_get_ip(self, ip: str) -> ShodanIpMap:
shodan_map = {}
ip_data = self.get_ip(ip)
if ip_data:
shodan_map[ip] = ip_data
return ShodanIpMap(__root__=shodan_map)
11 changes: 11 additions & 0 deletions wtfis/clients/virustotal.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
HistoricalWhois,
Resolutions,
)
from wtfis.utils import is_ip


class VTClient(BaseClient):
Expand All @@ -33,3 +34,13 @@ def get_domain_whois(self, domain: str) -> HistoricalWhois:

def get_ip_address(self, ip: str) -> IpAddress:
return IpAddress.parse_obj(self._get(f"/ip_addresses/{ip}"))

def get_ip_whois(self, ip: str) -> HistoricalWhois:
return HistoricalWhois.parse_obj(self._get(f"/ip_addresses/{ip}/historical_whois"))

def get_whois(self, entity: str) -> HistoricalWhois:
""" Generalized for domain and IP """
if is_ip(entity):
return self.get_ip_whois(entity)
else:
return self.get_domain_whois(entity)
73 changes: 50 additions & 23 deletions wtfis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from wtfis.clients.shodan import ShodanClient
from wtfis.clients.virustotal import VTClient
from wtfis.models.virustotal import Domain
from wtfis.utils import error_and_exit, is_ip
from wtfis.utils import error_and_exit, is_ip, refang
from wtfis.ui.progress import get_progress
from wtfis.ui.view import View
from wtfis.ui.view import DomainView, IpAddressView
from wtfis.version import get_version


Expand Down Expand Up @@ -90,46 +90,65 @@ def main():
vt = VTClient(os.environ.get("VT_API_KEY"))
progress.update(task1, advance=33)
if is_ip(args.entity):
entity = vt.get_ip_address(args.entity)
entity = vt.get_ip_address(refang(args.entity))
else:
entity = vt.get_domain(args.entity)
progress.update(task1, advance=33)

# Resolutions and IP enrichments
if args.max_resolutions != 0:
resolutions = vt.get_domain_resolutions(args.entity)
progress.update(task1, advance=33)
# Domain resolutions and IP enrichments
if isinstance(entity, Domain):
if args.max_resolutions != 0:
resolutions = vt.get_domain_resolutions(args.entity)
progress.update(task1, completed=100)

if args.use_shodan:
# Shodan
task2 = progress.add_task("Fetching IP enrichments from Shodan")
shodan = ShodanClient(os.environ.get("SHODAN_API_KEY"))
progress.update(task2, advance=50)
ip_enrich = shodan.bulk_get_ip(resolutions, args.max_resolutions)
progress.update(task2, advance=50)
else:
# IPWhois
task2 = progress.add_task("Fetching IP enrichments from IPWhois")
ipwhois = IpWhoisClient()
progress.update(task2, advance=50)
ip_enrich = ipwhois.bulk_get_ipwhois(resolutions, args.max_resolutions)
progress.update(task2, advance=50)
else:
resolutions = None
ip_enrich = []

# IP address enrichments
else:
progress.update(task1, completed=100)

if args.use_shodan:
# Shodan
task2 = progress.add_task("Fetching IP enrichments from Shodan")
shodan = ShodanClient(os.environ.get("SHODAN_API_KEY"))
progress.update(task2, advance=50)
ip_enrich = shodan.bulk_get_ip(resolutions, args.max_resolutions)
ip_enrich = shodan.single_get_ip(entity.data.id_)
progress.update(task2, advance=50)
else:
# IPWhois
task2 = progress.add_task("Fetching IP enrichments from IPWhois")
ipwhois = IpWhoisClient()
progress.update(task2, advance=50)
ip_enrich = ipwhois.bulk_get_ipwhois(resolutions, args.max_resolutions)
ip_enrich = ipwhois.single_get_ipwhois(entity.data.id_)
progress.update(task2, advance=50)
else:
resolutions = None
ip_enrich = []
progress.update(task1, completed=100)

# Whois
# Use Passivetotal if relevant environment variables exist, otherwise keep using VT
if os.environ.get("PT_API_USER") and os.environ.get("PT_API_KEY"):
task3 = progress.add_task("Fetching domain whois from Passivetotal")
pt = PTClient(os.environ.get("PT_API_USER"), os.environ.get("PT_API_KEY"))
progress.update(task3, advance=50)
whois = pt.get_whois(args.entity)
whois = pt.get_whois(entity.data.id_)
progress.update(task3, advance=50)
else:
task3 = progress.add_task("Fetching domain whois from Virustotal")
whois = vt.get_domain_whois(args.entity)
whois = vt.get_whois(entity.data.id_)
progress.update(task3, advance=100)
except (HTTPError, JSONDecodeError, APIError) as e:
progress.stop()
Expand All @@ -139,12 +158,20 @@ def main():
error_and_exit(f"Data model validation error: {e}")

# Output
view = View(
console,
entity,
resolutions,
whois,
ip_enrich,
max_resolutions=args.max_resolutions,
)
if isinstance(entity, Domain):
view = DomainView(
console,
entity,
resolutions,
whois,
ip_enrich,
max_resolutions=args.max_resolutions,
)
else:
view = IpAddressView(
console,
entity,
whois,
ip_enrich,
)
view.print(one_column=args.one_column)
6 changes: 2 additions & 4 deletions wtfis/ui/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from typing import Generator, List, Optional, Union
from typing import Any, Generator, List, Optional, Union

from wtfis.models.ipwhois import IpWhois, IpWhoisMap
from wtfis.models.passivetotal import Whois
from wtfis.models.shodan import ShodanIp, ShodanIpMap
from wtfis.models.virustotal import (
Domain,
HistoricalWhois,
IpAddress,
LastAnalysisStats,
PopularityRanks,
)
Expand All @@ -37,7 +35,7 @@ class BaseView(abc.ABC):
def __init__(
self,
console: Console,
entity: Union[Domain, IpAddress],
entity: Any,
whois: Union[Whois, HistoricalWhois],
ip_enrich: Union[IpWhoisMap, ShodanIpMap],
) -> None:
Expand Down
Loading

0 comments on commit 8ffbcd1

Please sign in to comment.