diff --git a/x-pack/auditbeat/module/system/socket/events.go b/x-pack/auditbeat/module/system/socket/events.go index bf8ff5360dc..043a0565b1b 100644 --- a/x-pack/auditbeat/module/system/socket/events.go +++ b/x-pack/auditbeat/module/system/socket/events.go @@ -628,15 +628,15 @@ func (e *udpv6SendMsgCall) Update(s *state) error { } type udpQueueRcvSkb struct { - Meta tracing.Metadata `kprobe:"metadata"` - Sock uintptr `kprobe:"sock"` - Size uint32 `kprobe:"size"` - LAddr uint32 `kprobe:"laddr"` - LPort uint16 `kprobe:"lport"` - IPHdr uint16 `kprobe:"iphdr"` - UDPHdr uint16 `kprobe:"udphdr"` - Base uintptr `kprobe:"base"` - Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"` + Meta tracing.Metadata `kprobe:"metadata"` + Sock uintptr `kprobe:"sock"` + Size uint32 `kprobe:"size"` + LAddr uint32 `kprobe:"laddr"` + LPort uint16 `kprobe:"lport"` + IPHdr uint16 `kprobe:"iphdr"` + UDPHdr uint16 `kprobe:"udphdr"` + Base uintptr `kprobe:"base"` + Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"` } func validIPv4Headers(ipHdr uint16, udpHdr uint16, data []byte) bool { @@ -656,6 +656,15 @@ func validIPv6Headers(ipHdr uint16, udpHdr uint16, data []byte) bool { } func (e *udpQueueRcvSkb) asFlow() flow { + f := flow{ + sock: e.Sock, + pid: e.Meta.PID, + inetType: inetTypeIPv4, + proto: protoUDP, + dir: directionInbound, + lastSeen: kernelTime(e.Meta.Timestamp), + local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0), + } if valid := validIPv4Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid { // Check if we're dealing with pointers // TODO: This should check for SK_BUFF_HAS_POINTERS. Instead is just @@ -678,7 +687,7 @@ func (e *udpQueueRcvSkb) asFlow() flow { } } if !valid { - return flow{} + return f } } var raddr uint32 @@ -686,16 +695,8 @@ func (e *udpQueueRcvSkb) asFlow() flow { // the remote is this packet's source raddr = tracing.MachineEndian.Uint32(e.Packet[e.IPHdr+12:]) rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:]) - return flow{ - sock: e.Sock, - pid: e.Meta.PID, - inetType: inetTypeIPv4, - proto: protoUDP, - dir: directionInbound, - lastSeen: kernelTime(e.Meta.Timestamp), - local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0), - remote: newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize), - } + f.remote = newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize) + return f } // String returns a representation of the event. @@ -716,19 +717,28 @@ func (e *udpQueueRcvSkb) Update(s *state) error { } type udpv6QueueRcvSkb struct { - Meta tracing.Metadata `kprobe:"metadata"` - Sock uintptr `kprobe:"sock"` - Size uint32 `kprobe:"size"` - LAddrA uint64 `kprobe:"laddra"` - LAddrB uint64 `kprobe:"laddrb"` - LPort uint16 `kprobe:"lport"` - IPHdr uint16 `kprobe:"iphdr"` - UDPHdr uint16 `kprobe:"udphdr"` - Base uintptr `kprobe:"base"` - Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"` + Meta tracing.Metadata `kprobe:"metadata"` + Sock uintptr `kprobe:"sock"` + Size uint32 `kprobe:"size"` + LAddrA uint64 `kprobe:"laddra"` + LAddrB uint64 `kprobe:"laddrb"` + LPort uint16 `kprobe:"lport"` + IPHdr uint16 `kprobe:"iphdr"` + UDPHdr uint16 `kprobe:"udphdr"` + Base uintptr `kprobe:"base"` + Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"` } func (e *udpv6QueueRcvSkb) asFlow() flow { + f := flow{ + sock: e.Sock, + pid: e.Meta.PID, + inetType: inetTypeIPv6, + proto: protoUDP, + dir: directionInbound, + lastSeen: kernelTime(e.Meta.Timestamp), + local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0), + } if valid := validIPv6Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid { // Check if we're dealing with pointers // TODO: This only works in little-endian, same as in udpQueueRcvSkb @@ -743,7 +753,7 @@ func (e *udpv6QueueRcvSkb) asFlow() flow { } } if !valid { - return flow{} + return f } } var raddrA, raddrB uint64 @@ -752,16 +762,8 @@ func (e *udpv6QueueRcvSkb) asFlow() flow { raddrA = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+8:]) raddrB = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+16:]) rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:]) - return flow{ - sock: e.Sock, - pid: e.Meta.PID, - inetType: inetTypeIPv6, - proto: protoUDP, - dir: directionInbound, - lastSeen: kernelTime(e.Meta.Timestamp), - local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0), - remote: newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize), - } + f.remote = newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize) + return f } // String returns a representation of the event. diff --git a/x-pack/auditbeat/module/system/socket/kprobes.go b/x-pack/auditbeat/module/system/socket/kprobes.go index 390b87de080..71887f0108b 100644 --- a/x-pack/auditbeat/module/system/socket/kprobes.go +++ b/x-pack/auditbeat/module/system/socket/kprobes.go @@ -19,8 +19,10 @@ import ( "github.com/elastic/beats/x-pack/auditbeat/tracing" ) -// Enough for padding + mac_hdr + max ip_hdr + udp_hdr -const pktHeaderDumpBytes = 8 * 12 +// This is how many data we dump from sk_buff->data to read full packet headers +// (IP + UDP header). This has been observed to include up to 100 bytes of +// padding. +const skBuffDataDumpBytes = 256 // ProbeTransform transforms a probe before its installed. type ProbeTransform func(helper.ProbeDef) helper.ProbeDef @@ -300,7 +302,7 @@ var sharedKProbes = []helper.ProbeDef{ Probe: tracing.Probe{ Name: "udp_queue_rcv_skb", Address: "udp_queue_rcv_skb", - Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes), + Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes), }, Decoder: helper.NewStructDecoder(func() interface{} { return new(udpQueueRcvSkb) }), }, @@ -463,7 +465,7 @@ var ipv6KProbes = []helper.ProbeDef{ Probe: tracing.Probe{ Name: "udpv6_queue_rcv_skb", Address: "udpv6_queue_rcv_skb", - Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes), + Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes), }, Decoder: helper.NewStructDecoder(func() interface{} { return new(udpv6QueueRcvSkb) }), }, diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 3e1f411568f..3d871b1e253 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -611,11 +611,19 @@ func (s *state) mutualEnrich(sock *socket, f *flow) { if sockNoPID := sock.pid == 0; sockNoPID != (f.pid == 0) { if sockNoPID { sock.pid = f.pid - sock.process = f.process } else { f.pid = sock.pid + } + } + if sockNoProcess := sock.process == nil; sockNoProcess != (f.process == nil) { + if sockNoProcess { + sock.process = f.process + } else { f.process = sock.process } + } else if sock.process == nil && sock.pid != 0 { + sock.process = s.getProcess(sock.pid) + f.process = sock.process } } @@ -632,6 +640,7 @@ func (s *state) createFlow(ref flow) error { ref.createdTime = ref.lastSeenTime s.mutualEnrich(sock, &ref) + // don't create the flow yet if it doesn't have a populated remote address if ref.remote.addr.IP == nil { return nil @@ -658,6 +667,8 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error { // Enrich with pid if sock.pid == 0 && pid != 0 { sock.pid = pid + } + if sock.process == nil && sock.pid != 0 { sock.process = s.getProcess(pid) } // Keep the sock around in case it's a connected TCP socket, as still some diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index aa931659965..8133280b4c8 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -126,12 +126,145 @@ func TestTCPConnWithProcess(t *testing.T) { } } -func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool { - value, err := ev.GetValue(field) +func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) { + const ( + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 53 + sock uintptr = 0xff1234 + ) + st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second) + lPort, rPort := be16(localPort), be16(remotePort) + lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) + evs := []event{ + callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}), + &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, + &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpSendMsgCall{ + Meta: meta(1234, 1235, 6), + Sock: sock, + Size: 123, + LAddr: lAddr, + RAddr: rAddr, + AltRAddr: 0, + LPort: lPort, + RPort: rPort, + AltRPort: 0, + }, + &inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock}, + &doExit{Meta: meta(1234, 1234, 18)}, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + flows, err := getFlows(st.DoneFlows(), all) if err != nil { - t.Fatal(err, "field", field) + t.Fatal(err) } - return assert.Equal(t, expected, value) + assert.Len(t, flows, 1) + flow := flows[0] + t.Log("read flow", flow) + for field, expected := range map[string]interface{}{ + "source.ip": localIP, + "source.port": localPort, + "source.packets": uint64(1), + "source.bytes": uint64(151), + "client.ip": localIP, + "client.port": localPort, + "destination.ip": remoteIP, + "destination.port": remotePort, + "destination.packets": uint64(0), + "destination.bytes": uint64(0), + "server.ip": remoteIP, + "server.port": remotePort, + "network.direction": "outbound", + "network.transport": "udp", + "network.type": "ipv4", + "process.pid": 1234, + "process.name": "exfil-udp", + "user.id": "501", + } { + assertValue(t, flow, expected, field) + } +} + +func TestUDPIncomingSinglePacketWithProcess(t *testing.T) { + const ( + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 53 + sock uintptr = 0xff1234 + ) + st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second) + lPort, rPort := be16(localPort), be16(remotePort) + lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) + var packet [256]byte + var ipHdr, udpHdr uint16 = 2, 64 + packet[ipHdr] = 0x45 + tracing.MachineEndian.PutUint32(packet[ipHdr+12:], rAddr) + tracing.MachineEndian.PutUint16(packet[udpHdr:], rPort) + evs := []event{ + callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}), + &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, + &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpQueueRcvSkb{ + Meta: meta(1234, 1235, 5), + Sock: sock, + Size: 123, + LAddr: lAddr, + LPort: lPort, + IPHdr: ipHdr, + UDPHdr: udpHdr, + Packet: packet, + }, + &inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock}, + &doExit{Meta: meta(1234, 1234, 18)}, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 1) + flow := flows[0] + t.Log("read flow", flow) + for field, expected := range map[string]interface{}{ + "source.ip": remoteIP, + "source.port": remotePort, + "source.packets": uint64(1), + "source.bytes": uint64(151), + "client.ip": remoteIP, + "client.port": remotePort, + "destination.ip": localIP, + "destination.port": localPort, + "destination.packets": uint64(0), + "destination.bytes": uint64(0), + "server.ip": localIP, + "server.port": localPort, + "network.direction": "inbound", + "network.transport": "udp", + "network.type": "ipv4", + "process.pid": 1234, + "process.name": "exfil-udp", + "user.id": "501", + } { + assertValue(t, flow, expected, field) + } +} + +func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool { + value, err := ev.GetValue(field) + return assert.Nil(t, err, field) && assert.Equal(t, expected, value, field) } func be16(val uint16) uint16 { diff --git a/x-pack/auditbeat/tests/system/test_system_socket.py b/x-pack/auditbeat/tests/system/test_system_socket.py index 90724508a62..9c3350fb97a 100644 --- a/x-pack/auditbeat/tests/system/test_system_socket.py +++ b/x-pack/auditbeat/tests/system/test_system_socket.py @@ -3,6 +3,7 @@ import platform import random import socket +import struct import time import unittest from auditbeat_xpack import * @@ -120,6 +121,18 @@ def test_dns_long_request(self): 'socket.flow_inactive_timeout': '2s' }) + def test_dns_udp_ipv6(self): + """ + test DNS enrichment of UDP/IPv6 session + """ + self.with_runner(DNSTestCase(network="ipv6", transport="udp")) + + def test_dns_unidirectional_udp(self): + """ + test DNS enrichment of unidirectional UDP + """ + self.with_runner(DNSTestCase(transport="udp", bidirectional=False)) + def with_runner(self, test, extra_conf=dict()): enable_ipv6_loopback() conf = { @@ -484,41 +497,113 @@ def expected(self): ]) +class SocketFactory: + + def __init__(self, network, transport): + self.network = network + self.transport = transport + if self.network == "ipv4": + self.fn = socket_ipv4 + elif self.network == "ipv6": + self.fn = socket_ipv6 + else: + raise Exception("invalid network: " + self.network) + if self.transport == "tcp": + self.sock_type = socket.SOCK_STREAM + self.sock_proto = socket.IPPROTO_TCP + elif self.transport == "udp": + self.sock_type = socket.SOCK_DGRAM + self.sock_proto = socket.IPPROTO_UDP + else: + raise Exception("invalid transport: " + self.transport) + + def __call__(self, **kwargs): + return self.fn(self.sock_type, self.sock_proto, **kwargs) + + +def transaction_udp(client, client_addr, server, server_addr, req, resp): + client.sendto(req, server_addr) + msg, _ = server.recvfrom(len(req)) + server.sendto(resp, client_addr) + msg, _ = client.recvfrom(len(resp)) + + +def transaction_tcp(client, client_addr, server, server_addr, req, resp): + server.listen(8) + client.connect(server_addr) + accepted, _ = server.accept() + client.send(req) + accepted.recv(len(req)) + accepted.send(resp) + client.recv(len(resp)) + accepted.close() + + +def transaction_udp_oneway(client, client_addr, server, server_addr, req, resp): + client.sendto(req, server_addr) + msg, _ = server.recvfrom(len(req)) + + class DNSTestCase: - def __init__(self, enabled=True, delay_seconds=0): + + def __init__(self, enabled=True, delay_seconds=0, network="ipv4", transport="tcp", bidirectional=True): self.dns_enabled = enabled self.delay = delay_seconds + self.socket_factory = SocketFactory(network, transport) + self.transaction = transaction_tcp if transport == "tcp" else transaction_udp + self.bidirectional = bidirectional + if not self.bidirectional: + assert transport == "udp" + self.transaction = transaction_udp_oneway def run(self): + A = "\x00\x01" + AAAA = "\x00\x1c" + q = A if self.socket_factory.network == "ipv4" else AAAA + + dns_factory = SocketFactory(self.socket_factory.network, "udp") + dns_cli, self.dns_client_addr = dns_factory() + dns_srv, self.dns_server_addr = dns_factory(port=53) + client, self.client_addr = self.socket_factory() + server, self.server_addr = self.socket_factory() + + raw_addr = ip_str_to_raw(self.server_addr[0]) req = "\x74\xba\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07elastic" \ - "\x02co\x00\x00\x01\x00\x01" + "\x02co\x00" + q + "\x00\x01" resp = "\x74\xba\x81\x80\x00\x01\x00\x01\x00\x00\x00\x00\x07elastic" \ - "\x02co\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00" \ - "\x00\x9c\x00\x04" # Append IPv4 ip here + "\x02co\x00" + q + "\x00\x01\xc0\x0c" + q + "\x00\x01\x00\x00" \ + "\x00\x9c" + struct.pack(">H", len(raw_addr)) + raw_addr - dns_cli, self.dns_client_addr = socket_ipv4(socket.SOCK_DGRAM, socket.IPPROTO_UDP) - dns_srv, self.dns_server_addr = socket_ipv4(socket.SOCK_DGRAM, socket.IPPROTO_UDP, port=53) - client, self.client_addr = socket_ipv4(socket.SOCK_STREAM, socket.IPPROTO_TCP) - server, self.server_addr = socket_ipv4(socket.SOCK_STREAM, socket.IPPROTO_TCP) - dns_cli.sendto(req, self.dns_server_addr) - msg, _ = dns_srv.recvfrom(64) - dns_srv.sendto(resp + ipv4_str_to_raw(self.server_addr[0]), self.dns_client_addr) - msg, _ = dns_cli.recvfrom(64) + transaction_udp(dns_cli, self.dns_client_addr, + dns_srv, self.dns_server_addr, + req, resp) dns_cli.close() dns_srv.close() time.sleep(self.delay) - server.listen(8) - client.connect(self.server_addr) - accepted, _ = server.accept() - client.send("GET / HTTP/1.1\r\nHost: elastic.co\r\n\r\n") - accepted.recv(64) - accepted.send("HTTP/1.1 404 Not Found\r\n\r\n") - client.recv(64) - accepted.close() + self.transaction(client, self.client_addr, + server, self.server_addr, + "GET / HTTP/1.1\r\nHost: elastic.co\r\n\r\n", + "HTTP/1.1 404 Not Found\r\n\r\n") client.close() server.close() def expected(self): + + if self.socket_factory.transport == "tcp": + client_bytes = Comparison(operator.gt, 80) + client_packets = Comparison(operator.gt, 2) + server_bytes = Comparison(operator.gt, 2) + server_packets = Comparison(operator.gt, 2) + net_bytes = Comparison(operator.gt, 83) + net_packets = Comparison(operator.gt, 5) + else: + client_bytes = Comparison(operator.gt, 5) + client_packets = 1 + server_bytes = Comparison(operator.gt, 5) if self.bidirectional else 0 + server_packets = 0 + self.bidirectional + net_bytes = Comparison(operator.gt, 5 + 6 * self.bidirectional) + net_packets = 1 + self.bidirectional + expected_events = [ { "agent.type": "auditbeat", @@ -539,7 +624,7 @@ def expected(self): "network.direction": "inbound", "network.packets": 2, "network.transport": "udp", - "network.type": "ipv4", + "network.type": self.socket_factory.network, "process.pid": os.getpid(), "server.bytes": Comparison(operator.gt, 30), "server.ip": self.dns_server_addr[0], @@ -552,34 +637,35 @@ def expected(self): "user.id": str(os.getuid()), }, { "agent.type": "auditbeat", - "client.bytes": Comparison(operator.gt, 80), + "client.bytes": client_bytes, "client.ip": self.client_addr[0], - "client.packets": Comparison(operator.gt, 2), + "client.packets": client_packets, "client.port": self.client_addr[1], - "destination.bytes": Comparison(operator.gt, 80), + "destination.bytes": server_bytes, "destination.domain": "elastic.co", "destination.ip": self.server_addr[0], - "destination.packets": Comparison(operator.gt, 2), + "destination.packets": server_packets, "destination.port": self.server_addr[1], "event.action": "network_flow", "event.category": "network_traffic", "event.dataset": "socket", "event.kind": "event", "event.module": "system", + "network.packets": net_bytes, "network.direction": "inbound", - "network.packets": Comparison(operator.gt, 5), - "network.transport": "tcp", - "network.type": "ipv4", + "network.packets": net_packets, + "network.transport": self.socket_factory.transport, + "network.type": self.socket_factory.network, "process.pid": os.getpid(), - "server.bytes": Comparison(operator.gt, 80), + "server.bytes": server_bytes, "server.domain": "elastic.co", "server.ip": self.server_addr[0], - "server.packets": Comparison(operator.gt, 2), + "server.packets": server_packets, "server.port": self.server_addr[1], "service.type": "system", - "source.bytes": Comparison(operator.gt, 80), + "source.bytes": client_bytes, "source.ip": self.client_addr[0], - "source.packets": Comparison(operator.gt, 2), + "source.packets": client_packets, "source.port": self.client_addr[1], }, ] @@ -601,11 +687,11 @@ def random_address_ipv4(): return '127.{}.{}.{}'.format(random.randint(0, 255), random.randint(0, 255), random.randint(1, 254)) -def ipv4_str_to_raw(ip): - return ''.join(map(lambda x: chr(int(x)), ip.split('.'))) +def ip_str_to_raw(ip): + return socket.inet_pton(socket.AF_INET6 if ':' in ip else socket.AF_INET, ip) -def socket_ipv6(type, proto): +def socket_ipv6(type, proto, port=0): if not socket.has_ipv6: raise Exception('No IPv6 support!') addr = random_address_ipv6() @@ -613,7 +699,7 @@ def socket_ipv6(type, proto): if rv != 0: raise Exception("add ip returned {}".format(rv)) sock = socket.socket(socket.AF_INET6, type, proto) - sock.bind((addr, 0)) + sock.bind((addr, port)) return sock, sock.getsockname()