Skip to content

Commit

Permalink
Fixes OTX, CertSpotter and Cybercure
Browse files Browse the repository at this point in the history
  • Loading branch information
Te-k committed Jun 7, 2022
1 parent dca38dd commit 8cb8602
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 131 deletions.
18 changes: 11 additions & 7 deletions harpoon/commands/certspotter.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#! /usr/bin/env python
import os
import sys
import json
from harpoon.commands.base import Command
from harpoon.lib.utils import bracket, unbracket, json_serial
from harpoon.lib.utils import unbracket, json_serial
from harpoon.lib.certspotter import CertSpotter, CertSpotterError


class CommandCertSpotter(Command):
"""
# Cert Spotter Command
Search in Certificate Transparency database Cert Spotter https://sslmate.com/certspotter. Only actual certificate can be searched without paid plan
Search in Certificate Transparency database Cert Spotter https://sslmate.com/certspotter.
Only current certificates can be searched without paid plan
* Search certificates for a domain : `harpoon certspotter search DOMAIN`
* Search certificates for a domain and its subdomains : `harpoon certspotter search -s DOMAIN`
Expand All @@ -23,13 +24,15 @@ def add_arguments(self, parser):
subparsers = parser.add_subparsers(help='Subcommand')
parser_a = subparsers.add_parser('search', help='Search certificates for a domain')
parser_a.add_argument('DOMAIN', help='domain')
parser_a.add_argument('--subdomains', '-s', help='Search for the domain and its subdomains', action='store_true')
parser_a.add_argument(
'--subdomains', '-s',
help='Search for the domain and its subdomains', action='store_true')
parser_a.set_defaults(subcommand='search')
self.parser = parser

def run(self, conf, args, plugins):
def run(self, args, plugins):
try:
cs = CertSpotter(conf['CertSpotter']['key'])
cs = CertSpotter(self._config_data['CertSpotter']['key'])
except KeyError:
cs = CertSpotter()

Expand All @@ -38,7 +41,8 @@ def run(self, conf, args, plugins):
try:
res = cs.search(unbracket(args.DOMAIN), include_subdomains=args.subdomains)
except CertSpotterError:
print("Error with the API, likely because you need a paid plan to search expired certs. Check censys or crtsh plugins instead")
print("Error with the API, likely because you need a paid plan to search expired certs.")
print("Check censys or crtsh plugins instead")
sys.exit(1)
else:
print(json.dumps(res, sort_keys=True, indent=4, separators=(',', ': '), default=json_serial))
Expand Down
7 changes: 4 additions & 3 deletions harpoon/commands/cybercure.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ class CommandCyberCure(Command):

def add_arguments(self, parser):
subparsers = parser.add_subparsers(help='Subcommand')
parser_a = subparsers.add_parser('ip', help='Returns a response whether an indicator exists in cybercure.ai database, if it is exists it will provide also a link for visual presentation of the indicator history.')
parser_a = subparsers.add_parser('ip', help="Returns a response whether an\
indicator exists in cybercure.ai database")
parser_a.add_argument('IP', help='IP address')
parser_a.set_defaults(subcommand='ip')
parser_b = subparsers.add_parser('file', help='Information on a list of IPs')
parser_b.add_argument('FILE', help='Filename')
parser_b.set_defaults(subcommand='file')
self.parser = parser

def run(self, conf, args, plugins):
def run(self, args, plugins):
cybercure = CyberCure(token='reserved_for_future')
if 'subcommand' in args:
if args.subcommand == 'ip':
Expand All @@ -62,7 +63,7 @@ def run(self, conf, args, plugins):
except CyberCureError:
print("%s;;" % ip)
else:
print ("%s;%s;%s" % (
print("{};{};{}".format(
ip,
infos['exists'],
infos['visual'] if 'visual' in infos else ''
Expand Down
247 changes: 126 additions & 121 deletions harpoon/commands/otx.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#! /usr/bin/env python
import sys
import json
import pytz
from dateutil.parser import parse
Expand Down Expand Up @@ -39,15 +38,21 @@ class CommandOtx(Command):
"""
name = "otx"
description = "Requests information from AlienVault OTX"
config = { 'AlienVaultOtx': ['key']}
config = {'AlienVaultOtx': ['key']}

def add_arguments(self, parser):
# TODO: rewrite as subcommands
parser.add_argument('--pulse', '-p', help='Event infos')
parser.add_argument('--search', '-s', help='Search for indicators')
parser.add_argument('--file', '-f', help='Check from a list of indicators in a file')
parser.add_argument('--raw', '-r', help='Print raw information', action='store_true')
parser.add_argument('--json', '-j', help='Print json information', action='store_true')
parser.add_argument('--type', '-t', help='Type for search', default="guess", choices=["guess", "domain", "IPv4", "IPv6", "url", "md5", "sha1", "sha256", "pehash", "imphash", "cidr", "file_path", "hostname", "mutex", "cve"])
parser.add_argument(
'--type', '-t',
help='Type for search',
default="guess",
choices=["guess", "domain", "IPv4", "IPv6", "url", "md5", "sha1",
"sha256", "pehash", "imphash", "cidr", "file_path", "hostname", "mutex", "cve"])
self.parser = parser

def run(self, args, plugins):
Expand Down Expand Up @@ -141,7 +146,7 @@ def run(self, args, plugins):
for u in res['url_list']['url_list']:
if "result" in u:
if "urlworker" in u["result"]:
if "ip" in u["result"]["urlworker"] :
if "ip" in u["result"]["urlworker"]:
print("\t[%s] %s on IP %s" % (
u["date"],
u["url"],
Expand Down Expand Up @@ -190,132 +195,132 @@ def run(self, args, plugins):
else:
self.parser.print_help()

def intel(self, type, query, data):
if type == "domain":
print("[+] Checking OTX...")
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(IndicatorTypes.DOMAIN, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
def intel_domain(self, query, data):
print("[+] Checking OTX...")
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(IndicatorTypes.DOMAIN, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
# Get Passive DNS
if "passive_dns" in res:
for r in res["passive_dns"]["passive_dns"]:
data["passive_dns"].append({
"ip": r["hostname"],
"first": parse(r["first"]).astimezone(pytz.utc),
"last": parse(r["last"]).astimezone(pytz.utc),
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
# Get Passive DNS
if "passive_dns" in res:
for r in res["passive_dns"]["passive_dns"]:
data["passive_dns"].append({
"ip": r["hostname"],
"first": parse(r["first"]).astimezone(pytz.utc),
"last": parse(r["last"]).astimezone(pytz.utc),
if "url_list" in res:
for r in res["url_list"]["url_list"]:
if "result" in r:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": r["result"]["urlworker"]["ip"]
if "ip" in r["result"]["urlworker"]
else "",
"source": "OTX",
})
if "url_list" in res:
for r in res["url_list"]["url_list"]:
if "result" in r:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": r["result"]["urlworker"]["ip"]
if "ip" in r["result"]["urlworker"]
else "",
"source": "OTX",
})
else:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": "",
"source": "OTX",
})
# Some pulses have domains as hostnames
res = otx.get_indicator_details_full(IndicatorTypes.HOSTNAME, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
except AttributeError:
print("OTX crashed ¯\_(ツ)_/¯")
elif type == "ip":
print("[+] Checking OTX...")
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(IndicatorTypes.IPv4, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
# Get Passive DNS
if "passive_dns" in res:
for r in res["passive_dns"]["passive_dns"]:
data["passive_dns"].append({
"domain": r["hostname"],
"first": parse(r["first"]).astimezone(pytz.utc),
"last": parse(r["last"]).astimezone(pytz.utc),
else:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": "",
"source": "OTX",
})
if "url_list" in res:
for r in res["url_list"]["url_list"]:
if "result" in r:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": r["result"]["urlworker"]["ip"]
if "ip" in r["result"]["urlworker"]
else "",
"source": "OTX",
})
else:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": "",
"source": "OTX",
})
except AttributeError:
print("OTX crashed ¯\_(ツ)_/¯")
elif type == "hash":
t = typeguess(query)
print("[+] Checking OTX...")
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(OTX_TYPES[t], query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
# Some pulses have domains as hostnames
res = otx.get_indicator_details_full(IndicatorTypes.HOSTNAME, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
except AttributeError:
print("OTX crashed ¯\\_(ツ)_/¯")

def intel_ip(self, query, data):
print("[+] Checking OTX...")
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(IndicatorTypes.IPv4, query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
# Get Passive DNS
if "passive_dns" in res:
for r in res["passive_dns"]["passive_dns"]:
data["passive_dns"].append({
"domain": r["hostname"],
"first": parse(r["first"]).astimezone(pytz.utc),
"last": parse(r["last"]).astimezone(pytz.utc),
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
if "analysis" in res:
if "analysis" in res["analysis"]:
if "plugins" in res["analysis"]["analysis"]:
if "cuckoo" in res["analysis"]["analysis"]["plugins"]:
done = []
for d in res["analysis"]["analysis"]["plugins"]["cuckoo"]["result"]["network"]["domains"]:
if "url_list" in res:
for r in res["url_list"]["url_list"]:
if "result" in r:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": r["result"]["urlworker"]["ip"]
if "ip" in r["result"]["urlworker"]
else "",
"source": "OTX",
})
else:
data["urls"].append({
"date": parse(r["date"]).astimezone(pytz.utc),
"url": r["url"],
"ip": "",
"source": "OTX",
})
except AttributeError:
print("OTX crashed ¯\\_(ツ)_/¯")

def intel_hash(self, query, data):
print("[+] Checking OTX...")
t = typeguess(query)
try:
otx = OTXv2(self._config_data["AlienVaultOtx"]["key"])
res = otx.get_indicator_details_full(OTX_TYPES[t], query)
for pulse in res["general"]["pulse_info"]["pulses"]:
data["reports"].append({
"date": parse(pulse["created"]).astimezone(pytz.utc),
"title": pulse["name"],
"source": "OTX",
"url": "https://otx.alienvault.com/pulse/{}".format(pulse["id"])
})
if "analysis" in res:
if "analysis" in res["analysis"]:
if "plugins" in res["analysis"]["analysis"]:
if "cuckoo" in res["analysis"]["analysis"]["plugins"]:
done = []
for d in res["analysis"]["analysis"]["plugins"]["cuckoo"]["result"]["network"]["domains"]:
data["network"].append({
"source": "OTX",
"url": "https://otx.alienvault.com/indicator/file/{}".format(query),
"host": d["domain"],
"host2": d["ip"]
})
done.append(d["ip"])
done.append(d["domain"])
for ip in res["analysis"]["analysis"]["plugins"]["cuckoo"]["result"]["network"]["hosts"]:
if ip["ip"] not in done:
data["network"].append({
"source": "OTX",
"url": "https://otx.alienvault.com/indicator/file/{}".format(query),
"host": d["domain"],
"host2": d["ip"]
"host": ip["ip"],
})
done.append(d["ip"])
done.append(d["domain"])
for ip in res["analysis"]["analysis"]["plugins"]["cuckoo"]["result"]["network"]["hosts"]:
if ip["ip"] not in done:
data["network"].append({
"source": "OTX",
"url": "https://otx.alienvault.com/indicator/file/{}".format(query),
"host": ip["ip"],
})

except AttributeError:
print("OTX crashed ¯\_(ツ)_/¯")
except AttributeError:
print("OTX crashed ¯\\_(ツ)_/¯")
18 changes: 18 additions & 0 deletions tests/command/test_otx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

from harpoon.commands.otx import CommandOtx
from ..utils import launch_plugin


class TestCommandOtx:
def test_search(self):
launch_plugin(
CommandOtx,
["otx", "-s", "37.49.230.155"]
)

def test_pulse(self):
launch_plugin(
CommandOtx,
["otx", "-p", "61b72e243e746d2994b3ba54"]
)

0 comments on commit 8cb8602

Please sign in to comment.