Skip to content

Commit

Permalink
Implemented network blacklisting in target filters
Browse files Browse the repository at this point in the history
  • Loading branch information
TheJokr committed Apr 3, 2020
1 parent b2332d4 commit 6442351
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytricia~=1.0.0
40 changes: 25 additions & 15 deletions tcpreq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import time
import random
import itertools
from ipaddress import IPv4Address, IPv6Address
from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network
import socket
import asyncio

from pytricia import PyTricia

from .types import AnyIPAddress
from .opts import parser
from .limiter import TokenBucket
Expand Down Expand Up @@ -73,21 +75,29 @@ def main() -> None:
ipv6_src = next((a for a in addrs if isinstance(a, IPv6Address)), None)
del addrs

# Create blacklist patricia trees from multiple sources
ipv4_bl: Optional["PyTricia[bool]"] = None if ipv4_src is None else PyTricia(32, socket.AF_INET)
ipv6_bl: Optional["PyTricia[bool]"] = None if ipv6_src is None else PyTricia(128, socket.AF_INET6)
for net in itertools.chain.from_iterable(args.blacklist):
if isinstance(net, IPv4Network) and ipv4_bl is not None and net not in ipv4_bl:
ipv4_bl[net] = True
elif ipv6_bl is not None and isinstance(net, IPv6Network) and net not in ipv6_bl:
ipv6_bl[net] = True

# Aggregate targets from multiple sources
tgt_set: Set[Tuple[AnyIPAddress, int]] = set(args.target)
tgt_set.update(itertools.chain.from_iterable(args.nmap))
tgt_set.update(itertools.chain.from_iterable(args.zmap))

# Filter targets by IP version
ipv4_tgts: List[Tuple[IPv4Address, int]] = []
ipv6_tgts: List[Tuple[IPv6Address, int]] = []
for tgt in tgt_set:
if isinstance(tgt[0], IPv4Address) and ipv4_src is not None:
ipv4_tgts.append(tgt) # type: ignore
elif ipv6_src is not None and isinstance(tgt[0], IPv6Address):
ipv6_tgts.append(tgt) # type: ignore

del tgt_set
# Filter targets by IP version and blacklist
ipv4_set: Set[Tuple[IPv4Address, int]] = set()
ipv6_set: Set[Tuple[IPv6Address, int]] = set()
for tgt in itertools.chain(args.target, itertools.chain.from_iterable(args.nmap),
itertools.chain.from_iterable(args.zmap)):
if isinstance(tgt[0], IPv4Address) and ipv4_bl is not None and tgt[0] not in ipv4_bl:
ipv4_set.add(tgt)
elif ipv6_bl is not None and isinstance(tgt[0], IPv6Address) and tgt[0] not in ipv6_bl:
ipv6_set.add(tgt)

ipv4_tgts = list(ipv4_set)
ipv6_tgts = list(ipv6_set)
del ipv4_bl, ipv6_bl, ipv4_set, ipv6_set
if not ipv4_tgts and not ipv6_tgts:
parser.print_usage()
print(parser.prog + ": error: at least one valid target is required")
Expand Down

0 comments on commit 6442351

Please sign in to comment.