Skip to content

Commit

Permalink
[Auditbeat] Fixes for system/socket dataset UDP and DNS (elastic#14315)
Browse files Browse the repository at this point in the history
This fixes a couple of problems with the dataset:

- In some cases incoming UDP packets won't be processed by the dataset
  due to excessive padding making the headers not fit into the first
  96 bytes dumped. Dump 256 bytes to make sure headers are collected.
- Single packet flows didn't have proper process and DNS enrichment.
  • Loading branch information
adriansr authored Oct 30, 2019
1 parent 2a526ac commit c148759
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 86 deletions.
84 changes: 43 additions & 41 deletions x-pack/auditbeat/module/system/socket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,15 +628,15 @@ func (e *udpv6SendMsgCall) Update(s *state) error {
}

type udpQueueRcvSkb struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddr uint32 `kprobe:"laddr"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"`
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddr uint32 `kprobe:"laddr"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"`
}

func validIPv4Headers(ipHdr uint16, udpHdr uint16, data []byte) bool {
Expand All @@ -656,6 +656,15 @@ func validIPv6Headers(ipHdr uint16, udpHdr uint16, data []byte) bool {
}

func (e *udpQueueRcvSkb) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv4,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0),
}
if valid := validIPv4Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid {
// Check if we're dealing with pointers
// TODO: This should check for SK_BUFF_HAS_POINTERS. Instead is just
Expand All @@ -678,24 +687,16 @@ func (e *udpQueueRcvSkb) asFlow() flow {
}
}
if !valid {
return flow{}
return f
}
}
var raddr uint32
var rport uint16
// the remote is this packet's source
raddr = tracing.MachineEndian.Uint32(e.Packet[e.IPHdr+12:])
rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:])
return flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv4,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0),
remote: newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize),
}
f.remote = newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize)
return f
}

// String returns a representation of the event.
Expand All @@ -716,19 +717,28 @@ func (e *udpQueueRcvSkb) Update(s *state) error {
}

type udpv6QueueRcvSkb struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddrA uint64 `kprobe:"laddra"`
LAddrB uint64 `kprobe:"laddrb"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"`
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddrA uint64 `kprobe:"laddra"`
LAddrB uint64 `kprobe:"laddrb"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"`
}

func (e *udpv6QueueRcvSkb) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv6,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0),
}
if valid := validIPv6Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid {
// Check if we're dealing with pointers
// TODO: This only works in little-endian, same as in udpQueueRcvSkb
Expand All @@ -743,7 +753,7 @@ func (e *udpv6QueueRcvSkb) asFlow() flow {
}
}
if !valid {
return flow{}
return f
}
}
var raddrA, raddrB uint64
Expand All @@ -752,16 +762,8 @@ func (e *udpv6QueueRcvSkb) asFlow() flow {
raddrA = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+8:])
raddrB = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+16:])
rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:])
return flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv6,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0),
remote: newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize),
}
f.remote = newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize)
return f
}

// String returns a representation of the event.
Expand Down
10 changes: 6 additions & 4 deletions x-pack/auditbeat/module/system/socket/kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/elastic/beats/x-pack/auditbeat/tracing"
)

// Enough for padding + mac_hdr + max ip_hdr + udp_hdr
const pktHeaderDumpBytes = 8 * 12
// This is how many data we dump from sk_buff->data to read full packet headers
// (IP + UDP header). This has been observed to include up to 100 bytes of
// padding.
const skBuffDataDumpBytes = 256

// ProbeTransform transforms a probe before its installed.
type ProbeTransform func(helper.ProbeDef) helper.ProbeDef
Expand Down Expand Up @@ -300,7 +302,7 @@ var sharedKProbes = []helper.ProbeDef{
Probe: tracing.Probe{
Name: "udp_queue_rcv_skb",
Address: "udp_queue_rcv_skb",
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes),
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes),
},
Decoder: helper.NewStructDecoder(func() interface{} { return new(udpQueueRcvSkb) }),
},
Expand Down Expand Up @@ -463,7 +465,7 @@ var ipv6KProbes = []helper.ProbeDef{
Probe: tracing.Probe{
Name: "udpv6_queue_rcv_skb",
Address: "udpv6_queue_rcv_skb",
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes),
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes),
},
Decoder: helper.NewStructDecoder(func() interface{} { return new(udpv6QueueRcvSkb) }),
},
Expand Down
13 changes: 12 additions & 1 deletion x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,19 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
if sockNoPID := sock.pid == 0; sockNoPID != (f.pid == 0) {
if sockNoPID {
sock.pid = f.pid
sock.process = f.process
} else {
f.pid = sock.pid
}
}
if sockNoProcess := sock.process == nil; sockNoProcess != (f.process == nil) {
if sockNoProcess {
sock.process = f.process
} else {
f.process = sock.process
}
} else if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(sock.pid)
f.process = sock.process
}
}

Expand All @@ -632,6 +640,7 @@ func (s *state) createFlow(ref flow) error {

ref.createdTime = ref.lastSeenTime
s.mutualEnrich(sock, &ref)

// don't create the flow yet if it doesn't have a populated remote address
if ref.remote.addr.IP == nil {
return nil
Expand All @@ -658,6 +667,8 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
}
if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(pid)
}
// Keep the sock around in case it's a connected TCP socket, as still some
Expand Down
141 changes: 137 additions & 4 deletions x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,145 @@ func TestTCPConnWithProcess(t *testing.T) {
}
}

func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool {
value, err := ev.GetValue(field)
func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{
callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
RAddr: rAddr,
AltRAddr: 0,
LPort: lPort,
RPort: rPort,
AltRPort: 0,
},
&inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock},
&doExit{Meta: meta(1234, 1234, 18)},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err, "field", field)
t.Fatal(err)
}
return assert.Equal(t, expected, value)
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow", flow)
for field, expected := range map[string]interface{}{
"source.ip": localIP,
"source.port": localPort,
"source.packets": uint64(1),
"source.bytes": uint64(151),
"client.ip": localIP,
"client.port": localPort,
"destination.ip": remoteIP,
"destination.port": remotePort,
"destination.packets": uint64(0),
"destination.bytes": uint64(0),
"server.ip": remoteIP,
"server.port": remotePort,
"network.direction": "outbound",
"network.transport": "udp",
"network.type": "ipv4",
"process.pid": 1234,
"process.name": "exfil-udp",
"user.id": "501",
} {
assertValue(t, flow, expected, field)
}
}

func TestUDPIncomingSinglePacketWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
var packet [256]byte
var ipHdr, udpHdr uint16 = 2, 64
packet[ipHdr] = 0x45
tracing.MachineEndian.PutUint32(packet[ipHdr+12:], rAddr)
tracing.MachineEndian.PutUint16(packet[udpHdr:], rPort)
evs := []event{
callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpQueueRcvSkb{
Meta: meta(1234, 1235, 5),
Sock: sock,
Size: 123,
LAddr: lAddr,
LPort: lPort,
IPHdr: ipHdr,
UDPHdr: udpHdr,
Packet: packet,
},
&inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock},
&doExit{Meta: meta(1234, 1234, 18)},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow", flow)
for field, expected := range map[string]interface{}{
"source.ip": remoteIP,
"source.port": remotePort,
"source.packets": uint64(1),
"source.bytes": uint64(151),
"client.ip": remoteIP,
"client.port": remotePort,
"destination.ip": localIP,
"destination.port": localPort,
"destination.packets": uint64(0),
"destination.bytes": uint64(0),
"server.ip": localIP,
"server.port": localPort,
"network.direction": "inbound",
"network.transport": "udp",
"network.type": "ipv4",
"process.pid": 1234,
"process.name": "exfil-udp",
"user.id": "501",
} {
assertValue(t, flow, expected, field)
}
}

func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool {
value, err := ev.GetValue(field)
return assert.Nil(t, err, field) && assert.Equal(t, expected, value, field)
}

func be16(val uint16) uint16 {
Expand Down
Loading

0 comments on commit c148759

Please sign in to comment.