Skip to content

Commit

Permalink
events_enrich: protect enrich queues with RWLock
Browse files Browse the repository at this point in the history
A sync.RWMutex works well for protecting the queues map, because sending
an event to a queue only reads the map to look up the channel.
The map itself is written only when a queue is created or deleted.
  • Loading branch information
NDStrahilevitz authored and rafaeldtinoco committed May 20, 2022
1 parent a3c64bc commit fb16513
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions pkg/ebpf/events_enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package ebpf

import (
gocontext "context"
"sync"

"github.com/aquasecurity/tracee/pkg/containers/runtime"
"github.com/aquasecurity/tracee/types/trace"
"sync"
)

func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.Event) (chan *trace.Event, chan error) {
Expand All @@ -15,9 +16,10 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
}

queues := make(map[uint64]chan *trace.Event) // cgroupId and queues
attempted := make(map[uint64]*sync.Mutex) // cgroupId and enrichment attempt expressed as transaction lock
attemptedMutex := new(sync.RWMutex) // big lock for attempted map
enriches := make(chan enrichResult, 10) // small buffer to reduce chance for blocking
queuesMutex := sync.RWMutex{}
attempted := make(map[uint64]*sync.Mutex) // cgroupId and enrichment attempt expressed as transaction lock
attemptedMutex := sync.RWMutex{} // big lock for attempted map
enriches := make(chan enrichResult, 10) // small buffer to reduce chance for blocking
out := make(chan *trace.Event, 10000)
errc := make(chan error, 1)
done := make(chan struct{}, 1)
Expand All @@ -41,7 +43,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
cgroupId, _ = getEventArgUint64Val(event, "cgroup_id")
attemptedMutex.Lock()
delete(attempted, cgroupId)
attemptedMutex.Lock()
attemptedMutex.Unlock()
}
// non container event and not cgroup_mkdir (or the event already enriched) skip per cgroupId caching
if (event.ContainerID == "" && event.EventID != int(CgroupMkdirEventID)) || event.ContainerImage != "" {
Expand All @@ -60,15 +62,22 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
continue
} else {
//if the container queue has not been created - create the container queue and invoke to enrich query
if _, ok := queues[cgroupId]; !ok {
queuesMutex.RLock()
_, exists := queues[cgroupId]
queuesMutex.RUnlock()
if !exists {
queuesMutex.Lock()
queues[cgroupId] = make(chan *trace.Event, 1000)
queuesMutex.Unlock()

go func() {
metadata, err := t.containers.EnrichCgroupInfo(cgroupId)
enriches <- enrichResult{cgroupId, metadata, err}
}()
}
queuesMutex.RLock()
queues[cgroupId] <- event
queuesMutex.RUnlock()
}
}
}
Expand All @@ -77,7 +86,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
go func() {
for enrich := range enriches {
cgroupId := enrich.cgroupId
mutex := new(sync.Mutex)
mutex := &sync.Mutex{}
mutex.Lock() // place already acquired mutex in the attempted map
attemptedMutex.Lock()
attempted[cgroupId] = mutex // now attempted[cgroupId] is a transaction, only happens once and has begun
Expand All @@ -87,6 +96,9 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
if enrich.err.Error() != "no cgroup to enrich" {
t.handleError(enrich.err)
}
queuesMutex.RLock()
queue := queues[cgroupId]
queuesMutex.RUnlock()
for evt := range queues[cgroupId] {
select {
case out <- evt:
Expand All @@ -96,7 +108,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E

//at this point no new events should enter this queue, since the attempt was marked
//we need this break condition, otherwise we will wait for new events forever
if len(queues[cgroupId]) < 1 {
if len(queue) < 1 {
break
}
}
Expand All @@ -108,7 +120,10 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
podUid := enrich.result.Pod.UID

//go through the queue and inject the enrichment data that was missing during decoding
for evt := range queues[cgroupId] {
queuesMutex.RLock()
queue := queues[cgroupId]
queuesMutex.RUnlock()
for evt := range queue {
if evt.ContainerID != "" {
evt.ContainerImage = containerImage
evt.ContainerName = containerName
Expand All @@ -124,7 +139,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E

//at this point no new events should enter this queue, since the attempt was marked
//we need this break condition, otherwise we will wait for new events forever
if len(queues[cgroupId]) < 1 {
if len(queue) < 1 {
break
}
}
Expand All @@ -133,8 +148,10 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E

//after enrichment was done and all events were processed we can close and delete the channel from the map
//subsequent events will be enriched during decoding
queuesMutex.Lock()
close(queues[cgroupId])
delete(queues, cgroupId)
queuesMutex.Unlock()
}
}()

Expand Down

0 comments on commit fb16513

Please sign in to comment.