Skip to content

Commit

Permalink
move enrichment logic to the event pipeline
Browse files Browse the repository at this point in the history
This is done in order to ensure container enrichment queries
will not block the event pipeline.

Adds a stage where events are enriched with container runtime data.
Events will be enriched in the decode stage if the data was
previously queried.
In that case, the new enrichment stage will be skipped for those events.
  • Loading branch information
NDStrahilevitz authored and rafaeldtinoco committed May 17, 2022
1 parent 826de80 commit 12fada1
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/tracee-rules/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestOutputTemplates(t *testing.T) {
"a":123,"b":"c","d":true,"f":{"123":"456","foo":"bar"}
},
"Context":{
"timestamp":1321321,"processorId":0,"processId":21312,"threadId":0,"parentProcessId":0,"hostProcessId":0,"hostThreadId":0,"hostParentProcessId":0,"userId":0,"mountNamespace":0,"pidNamespace":0,"processName":"","hostName":"","containerId":"abbc123","containerImage":"", "containerName":"","podName":"","podNamespace":"","podUID":"","eventId":"0","eventName":"execve","argsNum":0,"returnValue":0,"stackAddresses":null,"args":null
"timestamp":1321321,"processorId":0,"processId":21312,"threadId":0,"parentProcessId":0,"hostProcessId":0,"hostThreadId":0,"hostParentProcessId":0,"userId":0,"mountNamespace":0,"pidNamespace":0,"processName":"","hostName":"","cgroupId":0,"containerId":"abbc123","containerImage":"", "containerName":"","podName":"","podNamespace":"","podUID":"","eventId":"0","eventName":"execve","argsNum":0,"returnValue":0,"stackAddresses":null,"args":null
},
"SigMetadata":{
"ID":"TRC-1","Version":"0.1.0","Name":"Standard Input/Output Over Socket","Description":"Redirection of process's standard input/output to socket","Tags":["linux","container"],"Properties":{"MITRE ATT\u0026CK":"Persistence: Server Software Component","Severity":3}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/Masterminds/sprig/v3 v3.2.2
github.com/aquasecurity/libbpfgo v0.2.5-libbpf-0.7.0
github.com/aquasecurity/tracee/types v0.0.0-20220414114616-ad3bb476b006
github.com/aquasecurity/tracee/types v0.0.0-20220501185220-11d35a849685
github.com/containerd/containerd v1.6.1
github.com/docker/docker v20.10.13+incompatible
github.com/google/gopacket v1.1.19
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aquasecurity/libbpfgo v0.2.5-libbpf-0.7.0 h1:BpW7qxkveYXx8TCtvYWIvmliPqaTCz/IYs1i+Gyj0MQ=
github.com/aquasecurity/libbpfgo v0.2.5-libbpf-0.7.0/go.mod h1:/+clceXE103FaXvVTIY2HAkQjxNtkra4DRWvZYr2SKw=
github.com/aquasecurity/tracee/types v0.0.0-20220414114616-ad3bb476b006 h1:CIO/cV0sgM6JUXzSYEduTL5lwpghgiSj0TiAAQCigrc=
github.com/aquasecurity/tracee/types v0.0.0-20220414114616-ad3bb476b006/go.mod h1:l8MikK8yNCxoFVFq+WqvRg3kiDUX4wDTQwo7oD7YnWM=
github.com/aquasecurity/tracee/types v0.0.0-20220501185220-11d35a849685 h1:NETd4p5PNBL7zVE+E2wk8W+PXeqLENWDhkkoZN6C730=
github.com/aquasecurity/tracee/types v0.0.0-20220501185220-11d35a849685/go.mod h1:l8MikK8yNCxoFVFq+WqvRg3kiDUX4wDTQwo7oD7YnWM=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down
63 changes: 33 additions & 30 deletions pkg/containers/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,54 +218,57 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time)
c.cgroups[uint32(cgroupId)] = info
c.mtx.Unlock()

err := c.enrichCgroupInfo(cgroupId)

//don't fail on enrichment but log to stderr
if err != nil {
fmt.Fprintf(os.Stderr, "failed to enrich container data: %v", err)
}

return info, nil
}

// enrichCgroupInfo checks for a given cgroupId if it is relevant to some running container
// EnrichCgroupInfo checks for a given cgroupId if it is relevant to some running container
// it then calls the runtime info service to gather additional data from the container's runtime
func (c *Containers) enrichCgroupInfo(cgroupId uint64) error {
// it returns the retrieved metadata and a relevant error
// this function shouldn't be called twice for the same cgroupId unless attempting a retry
func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetadata, error) {
var metadata cruntime.ContainerMetadata

c.mtx.RLock()
info, ok := c.cgroups[uint32(cgroupId)]
c.mtx.RUnlock()

//if there is no cgroup anymore for some reason, return early
if !ok {
return fmt.Errorf("no cgroup to enrich")
return metadata, fmt.Errorf("no cgroup to enrich")
}

containerId := info.Container.ContainerId
runtime := info.Runtime

if containerId != "" && runtime != cruntime.Unknown {
//There might be a performance overhead with the cancel
//But, I think it will be negligable since this code path shouldn't be reached too frequently
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
metadata, err := c.enricher.Get(containerId, runtime, ctx)
defer cancel()
//if enrichment fails, just return early
if err != nil {
return err
}
if containerId == "" {
return metadata, fmt.Errorf("no containerId")
}

info.Container = metadata
c.mtx.Lock()
//we read the dictionary again to make sure the cgroup still exists
//otherwise we risk reintroducing it despite not existing
_, ok = c.cgroups[uint32(cgroupId)]
if ok {
c.cgroups[uint32(cgroupId)] = info
}
c.mtx.Unlock()
if runtime == cruntime.Unknown {
return metadata, fmt.Errorf("unknown runtime")
}

return nil
//There might be a performance overhead with the cancel
//But, I think it will be negligible since this code path shouldn't be reached too frequently
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
metadata, err := c.enricher.Get(containerId, runtime, ctx)
defer cancel()
//if enrichment fails, just return early
if err != nil {
return metadata, err
}

info.Container = metadata
c.mtx.Lock()
//we read the dictionary again to make sure the cgroup still exists
//otherwise we risk reintroducing it despite not existing
_, ok = c.cgroups[uint32(cgroupId)]
if ok {
c.cgroups[uint32(cgroupId)] = info
}
c.mtx.Unlock()

return metadata, nil
}

var (
Expand Down
124 changes: 124 additions & 0 deletions pkg/ebpf/events_enrich.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package ebpf

import (
gocontext "context"

"github.com/aquasecurity/tracee/pkg/containers/runtime"
"github.com/aquasecurity/tracee/types/trace"
)

// enrichContainerEvents is the pipeline stage that fills in container runtime
// metadata (image, name and pod details) on events that were decoded before
// that metadata was available.
//
// Events that need no enrichment are forwarded to the returned channel
// immediately. For every other event the stage starts (at most once per
// cgroup id) an asynchronous EnrichCgroupInfo query and parks the event in a
// per-cgroup queue; when the query returns, the parked events are patched (on
// success) and forwarded in arrival order.
//
// All bookkeeping (queues, attempted) is owned by a single goroutine, so no
// locking is needed. The stage stops when ctx is cancelled, or once in has been
// closed and no query is still outstanding. Both returned channels are closed
// when the stage stops; nothing is currently sent on the error channel, query
// errors are reported through t.handleError instead.
func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.Event) (chan *trace.Event, chan error) {
	type enrichResult struct {
		cgroupId uint64
		result   runtime.ContainerMetadata
		err      error
	}

	// maxQueuedPerCgroup is the number of events a single cgroup may park
	// before the stage stops reading new events and waits for query results.
	const maxQueuedPerCgroup = 1000

	queues := make(map[uint64][]*trace.Event) // cgroupId -> events waiting for the query result
	attempted := make(map[uint64]bool)        // cgroupId -> a query has already completed
	enriches := make(chan enrichResult, 10)   // small buffer so finished queries rarely block
	out := make(chan *trace.Event, 10000)
	errc := make(chan error, 1)

	// emit forwards one event downstream; it reports false if ctx was
	// cancelled before the event could be delivered.
	emit := func(event *trace.Event) bool {
		select {
		case out <- event:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// flush handles a finished query: it marks the cgroup as attempted, drops
	// its queue and forwards every parked event, enriched when the query
	// succeeded. It reports false if ctx was cancelled while forwarding.
	flush := func(res enrichResult) bool {
		// Mark before forwarding: from now on events of this cgroup bypass
		// the queue, and they cannot overtake the parked ones because this
		// goroutine forwards those first.
		attempted[res.cgroupId] = true
		pending := queues[res.cgroupId]
		delete(queues, res.cgroupId)

		// A cgroup that disappeared before the query ran is expected and not
		// reported. The check matches the message text of the error.
		if res.err != nil && res.err.Error() != "no cgroup to enrich" {
			t.handleError(res.err)
		}

		for _, evt := range pending {
			// Only container events get metadata; a queued cgroup_mkdir
			// event without a container id is forwarded untouched.
			if res.err == nil && evt.ContainerID != "" {
				evt.ContainerImage = res.result.Image
				evt.ContainerName = res.result.Name
				evt.PodName = res.result.Pod.Name
				evt.PodNamespace = res.result.Pod.Namespace
				evt.PodUID = res.result.Pod.UID
			}
			if !emit(evt) {
				return false
			}
		}
		return true
	}

	// route decides what to do with one incoming event. It reports false if
	// ctx was cancelled and the stage must stop.
	route := func(event *trace.Event) bool {
		cgroupId := uint64(event.CgroupID)
		isMkdir := event.EventID == int(CgroupMkdirEventID)

		// For cgroup_mkdir/cgroup_rmdir the relevant cgroup is the one named
		// in the event argument, not the one the event was emitted from.
		// The lookup error is deliberately ignored, as before.
		// NOTE(review): presumably cgroupId is 0 when the argument is missing - confirm.
		if isMkdir {
			cgroupId, _ = getEventArgUint64Val(event, "cgroup_id")
		}
		if event.EventID == int(CgroupRmdirEventID) {
			cgroupId, _ = getEventArgUint64Val(event, "cgroup_id")
			// Forget the attempt so a reused cgroup id can be queried again.
			delete(attempted, cgroupId)
		}

		// Skip the stage for non-container events (cgroup_mkdir excepted),
		// for events that were already enriched while decoding, and for
		// cgroups whose query has already completed.
		notContainer := event.ContainerID == "" && !isMkdir
		if notContainer || event.ContainerImage != "" || attempted[cgroupId] {
			return emit(event)
		}

		// The first event of a cgroup starts the query. It runs in its own
		// goroutine so a slow container runtime does not stall the pipeline.
		pending, querying := queues[cgroupId]
		if !querying {
			go func() {
				metadata, err := t.containers.EnrichCgroupInfo(cgroupId)
				select {
				case enriches <- enrichResult{cgroupId, metadata, err}:
				case <-ctx.Done():
					// Stage is shutting down; nobody will read the result.
				}
			}()
		}
		queues[cgroupId] = append(pending, event)

		// Backpressure: once a cgroup has more parked events than allowed,
		// stop taking new events and only process query results until its
		// queue has been released.
		for len(queues[cgroupId]) > maxQueuedPerCgroup {
			select {
			case res := <-enriches:
				if !flush(res) {
					return false
				}
			case <-ctx.Done():
				return false
			}
		}
		return true
	}

	go func() {
		defer close(out)
		defer close(errc)
		for {
			// Upstream is closed and no query is outstanding: nothing left to do.
			if in == nil && len(queues) == 0 {
				return
			}
			select {
			case <-ctx.Done():
				return
			case res := <-enriches:
				if !flush(res) {
					return
				}
			case event, ok := <-in:
				if !ok {
					// A nil channel is never ready, which disables this case
					// while the remaining queries are drained.
					in = nil
					continue
				}
				if !route(event) {
					return
				}
			}
		}
	}()

	return out, errc
}
9 changes: 9 additions & 0 deletions pkg/ebpf/events_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func (t *Tracee) handleEvents(ctx gocontext.Context) {
eventsChan, errc = t.processEvents(ctx, eventsChan)
errcList = append(errcList, errc)

// Enrichment stage
// In this stage container events are enriched with additional runtime data
// Events may be enriched in the initial decode state if the enrichment data has been stored in the Containers structure
// In that case, this pipeline stage will be quickly skipped
// This is done in a separate stage to ensure enrichment is non blocking (since container runtime calls may timeout and block the pipeline otherwise)
eventsChan, errc = t.enrichContainerEvents(ctx, eventsChan)
errcList = append(errcList, errc)

// Derive events stage
// In this stage events go through a derivation function
eventsChan, errc = t.deriveEvents(ctx, eventsChan)
Expand Down Expand Up @@ -186,6 +194,7 @@ func (t *Tracee) decodeEvents(outerCtx gocontext.Context) (<-chan *trace.Event,
PIDNS: int(ctx.PidID),
ProcessName: string(bytes.TrimRight(ctx.Comm[:], "\x00")),
HostName: string(bytes.TrimRight(ctx.UtsName[:], "\x00")),
CgroupID: uint(ctx.CgroupID),
ContainerID: containerInfo.ContainerId,
ContainerImage: containerInfo.Image,
ContainerName: containerInfo.Name,
Expand Down

0 comments on commit 12fada1

Please sign in to comment.