Skip to content

Commit

Permalink
Adding support for UDP based protocols.
Browse files Browse the repository at this point in the history
Refactored the decoder logic into its own package. Changed tcp, udp, and decoder packages to allow their dependencies to be injected for testability.
  • Loading branch information
andrewkroh committed Aug 12, 2015
1 parent ff15c7d commit 6fc4326
Show file tree
Hide file tree
Showing 10 changed files with 960 additions and 225 deletions.
138 changes: 138 additions & 0 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package decoder

import (
"fmt"

"github.com/elastic/libbeat/logp"
"github.com/elastic/packetbeat/protos"
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/udp"

"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
)

type DecoderStruct struct {
Parser *gopacket.DecodingLayerParser

sll layers.LinuxSLL
d1q layers.Dot1Q
lo layers.Loopback
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
payload gopacket.Payload
decoded []gopacket.LayerType

tcpProc tcp.Processor
udpProc udp.Processor
}

// Creates and returns a new DecoderStruct.
func NewDecoder(datalink layers.LinkType, tcp tcp.Processor, udp udp.Processor) (*DecoderStruct, error) {
d := DecoderStruct{tcpProc: tcp, udpProc: udp}

logp.Debug("pcapread", "Layer type: %s", datalink.String())

switch datalink {

case layers.LinkTypeLinuxSLL:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLinuxSLL,
&d.sll, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

case layers.LinkTypeEthernet:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&d.eth, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

case layers.LinkTypeNull: // loopback on OSx
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLoopback,
&d.lo, &d.d1q, &d.ip4, &d.ip6, &d.tcp, &d.udp, &d.payload)

default:
return nil, fmt.Errorf("Unsuported link type: %s", datalink.String())

}

d.decoded = []gopacket.LayerType{}

return &d, nil
}

func (decoder *DecoderStruct) DecodePacketData(data []byte, ci *gopacket.CaptureInfo) {

var err error
var packet protos.Packet

err = decoder.Parser.DecodeLayers(data, &decoder.decoded)
if err != nil {
// Ignore UnsupportedLayerType errors that can occur while parsing
// UDP packets.
lastLayer := decoder.decoded[len(decoder.decoded)-1]
_, unsupported := err.(gopacket.UnsupportedLayerType)
if !(unsupported && lastLayer == layers.LayerTypeUDP) {
logp.Debug("pcapread", "Decoding error: %s", err)
return
}
}

has_tcp := false
has_udp := false

for _, layerType := range decoder.decoded {
switch layerType {
case layers.LayerTypeIPv4:
logp.Debug("ip", "IPv4 packet")

packet.Tuple.Src_ip = decoder.ip4.SrcIP
packet.Tuple.Dst_ip = decoder.ip4.DstIP
packet.Tuple.Ip_length = 4

case layers.LayerTypeIPv6:
logp.Debug("ip", "IPv6 packet")

packet.Tuple.Src_ip = decoder.ip6.SrcIP
packet.Tuple.Dst_ip = decoder.ip6.DstIP
packet.Tuple.Ip_length = 16

case layers.LayerTypeTCP:
logp.Debug("ip", "TCP packet")

packet.Tuple.Src_port = uint16(decoder.tcp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.tcp.DstPort)

has_tcp = true

case layers.LayerTypeUDP:
logp.Debug("ip", "UDP packet")

packet.Tuple.Src_port = uint16(decoder.udp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.udp.DstPort)
packet.Payload = decoder.udp.Payload

has_udp = true

case gopacket.LayerTypePayload:
packet.Payload = decoder.payload
}
}

packet.Ts = ci.Timestamp
packet.Tuple.ComputeHashebles()

if has_udp {
decoder.udpProc.Process(&packet)
} else if has_tcp {
if len(packet.Payload) == 0 && !decoder.tcp.FIN {
// We have no use for this atm.
logp.Debug("pcapread", "Ignore empty non-FIN packet")
return
}

decoder.tcpProc.Process(&decoder.tcp, &packet)
}
}
166 changes: 166 additions & 0 deletions decoder/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package decoder

import (
"strings"
"testing"

"github.com/elastic/packetbeat/protos"

"github.com/stretchr/testify/assert"
"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
)

type TestTcpProcessor struct {
tcphdr *layers.TCP
pkt *protos.Packet
}

func (l *TestTcpProcessor) Process(tcphdr *layers.TCP, pkt *protos.Packet) {
l.tcphdr = tcphdr
l.pkt = pkt
}

type TestUdpProcessor struct {
pkt *protos.Packet
}

func (l *TestUdpProcessor) Process(pkt *protos.Packet) {
l.pkt = pkt
}

// 172.16.16.164:1108 172.16.16.139:53 DNS 87 Standard query 0x0007 AXFR contoso.local
var ipv4TcpDns = []byte{
0x00, 0x0c, 0x29, 0xce, 0xd1, 0x9e, 0x00, 0x0c, 0x29, 0x7e, 0xec, 0xa4, 0x08, 0x00, 0x45, 0x00,
0x00, 0x49, 0x46, 0x54, 0x40, 0x00, 0x80, 0x06, 0x3b, 0x0b, 0xac, 0x10, 0x10, 0xa4, 0xac, 0x10,
0x10, 0x8b, 0x04, 0x54, 0x00, 0x35, 0x5d, 0x9f, 0x0c, 0x90, 0x1a, 0xef, 0x6f, 0x43, 0x50, 0x18,
0xfa, 0xf0, 0xbc, 0x3d, 0x00, 0x00, 0x00, 0x07, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x6f, 0x73, 0x6f, 0x05, 0x6c, 0x6f, 0x63, 0x61, 0x6c,
0x00, 0x00, 0xfc, 0x00, 0x01, 0x4d, 0x53,
}

// Test that DecodePacket decodes and IPv4/TCP packet and invokes the TCP processor.
func TestDecodePacketData_ipv4Tcp(t *testing.T) {
p := gopacket.NewPacket(ipv4TcpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, tcp, _ := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, tcp.pkt, "TCP packet not received")
assert.Equal(t, "172.16.16.164", tcp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(1108), tcp.pkt.Tuple.Src_port)
assert.Equal(t, "172.16.16.139", tcp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), tcp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(tcp.pkt.Payload)))
}

// 192.168.170.8:32795 192.168.170.20:53 DNS 74 Standard query 0x75c0 A www.netbsd.org
var ipv4UdpDns = []byte{
0x00, 0xc0, 0x9f, 0x32, 0x41, 0x8c, 0x00, 0xe0, 0x18, 0xb1, 0x0c, 0xad, 0x08, 0x00, 0x45, 0x00,
0x00, 0x3c, 0x00, 0x00, 0x40, 0x00, 0x40, 0x11, 0x65, 0x43, 0xc0, 0xa8, 0xaa, 0x08, 0xc0, 0xa8,
0xaa, 0x14, 0x80, 0x1b, 0x00, 0x35, 0x00, 0x28, 0xaf, 0x61, 0x75, 0xc0, 0x01, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x77, 0x77, 0x77, 0x06, 0x6e, 0x65, 0x74, 0x62, 0x73,
0x64, 0x03, 0x6f, 0x72, 0x67, 0x00, 0x00, 0x01, 0x00, 0x01,
}

// Test that DecodePacket decodes and IPv4/UDP packet and invokes the UDP processor.
func TestDecodePacketData_ipv4Udp(t *testing.T) {
p := gopacket.NewPacket(ipv4UdpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, _, udp := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, udp.pkt, "UDP packet not received")
assert.Equal(t, "192.168.170.8", udp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(32795), udp.pkt.Tuple.Src_port)
assert.Equal(t, "192.168.170.20", udp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), udp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(udp.pkt.Payload)))
}

// IP6 2001:6f8:102d::2d0:9ff:fee3:e8de.59201 > 2001:6f8:900:7c0::2.80
var ipv6TcpHttpGet = []byte{
0x00, 0x11, 0x25, 0x82, 0x95, 0xb5, 0x00, 0xd0, 0x09, 0xe3, 0xe8, 0xde, 0x86, 0xdd, 0x60, 0x00,
0x00, 0x00, 0x01, 0x04, 0x06, 0x40, 0x20, 0x01, 0x06, 0xf8, 0x10, 0x2d, 0x00, 0x00, 0x02, 0xd0,
0x09, 0xff, 0xfe, 0xe3, 0xe8, 0xde, 0x20, 0x01, 0x06, 0xf8, 0x09, 0x00, 0x07, 0xc0, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xe7, 0x41, 0x00, 0x50, 0xab, 0xdc, 0xd6, 0x61, 0x01, 0x4a,
0x73, 0x9f, 0x50, 0x18, 0x16, 0x80, 0xf4, 0x48, 0x00, 0x00, 0x47, 0x45, 0x54, 0x20, 0x2f, 0x20,
0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x30, 0x0d, 0x0a, 0x48, 0x6f, 0x73, 0x74, 0x3a, 0x20,
0x63, 0x6c, 0x2d, 0x31, 0x39, 0x38, 0x35, 0x2e, 0x68, 0x61, 0x6d, 0x2d, 0x30, 0x31, 0x2e, 0x64,
0x65, 0x2e, 0x73, 0x69, 0x78, 0x78, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x0d, 0x0a, 0x41, 0x63, 0x63,
0x65, 0x70, 0x74, 0x3a, 0x20, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x20,
0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x2c, 0x20, 0x74, 0x65, 0x78, 0x74,
0x2f, 0x63, 0x73, 0x73, 0x2c, 0x20, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x73, 0x67, 0x6d, 0x6c, 0x2c,
0x20, 0x2a, 0x2f, 0x2a, 0x3b, 0x71, 0x3d, 0x30, 0x2e, 0x30, 0x31, 0x0d, 0x0a, 0x41, 0x63, 0x63,
0x65, 0x70, 0x74, 0x2d, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x3a, 0x20, 0x67, 0x7a,
0x69, 0x70, 0x2c, 0x20, 0x62, 0x7a, 0x69, 0x70, 0x32, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70,
0x74, 0x2d, 0x4c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x3a, 0x20, 0x65, 0x6e, 0x0d, 0x0a,
0x55, 0x73, 0x65, 0x72, 0x2d, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x3a, 0x20, 0x4c, 0x79, 0x6e, 0x78,
0x2f, 0x32, 0x2e, 0x38, 0x2e, 0x36, 0x72, 0x65, 0x6c, 0x2e, 0x32, 0x20, 0x6c, 0x69, 0x62, 0x77,
0x77, 0x77, 0x2d, 0x46, 0x4d, 0x2f, 0x32, 0x2e, 0x31, 0x34, 0x20, 0x53, 0x53, 0x4c, 0x2d, 0x4d,
0x4d, 0x2f, 0x31, 0x2e, 0x34, 0x2e, 0x31, 0x20, 0x4f, 0x70, 0x65, 0x6e, 0x53, 0x53, 0x4c, 0x2f,
0x30, 0x2e, 0x39, 0x2e, 0x38, 0x62, 0x0d, 0x0a, 0x0d, 0x0a,
}

// Test that DecodePacket decodes and IPv6/TCP packet and invokes the TCP processor.
func TestDecodePacketData_ipv6Tcp(t *testing.T) {
p := gopacket.NewPacket(ipv6TcpHttpGet, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet: ", p.ErrorLayer().Error())
}
d, tcp, _ := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, tcp.pkt, "TCP packet not received")
assert.Equal(t, "2001:6f8:102d:0:2d0:9ff:fee3:e8de", tcp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(59201), tcp.pkt.Tuple.Src_port)
assert.Equal(t, "2001:6f8:900:7c0::2", tcp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(80), tcp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(tcp.pkt.Payload)))
}

// 3ffe:507:0:1:200:86ff:fe05:80da.2415 > 3ffe:501:4819::42.53
var ipv6UdpDns = []byte{
0x00, 0x60, 0x97, 0x07, 0x69, 0xea, 0x00, 0x00, 0x86, 0x05, 0x80, 0xda, 0x86, 0xdd, 0x60, 0x00,
0x00, 0x00, 0x00, 0x61, 0x11, 0x40, 0x3f, 0xfe, 0x05, 0x07, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00,
0x86, 0xff, 0xfe, 0x05, 0x80, 0xda, 0x3f, 0xfe, 0x05, 0x01, 0x48, 0x19, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x42, 0x09, 0x6f, 0x00, 0x35, 0x00, 0x61, 0xa3, 0x35, 0x5c, 0x78,
0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x61, 0x01, 0x65, 0x01, 0x39,
0x01, 0x36, 0x01, 0x37, 0x01, 0x30, 0x01, 0x65, 0x01, 0x66, 0x01, 0x66, 0x01, 0x66, 0x01, 0x37,
0x01, 0x39, 0x01, 0x30, 0x01, 0x36, 0x01, 0x32, 0x01, 0x30, 0x01, 0x31, 0x01, 0x30, 0x01, 0x30,
0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x30, 0x01, 0x37, 0x01, 0x30, 0x01, 0x35,
0x01, 0x30, 0x01, 0x65, 0x01, 0x66, 0x01, 0x66, 0x01, 0x33, 0x03, 0x69, 0x70, 0x36, 0x03, 0x69,
0x6e, 0x74, 0x00, 0x00, 0x0c, 0x00, 0x01,
}

// Test that DecodePacket decodes and IPv6/UDP packet and invokes the UDP processor.
func TestDecodePacketData_ipv6Udp(t *testing.T) {
p := gopacket.NewPacket(ipv6UdpDns, layers.LinkTypeEthernet, gopacket.Default)
if p.ErrorLayer() != nil {
t.Error("Failed to decode packet:", p.ErrorLayer().Error())
}
d, _, udp := newTestDecoder(t)
d.DecodePacketData(p.Data(), &p.Metadata().CaptureInfo)

assert.NotNil(t, udp.pkt, "UDP packet not received")
assert.Equal(t, "3ffe:507:0:1:200:86ff:fe05:80da", udp.pkt.Tuple.Src_ip.String())
assert.Equal(t, uint16(2415), udp.pkt.Tuple.Src_port)
assert.Equal(t, "3ffe:501:4819::42", udp.pkt.Tuple.Dst_ip.String())
assert.Equal(t, uint16(53), udp.pkt.Tuple.Dst_port)
assert.NotEqual(t, -1, strings.Index(string(p.Data()), string(udp.pkt.Payload)))
}

// Creates a new TestDecoder that handles ethernet packets.
func newTestDecoder(t *testing.T) (*DecoderStruct, *TestTcpProcessor, *TestUdpProcessor) {
tcpLayer := &TestTcpProcessor{}
udpLayer := &TestUdpProcessor{}
d, err := NewDecoder(layers.LinkTypeEthernet, tcpLayer, udpLayer)
if err != nil {
t.Fatalf("Error creating decoder %v", err)
}
return d, tcpLayer, udpLayer
}
13 changes: 10 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/packetbeat/protos/redis"
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
"github.com/elastic/packetbeat/sniffer"
)

Expand Down Expand Up @@ -111,7 +112,13 @@ func main() {
protos.Protos.Register(proto, plugin)
}

if err = tcp.TcpInit(); err != nil {
tcpProc, err := tcp.NewTcp(&protos.Protos)
if err != nil {
logp.Critical(err.Error())
os.Exit(1)
}
udpProc, err := udp.NewUdp(&protos.Protos)
if err != nil {
logp.Critical(err.Error())
os.Exit(1)
}
Expand All @@ -134,7 +141,7 @@ func main() {
}

logp.Debug("main", "Initializing sniffer")
err = sniff.Init(false, afterInputsQueue)
err = sniff.Init(false, afterInputsQueue, tcpProc, udpProc)
if err != nil {
logp.Critical("Initializing sniffer failed: %v", err)
os.Exit(1)
Expand Down Expand Up @@ -184,7 +191,7 @@ func main() {
if service.WithMemProfile() {
// wait for all TCP streams to expire
time.Sleep(tcp.TCP_STREAM_EXPIRY * 1.2)
tcp.PrintTcpMap()
tcpProc.PrintTcpMap()
}
service.Cleanup()
}
Loading

0 comments on commit 6fc4326

Please sign in to comment.