-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit bd06ded
Showing
8 changed files
with
336 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
*.py[cod] | ||
*$py.class | ||
|
||
# C extensions | ||
*.so | ||
|
||
# Jupyter Notebook | ||
.ipynb_checkpoints/ | ||
|
||
# IPython | ||
profile_default/ | ||
ipython_config.py | ||
|
||
# pyenv | ||
.python-version | ||
|
||
# Environments | ||
.env | ||
.venv | ||
env/ | ||
venv/ | ||
ENV/ | ||
env.bak/ | ||
venv.bak/ | ||
|
||
# mypy | ||
.mypy_cache/ | ||
.dmypy.json | ||
dmypy.json | ||
|
||
# Pyre type checker | ||
.pyre/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
[pycodestyle] | ||
ignore = W504 | ||
max-line-length = 100 | ||
|
||
[mypy] | ||
show_error_context = True | ||
warn_redundant_casts = True | ||
warn_unused_ignores = True | ||
warn_return_any = True | ||
|
||
[mypy-tcpreq.*] | ||
disallow_any_explicit = True | ||
disallow_untyped_defs = True | ||
disallow_incomplete_defs = True | ||
disallow_untyped_decorators = True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
if __name__ == "__main__": | ||
# TODO | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .segment import Segment | ||
from .options import end_of_options, noop as noop_option, MSSOption |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import itertools | ||
|
||
|
||
# Checksum is calculated in little endian order and converted via int.to_bytes at the end | ||
def calc_checksum(src_addr: bytes, dst_addr: bytes, tcp_head: bytes, tcp_payload: bytes) -> bytes: | ||
# TCP Protocol Number: 6 | ||
acc = (6 << 8) # zero + PTCL/Next Header | ||
|
||
# IPv6 jumbograms are not supported! | ||
# This makes the IPv6 pseudo header calculation equal to the IPv4 case | ||
tcp_length = len(tcp_head) + len(tcp_payload) | ||
if tcp_length > 0xffff: | ||
raise ValueError("Segment too long") | ||
|
||
# Switch tcp_length bytes. On little endian systems this is necessary due to | ||
# switched byte representation, on big endian systems this is necessary | ||
# to convert the stored value to the little endian representation | ||
tcp_length = ((tcp_length << 8) | (tcp_length >> 8)) & 0xffff | ||
acc += tcp_length | ||
|
||
# acc_iter returns the next 2 consecutive bytes, padding with 0 | ||
byte_chain = itertools.chain(src_addr, dst_addr, tcp_head, tcp_payload) | ||
acc_iter = itertools.zip_longest(byte_chain, byte_chain, fillvalue=0) | ||
for high, low in acc_iter: | ||
acc += (low << 8) | high | ||
|
||
# Fold carry into acc | ||
carry = acc >> 16 | ||
while carry: | ||
acc = (acc & 0xffff) + carry | ||
carry = acc >> 16 | ||
|
||
# Calculate one's complement of acc | ||
acc = ~acc & 0xffff | ||
|
||
return acc.to_bytes(2, "little") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from typing import TypeVar, Type, Sized, SupportsBytes, Dict, Union, Generator | ||
|
||
|
||
# Options are immutable | ||
class BaseOption(Sized, SupportsBytes): | ||
"""Common base class for all options.""" | ||
def __init__(self, data: bytes) -> None: | ||
self._raw = data | ||
|
||
def __len__(self) -> int: | ||
return len(self._raw) | ||
|
||
def __bytes__(self) -> bytes: | ||
return self._raw | ||
|
||
|
||
class _LegacyOption(BaseOption): | ||
"""Class for the two option kinds without a length octet.""" | ||
def __init__(self, kind: int) -> None: | ||
super(_LegacyOption, self).__init__(kind.to_bytes(1, "big")) | ||
|
||
# Contrary to SizedOption, _LegacyOption instances are singletons. | ||
# Therefore its from_bytes method is to be called directly on the instances. | ||
def from_bytes(self, data: bytearray) -> "_LegacyOption": | ||
if len(data) < 1: | ||
raise ValueError("Data too short") | ||
|
||
del data[0] | ||
return self | ||
|
||
|
||
end_of_options = _LegacyOption(0) | ||
noop = _LegacyOption(1) | ||
|
||
_T = TypeVar("_T", bound="SizedOption") | ||
|
||
|
||
class SizedOption(BaseOption): | ||
"""Base class for all option kinds with a length octet.""" | ||
def __init__(self, kind: int, payload: bytes) -> None: | ||
opt_head = kind.to_bytes(1, "big") | ||
opt_head += len(payload).to_bytes(1, "big") | ||
super(SizedOption, self).__init__(opt_head + payload) | ||
|
||
@classmethod | ||
def from_bytes(cls: Type[_T], data: bytearray) -> _T: | ||
if len(data) < 2: | ||
raise ValueError("Data too short") | ||
elif data[1] < 2 or data[1] > len(data): | ||
raise ValueError("Illegal option length") | ||
|
||
length = data[1] | ||
res = cls.__new__(cls) # type: _T | ||
res._raw = bytes(data[:length]) | ||
del data[:length] | ||
return res | ||
|
||
@property | ||
def size(self) -> int: | ||
return self._raw[1] | ||
|
||
@property | ||
def payload(self) -> bytes: | ||
return self._raw[2:] | ||
|
||
|
||
class MSSOption(SizedOption): | ||
def __init__(self, mss: int) -> None: | ||
super(MSSOption, self).__init__(2, mss.to_bytes(2, "big")) | ||
|
||
@classmethod | ||
def from_bytes(cls: Type[_T], data: bytearray) -> _T: | ||
res = super(MSSOption, cls).from_bytes(data) | ||
if res.size != 4: | ||
raise ValueError("Illegal option length") | ||
return res | ||
|
||
@property | ||
def mss(self) -> int: | ||
return int.from_bytes(self._raw[2:4], "big") | ||
|
||
|
||
# mypy currently doesn't support function attributes | ||
# See https://github.com/python/mypy/issues/2087 | ||
_PARSE_KIND_TBL: Dict[int, Union[_LegacyOption, Type[SizedOption]]] = { | ||
0: end_of_options, | ||
1: noop, | ||
2: MSSOption | ||
} | ||
|
||
|
||
def parse_options(data: bytearray) -> Generator[BaseOption, None, None]: | ||
"""Parse header options based on their kind. Default to SizedOption.""" | ||
while data: | ||
kind = data[0] | ||
opt = _PARSE_KIND_TBL.get(kind, SizedOption).from_bytes(data) | ||
yield opt | ||
|
||
if opt is end_of_options: | ||
return |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
from typing import Sized, SupportsBytes, ClassVar, Sequence, Tuple | ||
import struct | ||
import math | ||
|
||
from .options import BaseOption, parse_options | ||
from .checksum import calc_checksum | ||
|
||
|
||
# Segments are immutable | ||
class Segment(Sized, SupportsBytes): | ||
# src_port, dst_port, seq, ack_seq, data offset/reserved bits, flags, window, checksum, up | ||
_TCP_HEAD: ClassVar[struct.Struct] = struct.Struct(">HHIIBBHHH") | ||
assert _TCP_HEAD.size == 20 | ||
|
||
# src and dst are (addr: bytes, port: int) tuples. flags int takes precedence over bools. | ||
def __init__(self, src: Tuple[bytes, int], dst: Tuple[bytes, int], seq: int, window: int, | ||
ack_seq: int = 0, cwr: bool = False, ece: bool = False, urg: bool = False, | ||
ack: bool = False, psh: bool = False, rst: bool = False, syn: bool = False, | ||
fin: bool = False, flags: int = None, checksum: bytes = None, up: int = 0, | ||
options: Sequence[BaseOption] = (), payload: bytes = b'') -> None: | ||
opt_len = sum(len(o) for o in options) | ||
head_rows = 5 | ||
if opt_len: | ||
head_rows += math.ceil(opt_len / 4.0) | ||
|
||
doff_rsrvd = (head_rows << 4) # data offset + reserved bits (zeros) | ||
if flags is None: | ||
flags = ((cwr << 7) | (ece << 6) | (urg << 5) | (ack << 4) | | ||
(psh << 3) | (rst << 2) | (syn << 1) | fin) | ||
|
||
self._head_len = head_rows * 4 | ||
head = bytearray(self._head_len) | ||
try: | ||
self._TCP_HEAD.pack_into(head, 0, src[1], dst[1], seq, ack_seq, | ||
doff_rsrvd, flags, window, 0, up) | ||
except struct.error as e: | ||
raise OverflowError(str(e)) from e | ||
if opt_len: | ||
head[20:20 + opt_len] = b''.join(map(bytes, options)) | ||
|
||
if checksum is None: | ||
# Checksum field is explicitly set to zero in _TCP_HEAD.pack_into call | ||
checksum = calc_checksum(src[0], dst[0], head, payload) | ||
head[16:18] = checksum | ||
|
||
self._raw = bytes(head) + payload | ||
self._options = tuple(options) | ||
|
||
def make_reply(self, src_addr: bytes, dst_addr: bytes, window: int, seq: int = None, | ||
ack_seq: int = None, cwr: bool = False, ece: bool = False, urg: bool = False, | ||
ack: bool = False, psh: bool = False, rst: bool = False, syn: bool = False, | ||
fin: bool = False, flags: int = None, checksum: bytes = None, up: int = 0, | ||
options: Sequence[BaseOption] = (), payload: bytes = b'') -> "Segment": | ||
if seq is None: | ||
if self.flags & 0x10: | ||
seq = self.ack_seq | ||
else: | ||
raise ValueError("SEQ not given and ACK not present on this segment") | ||
|
||
if ack_seq is None: | ||
syn_fin = self.flags & 0x03 | ||
syn_fin = (syn_fin >> 1) + (syn_fin & 0x01) | ||
payload_len = len(self) - self._head_len | ||
ack_seq = (self.seq + payload_len + syn_fin) % 0x1_0000_0000 # == 2^32 | ||
|
||
src = (src_addr, self.dst_port) | ||
dst = (dst_addr, self.src_port) | ||
return Segment(src, dst, seq, window, ack_seq, cwr, ece, urg, ack, | ||
psh, rst, syn, fin, flags, checksum, up, options, payload) | ||
|
||
@classmethod | ||
def from_bytes(cls, src_addr: bytes, dst_addr: bytes, data: bytearray) -> "Segment": | ||
dlen = len(data) | ||
if dlen < 20: | ||
raise ValueError("Data too short") | ||
|
||
head_len = (data[12] >> 2) & 0b00111100 # == (data[12] >> 4) * 4 | ||
if dlen < head_len: | ||
raise ValueError("Illegal data offset") | ||
|
||
check_old = data[16:18] | ||
data[16:18] = b"\x00\x00" | ||
|
||
# calc_checksum doesn't differentiate between TCP header and payload | ||
checksum = calc_checksum(src_addr, dst_addr, b'', data) | ||
data[16:18] = check_old | ||
if checksum != check_old: | ||
raise ValueError("Checksum mismatch") | ||
|
||
# parse_options consumes used data so it must be copied here | ||
opt_data = bytearray(data[20:head_len]) | ||
options = tuple(parse_options(opt_data)) | ||
if any(b != 0 for b in opt_data): | ||
raise ValueError("Illegal end-of-header padding") | ||
|
||
res = cls.__new__(cls) # type: Segment | ||
res._head_len = head_len | ||
res._raw = bytes(data) | ||
res._options = options | ||
return res | ||
|
||
def __len__(self) -> int: | ||
return len(self._raw) | ||
|
||
def __bytes__(self) -> bytes: | ||
return self._raw | ||
|
||
@property | ||
def src_port(self) -> int: | ||
return int.from_bytes(self._raw[0:2], "big") | ||
|
||
@property | ||
def dst_port(self) -> int: | ||
return int.from_bytes(self._raw[2:4], "big") | ||
|
||
@property | ||
def seq(self) -> int: | ||
return int.from_bytes(self._raw[4:8], "big") | ||
|
||
@property | ||
def ack_seq(self) -> int: | ||
return int.from_bytes(self._raw[8:12], "big") | ||
|
||
@property | ||
def flags(self) -> int: | ||
return self._raw[13] | ||
|
||
@property | ||
def window(self) -> int: | ||
return int.from_bytes(self._raw[14:16], "big") | ||
|
||
@property | ||
def checksum(self) -> bytes: | ||
return self._raw[16:18] | ||
|
||
@property | ||
def up(self) -> int: | ||
return int.from_bytes(self._raw[18:20], "big") | ||
|
||
@property | ||
def options(self) -> Tuple[BaseOption, ...]: | ||
return self._options | ||
|
||
@property | ||
def payload(self) -> bytes: | ||
return self._raw[self._head_len:] |