Skip to content

Commit

Permalink
添加功能,将流量数据集加到mysql中
Browse files Browse the repository at this point in the history
  • Loading branch information
ZGC-BUPT-aimafan committed Dec 5, 2024
1 parent 4968f92 commit 65e19fb
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 5 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pip install pypcaptools

## Quick Start

1. 分流
```python
from pypcaptools import PcapHandler

Expand All @@ -24,4 +25,23 @@ session_num, output_path = ph.split_flow(output_dir, tcp_from_first_packet=False

# 分流之后以json格式输出,输出一个json文件,其中每一个单元表示一条流,TCP流必须从握手阶段开始,从中途开始的TCP流会被丢弃
session_num, output_path = ph.split_flow(output_dir, tcp_from_first_packet=True, output_type="json")
```

2. 将流量分流并加入到mysql数据库中
```python
from pypcaptools import PcapToDatabaseHandler
db_config = {
"host": "",
"port": 3306,
"user": "root",
"password": "password",
"database": "traffic",
"table": "table",
}

# 参数依次为 mysql配置、处理的pcap路径、应用层协议类型、访问网站/行为、采集机器、table注释
handler = PcapToDatabaseHandler(
    db_config, "test.pcap", "https", "github.com", "vultr10", "测试用数据集"
)
handler.split_flow_to_database()
```
7 changes: 2 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

setup(
name="pypcaptools",
version="1.2",
version="1.3",
packages=find_packages(where="src"),
package_dir={"": "src"},
install_requires=[
"dpkt==1.9.8",
"scapy==2.6.0",
],
install_requires=["dpkt==1.9.8", "scapy==2.6.0", "mysql-connector-python==9.1.0"],
entry_points={
"console_scripts": [
"pypcaptools.split_flow=pypcaptools.splitter:split_flow",
Expand Down
1 change: 1 addition & 0 deletions src/pypcaptools/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .pcaphandler import PcapHandler
from .pcaptodatabasehandler import PcapToDatabaseHandler
80 changes: 80 additions & 0 deletions src/pypcaptools/mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import mysql.connector


class TrafficDB:
    """Thin wrapper around a MySQL connection for storing per-flow traffic rows.

    On connect() it creates the configured database/table if missing; each
    flow is inserted via add_traffic().  Database/table/comment values are
    interpolated into SQL statements, so they must come from trusted
    configuration, never from untrusted input.
    """

    def __init__(self, host, port, user, password, database, table, comment):
        # Connection parameters only; the connection itself is opened
        # lazily in connect().
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.port = port
        self.table = table
        self.conn = None
        self.cursor = None
        # Table-level COMMENT applied when the table is first created.
        self.comment = comment

    def connect(self):
        """Open the connection and ensure the database and table exist.

        Raises:
            mysql.connector.Error: on any connection/DDL failure, with the
                original error chained as the cause.
        """
        try:
            self.conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                port=self.port,
                connection_timeout=300,
            )
            self.cursor = self.conn.cursor()
            self.create_database()
            self.create_table()
            print("已经成功连接到mysql服务器")
        except mysql.connector.Error as error:
            # Chain the original error so the root cause is preserved.
            raise mysql.connector.Error(
                f"Error connecting to MySQL database: {error}"
            ) from error

    def create_database(self):
        """Create the configured database if needed and select it."""
        # Backtick-quote identifiers; they come from trusted config,
        # not user input.
        self.cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{self.database}`")
        self.cursor.execute(f"USE `{self.database}`")

    def create_table(self):
        """Create the traffic table if it does not already exist.

        The UNIQUE key makes re-ingesting the same pcap idempotent when
        combined with INSERT IGNORE in add_traffic().
        """
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS `{self.table}` (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            entry_time DATETIME NOT NULL COMMENT '入库时间',
            capture_time DATETIME COMMENT '采集时间',
            source_ip VARCHAR(45) NOT NULL COMMENT '源IP地址',
            destination_ip VARCHAR(45) NOT NULL COMMENT '目的IP地址',
            source_port SMALLINT UNSIGNED NOT NULL COMMENT '源端口',
            destination_port SMALLINT UNSIGNED NOT NULL COMMENT '目的端口',
            timestamp MEDIUMBLOB COMMENT '时间戳(绝对)',
            payload MEDIUMBLOB NOT NULL COMMENT 'payload长度+方向',
            protocol VARCHAR(30) COMMENT '协议(HTTPs、Vmess、Tor、Obfs4等)',
            transport_protocol ENUM('TCP', 'UDP') COMMENT '传输层协议',
            accessed_website VARCHAR(255) COMMENT '访问网站域名/应用',
            packet_length INT UNSIGNED COMMENT '包长度',
            packet_length_no_payload INT UNSIGNED COMMENT '去除payload为0的包长度',
            collection_machine VARCHAR(255) COMMENT '采集机器',
            pcap_path VARCHAR(255) COMMENT '原始pcap路径',
            UNIQUE (source_ip, destination_ip, source_port, destination_port, pcap_path, protocol, capture_time)
        ) COMMENT = '{self.comment}';
        """
        self.cursor.execute(create_table_sql)
        self.conn.commit()

    def close(self):
        """Close the cursor and connection; safe to call more than once."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.conn:
            self.conn.close()
            # Reset so a second close() (e.g. from __del__) is a no-op.
            self.conn = None
            print("MySQL connection closed")

    def add_traffic(self, traffic_dic):
        """Insert one flow record; duplicates (per the UNIQUE key) are ignored.

        traffic_dic maps column names to values; keys must be valid column
        identifiers of the table created by create_table().
        """
        columns = ", ".join(traffic_dic.keys())
        placeholders = ", ".join(["%s"] * len(traffic_dic))
        # Values go through parameter binding; only column names are
        # interpolated, and those come from our own code.
        insert_sql = f"""
        INSERT IGNORE INTO `{self.table}` ({columns})
        VALUES ({placeholders});
        """
        values = tuple(traffic_dic.values())
        self.cursor.execute(insert_sql, values)
        self.conn.commit()

    def __del__(self):
        # Best-effort cleanup; never raise during interpreter shutdown.
        try:
            self.close()
        except Exception:
            pass
1 change: 1 addition & 0 deletions src/pypcaptools/pcaphandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def split_flow(
tcp_from_first_packet=False,
output_type="pcap",
):
# TODO: 加入并行
"""
output_dir: 分流之后存储的路径
min_pcaket_num: 流中最少有多少个数据包, 默认为0
Expand Down
92 changes: 92 additions & 0 deletions src/pypcaptools/pcaptodatabasehandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import pickle
from datetime import datetime

from pypcaptools.mysql import TrafficDB
from pypcaptools.pcaphandler import PcapHandler


def serialization(src_data):
    """Return the pickle byte representation of *src_data*."""
    blob = pickle.dumps(src_data)
    return blob


class PcapToDatabaseHandler(PcapHandler):
    """Split a pcap into flows and store each flow as one row in MySQL."""

    def __init__(
        self,
        db_config: dict,
        input_pcap_file,
        protocol,
        accessed_website,
        collection_machine="",
        comment="",
    ):
        # db_config = {"host": , "port": , "user": , "password": , "database": , "table": }
        # input_pcap_file: path of the pcap file to process
        # protocol: application-layer protocol (e.g. https, vmess)
        # accessed_website: website/application that was accessed
        # collection_machine: machine the traffic was captured on
        # comment: table-level comment used when the table is created
        super().__init__(input_pcap_file)
        self.db_config = db_config
        self.protocol = protocol
        self.accessed_website = accessed_website
        self.collection_machine = collection_machine
        self.pcap_path = input_pcap_file
        self.comment = comment

    def _save_to_database(self, tcpstream, min_packet_num, comment):
        """Insert every flow in *tcpstream* longer than *min_packet_num* packets.

        tcpstream maps flow keys of the form
        "srcip_srcport_dstip_dstport_transport" to lists of
        (timestamp, payload, packet_num) tuples, as produced by
        PcapHandler._process_pcap_file.
        """
        host = self.db_config["host"]
        user = self.db_config["user"]
        port = self.db_config["port"]
        password = self.db_config["password"]
        database = self.db_config["database"]
        table = self.db_config["table"]

        traffic = TrafficDB(host, port, user, password, database, table, comment)
        traffic.connect()
        try:
            for stream in tcpstream:
                if len(tcpstream[stream]) <= min_packet_num:
                    continue
                traffic_dic = {}
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                traffic_dic["entry_time"] = now
                # Capture time is taken from the first packet of the flow.
                first_time = tcpstream[stream][0][0]
                traffic_dic["capture_time"] = datetime.fromtimestamp(
                    first_time
                ).strftime("%Y-%m-%d %H:%M:%S")
                (
                    traffic_dic["source_ip"],
                    traffic_dic["source_port"],
                    traffic_dic["destination_ip"],
                    traffic_dic["destination_port"],
                    traffic_dic["transport_protocol"],
                ) = stream.split("_")

                # Per-packet timestamps (relative to flow start) and
                # direction-signed payload lengths.
                relative_timestamps = []
                payload_list = []
                for packet in tcpstream[stream]:
                    time, payload, packet_num = packet
                    relative_time = time - first_time
                    relative_timestamps.append(f"{relative_time:.6f}")
                    payload_list.append(payload)
                traffic_dic["timestamp"] = serialization(relative_timestamps)
                traffic_dic["payload"] = serialization(payload_list)
                traffic_dic["protocol"] = self.protocol
                traffic_dic["accessed_website"] = self.accessed_website
                # NOTE: these columns store packet *counts*, matching the
                # existing table schema.
                traffic_dic["packet_length"] = len(payload_list)
                traffic_dic["packet_length_no_payload"] = len(
                    [item for item in payload_list if item != "+0" and item != "-0"]
                )
                traffic_dic["collection_machine"] = self.collection_machine
                traffic_dic["pcap_path"] = self.pcap_path

                traffic.add_traffic(traffic_dic)
        finally:
            # Always release the connection, even if an insert fails.
            traffic.close()

    def split_flow_to_database(self, min_packet_num=3, tcp_from_first_packet=False):
        """Split the pcap into flows and write them to the configured table.

        min_packet_num: flows with this many packets or fewer are skipped.
        tcp_from_first_packet: if True, only keep TCP flows captured from
            the handshake onward (passed through to the splitter).
        """
        tcpstream = self._process_pcap_file(self.input_pcap_file, tcp_from_first_packet)
        self._save_to_database(tcpstream, min_packet_num, self.comment)


if __name__ == "__main__":
    # Placeholder entry point; this module is intended to be used as a library.
    pass

0 comments on commit 65e19fb

Please sign in to comment.