-
Notifications
You must be signed in to change notification settings - Fork 0
/
pcap_scrub.py
executable file
·168 lines (142 loc) · 7.35 KB
/
pcap_scrub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
#---------------------
# file - pcap_scrub.py
# --------------------
import argparse
import os
import dpkt
import time
from dpkt.ip import IP, IP_PROTO_UDP
from dpkt.udp import UDP
import datetime
import socket
from os import listdir
from os.path import isfile, join
def mac_addr(address):
    r"""Convert a MAC address to a readable/printable string.

    Args:
        address (bytes): a raw MAC address (e.g. b'\x01\x02\x03\x04\x05\x06')
    Returns:
        str: colon-separated, two-digit lowercase hex octets
             (e.g. '01:02:03:04:05:06'); an empty string for empty input
    """
    # bytearray() yields integers on both Python 2 and Python 3, so no
    # ord()-style compatibility helper is needed to format each octet.
    return ':'.join('%02x' % octet for octet in bytearray(address))
def inet_to_str(inet):
    """Render a packed network address as text.

    Args:
        inet (inet struct): packed network address
    Returns:
        str: the address in IPv4 presentation form when it can be read as
             IPv4, otherwise in IPv6 presentation form
    """
    try:
        printable = socket.inet_ntop(socket.AF_INET, inet)
    except ValueError:
        # Not readable as IPv4: interpret the same bytes as IPv6 instead.
        printable = socket.inet_ntop(socket.AF_INET6, inet)
    return printable
def parse_arguments( ):
    """Define the scrubber's command-line interface and parse the command line.

    Returns:
        argparse.Namespace: the parsed options (port, application_protocol,
        flow, srcport, source, output, dest, destport) plus the positional
        ``target``.
    """
    # (short flag, long flag, value type, help text), in registration order.
    option_table = (
        ( "-p", "--port", str, "specify application layer port you wish to remove" ),
        ( "-P", "--application-protocol", str, "specify application layer protocol you wish to remove" ),
        ( "-f", "--flow", str, "specify a source,destination pair of IP addresses to scrub" ),
        ( "-s", "--srcport", int, "specify the source port of the application layer protocol you wish to remove" ),
        ( "-S", "--source", str, "specify a source addresss to drop" ),
        ( "-O", "--output", str, "specify a destination folder for PCAP" ),
        ( "-D", "--dest", str, "specify a destination addresss to drop" ),
        ( "-d", "--destport", int, "specify the destination port of the application layer protocol you wish to remove" ),
    )
    cli = argparse.ArgumentParser( description='Process a PCAP file for anonymization' )
    for short_flag, long_flag, value_type, help_text in option_table:
        cli.add_argument( short_flag, long_flag, type=value_type, help=help_text )
    cli.add_argument( 'target', metavar='pcap', type=str, help='the actual pcap file you wish to process' )
    return cli.parse_args( )
def _split_csv( value ):
    """Return the non-empty, stripped items of a comma-separated string (or of an iterable)."""
    if not value:
        return []
    if isinstance( value, str ):
        value = value.split(",")
    items = [str(item).strip() for item in value]
    return [item for item in items if item]


def _parse_ports( value ):
    """Return the comma-separated port list *value* as a set of integers."""
    ports = set()
    for item in _split_csv( value ):
        try:
            ports.add( int(item) )
        except ValueError:
            raise ValueError( "invalid port '{}' in port specifier list".format(item) )
    return ports


def _should_drop( frame, arguments, protocols, ports, flow_ips ):
    """Return True when the decoded Ethernet *frame* matches any scrub rule."""
    payload = frame.data
    # Only IPv4 payloads are inspected; every other frame is kept unless it
    # is a CDP frame and "cdp" was requested.
    if not isinstance( payload, dpkt.ip.IP ):
        if "cdp" in protocols and payload.__class__.__name__.lower() == "cdp":
            print( "found a CDP frame to remove" )
            return True
        return False

    transport = payload.data
    is_udp = isinstance( transport, UDP )
    is_tcp = isinstance( transport, dpkt.tcp.TCP )
    # Payloads without ports (ICMP, raw bytes, ...) get None, which never
    # equals a real port number.
    source_port = getattr( transport, "sport", None )
    dest_port = getattr( transport, "dport", None )

    if protocols:
        if is_udp and source_port == 138 and dest_port == 138:
            if "browser" in protocols:
                print("found a BROWSER protocol frame to remove")
                return True
        # The parentheses matter: "a and b or c" parses as "(a and b) or c",
        # which would apply these TCP rules to non-TCP traffic as well.
        elif is_tcp and ( source_port == 110 or dest_port == 110 ):
            if "pop" in protocols:
                print("found a POP protocol frame to remove")
                return True
        elif is_tcp and ( source_port == 389 or dest_port == 389 ):
            if "ldap" in protocols:
                print("found an LDAP protocol frame to remove")
                return True

    if ports and ( source_port in ports or dest_port in ports ):
        print( "omitting a frame based on the port specifier list {}".format(sorted(ports)) )
        return True

    # getattr keeps this usable with option objects that lack these fields.
    wanted_sport = getattr( arguments, "srcport", None )
    if wanted_sport is not None and source_port == wanted_sport:
        print( "omitting a frame with matching source port {}".format(wanted_sport) )
        return True
    wanted_dport = getattr( arguments, "destport", None )
    if wanted_dport is not None and dest_port == wanted_dport:
        print( "omitting a frame with matching destination port {}".format(wanted_dport) )
        return True

    if not ( flow_ips or arguments.source or arguments.dest ):
        return False

    source_ip = inet_to_str( payload.src )
    dest_ip = inet_to_str( payload.dst )
    # Both endpoints must be listed, so the flow matches in either direction.
    if flow_ips and source_ip in flow_ips and dest_ip in flow_ips:
        print( "found matching flow to drop" )
        return True
    if arguments.source and arguments.source == source_ip:
        print( "found a packet with matching source {} to drop".format(source_ip) )
        return True
    if arguments.dest and arguments.dest == dest_ip:
        print( "found a packet with matching destination {} to drop".format(dest_ip) )
        return True
    return False


def process_single_pcap( arguments, filename ):
    """Copy one PCAP file to a "cleaned" PCAP, leaving out frames that match the scrub rules.

    The output is named ``<name>.cleaned.<time.time()>.pcap`` where ``<name>``
    is the input's base name without extension, cut at its first "-". It is
    written to ``arguments.output`` when that is an existing directory, and
    next to the input file otherwise.

    Args:
        arguments: parsed options. Fields read: ``application_protocol``
            (comma-separated names; "cdp", "browser", "pop" and "ldap" are
            recognised), ``port`` (comma-separated port numbers), ``flow``
            (comma-separated IP addresses), ``srcport``, ``destport``,
            ``source``, ``dest`` and ``output``. The object is not modified,
            so it can be reused for several files.
        filename (str): path of the PCAP file to read.
    Raises:
        ValueError: if ``arguments.port`` contains a non-numeric entry.
    """
    # Parse the rule lists once, up front, into locals. Overwriting the
    # fields on ``arguments`` would break the next call made with the same
    # object, because a list has no split().
    protocols = set( name.lower() for name in _split_csv( arguments.application_protocol ) )
    ports = _parse_ports( arguments.port )
    flow_ips = _split_csv( arguments.flow )

    directory = os.path.dirname( filename )
    (name,ext) = os.path.splitext( os.path.basename( filename ) )
    name = name.split("-")[0]
    if arguments.output and os.path.isdir(arguments.output):
        print( "saving cleaned PCAP file to {}".format(arguments.output) )
        directory = arguments.output
    cleaned_pcap = os.path.join( directory,name+".cleaned." + str(time.time()) + ".pcap" )

    # The input is opened and its PCAP header parsed before the output is
    # created, so an unreadable input leaves no empty output file behind.
    with open( filename, "rb" ) as file_object:
        pcap_reader = dpkt.pcap.Reader( file_object )
        # https://programtalk.com/python-examples/dpkt.pcap.Writer/
        with open( cleaned_pcap, "wb" ) as writer:
            pcap_writer = dpkt.pcap.Writer( writer )
            for ts, buf in pcap_reader:
                ethernet_layer = dpkt.ethernet.Ethernet(buf)
                if _should_drop( ethernet_layer, arguments, protocols, ports, flow_ips ):
                    continue
                # Keep the original capture timestamp on the copied frame.
                # https://stackoverflow.com/questions/55765722/how-do-i-use-the-timestamp-from-the-header-of-a-live-capture-in-dpkt-writer
                pcap_writer.writepkt( buf,ts )
def process_pcap( arguments ):
    """Scrub the PCAP file, or every regular file in the directory, named by ``arguments.target``.

    Args:
        arguments: parsed options; ``target`` is the path of a file or of a
            directory. The same object is passed on to process_single_pcap
            for each file.
    """
    target = arguments.target
    if os.path.isfile( target ):
        print( "processing a single file '{}'".format(target) )
        process_single_pcap( arguments, target )
    elif os.path.isdir( target ):
        print( "processing entire directory of PCAP {}".format(target) )
        # The listing is taken once, before any file is processed, so output
        # files created in the same directory during this run are not picked up.
        for entry in sorted( os.listdir( target ) ):
            # Entries from listdir are bare names; they must be joined with
            # the directory or they would be resolved against the current
            # working directory.
            path = os.path.join( target, entry )
            if not os.path.isfile( path ):
                continue
            print( "processing PCAP file {}".format(entry) )
            process_single_pcap( arguments, path )
    else:
        print( "target '{}' is neither a file nor a directory, nothing to process".format(target) )
if __name__ == "__main__":
    # Script entry point: parse the command line, then scrub the requested target.
    process_pcap( parse_arguments( ) )