Skip to content

Commit

Permalink
events/sorting: introduce events sorter
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonZivony authored and rafaeldtinoco committed Feb 2, 2022
1 parent d9135a8 commit f686559
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 0 deletions.
256 changes: 256 additions & 0 deletions pkg/events/sorting/sorting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Package sorting is responsible for sorting incoming events from the BPF programs chronologically.
//
// There are 3 known sources to events sorting issues:
// 1. In perf buffer, events are read in round robing order from CPUs buffers (and not according to invocation time).
// 2. Syscall events are invoked after internal events of the syscall (though the syscall happened before the
// internal events).
// 3. Virtual CPUs might enter sleep mode by host machine scheduler and send events after some delay.
//
// To address the events perf buffers issue, the events are divided to queues according to the source CPU. This way
// the events are almost ordered (except for syscalls). The syscall events are inserted to their right chronological
// place manually.
// This way, all events which occurred before the last event of the most delaying CPU could be sent forward with
// guaranteed order.
// To make sure syscall events are not missed when sending, a small delay is needed.
// Lastly, to address the vCPU sleep issue (which might cause up to 2 events received in a delay), the events need to be
// sent after a delay which is bigger than max possible vCPU sleep time (which is just an increase of the syscall events
// delay sending).
//
// To summarize the algorithm main logic, here is textual simulation of the operation (assume that 2 scheduler ticks
// are larger than max possible vCPU sleep time):
// -------------------------------------------------------------------
// Tn = Timestamp (n == TOD)
// #m = Event's Source CPU
//
// ### Initial State
//
// [ CPU 0 ] [ CPU 1 ] [ CPU 2 ]
// HEAD T1 T2 T4
// T3 T5
// T6
// TAIL T8
//
// ### Scheduler Tick #1
//
// Incoming events: T9#1, T11#2, T13#1, T10#2, T12#2
//
// Queues state after insert:
// [ CPU 0 ] [ CPU 1 ] [ CPU 2 ]
// HEAD T1 T2 T4
// T3 T5 T10 +
// T6 T9 + T11 +
// TAIL T8 T13 + T12 +
//
// - No event sent.
// - Oldest timestamp = T1.
// - T8 is oldest timestamp in most recent timestamps.
// - In 2 ticks from now: send all events up to T8.
// - Bigger timestamps than T8 (+) will be sent in future scheduling.
//
// ### Scheduler Tick #2
//
// Incoming events: T7#0, T22#1, T23#2, T20#0, T25#1, T24#2, T21#0
//
// Queues state after insert:
// [ CPU 0 ] [ CPU 1 ] [ CPU 2 ]
// HEAD T1 ^ T2 ^ T4 ^
// T3 ^ T5 ^ T10
// T6 ^ T9 T11
// T7 +^ T13 T12
// T8 ^ T22 + T23 +
// T20 + T25 + T24 +
// TAIL T21 +
//
// - No event sent.
// - Oldest timestamp = T1.
// - T21 is oldest timestamp in most recent timestamps.
// - In 2 ticks from now: send all events up to T21.
// - T8 is previous oldest timestamp in most recent timestamps.
// - Next tick: send all events up to T8.
// - Bigger timestamps than T21 (+) will be sent in future scheduling.
//
// ### Scheduler Tick #3
//
// Incoming events: T30#0, T34#1, T35#2, T31#0, T36#2, T32#0, T37#2, T33#0, T38#2, T50#1, T51#1
//
// Queues state after insert:
// [ CPU 0 ] [ CPU 1 ] [ CPU 2 ]
// HEAD T20 ^ T9 ^ T10 ^
// T21 ^ T13 ^ T11 ^
// T30 + T22 T12 ^
// T31 + T23 T24
// T32 + T25 T35 +
// T33 + T34 + T36 +
// T50 + T37 +
// TAIL T51 + T38 +
//
// - Max sent timestamp = T8.
// - Oldest timestamp = T9.
// - T33 is oldest timestamp in most recent timestamps.
// - In 2 ticks from now: send all events up to T33.
// - T21 is previous oldest timestamp in most recent timestamps.
// - Next tick: send all events up to T21.
// - Bigger timestamps than T33 (+) will be sent in future scheduling.
// -------------------------------------------------------------------
package sorting

import (
gocontext "context"
"fmt"
"math"
"sync"
"time"

"github.com/aquasecurity/tracee/pkg/external"
"github.com/aquasecurity/tracee/pkg/utils/environment"
)

// The minimum time of delay before sending events forward.
// It should resolve disorders originated from the way syscalls timestamps are taken (about 1ms disorder) and potential
// vCPU sleep (up to 98ms) [source - https://kinvolk.io/blog/2018/02/timing-issues-when-using-bpf-with-virtual-cpus/]
const minDelay = 100 * time.Millisecond
const eventsPassingInterval = 50 * time.Millisecond
const intervalsAmountThresholdForDelay = int(minDelay / eventsPassingInterval)

// EventsChronologicalSorter is an object responsible for sorting arriving events from perf buffer according to their
// chronological order - the time they were invoked in the kernel.
type EventsChronologicalSorter struct {
cpuEventsQueues []cpuEventsQueue // Each CPU has its own events queue because events per CPU arrive in almost chronological order
outputChanMutex sync.Mutex
extractionSavedTimestamps []int // Buffer to store timestamps of events for delayed extraction
errorChan chan<- error
}

func InitEventSorter() (*EventsChronologicalSorter, error) {
cpusAmount, err := environment.GetCPUAmount()
if err != nil {
return nil, err
}
newSorter := EventsChronologicalSorter{}
newSorter.cpuEventsQueues = make([]cpuEventsQueue, cpusAmount)
return &newSorter, nil
}

func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *external.Event) (
chan *external.Event, chan error) {
out := make(chan *external.Event, 1000)
errc := make(chan error, 1)
go sorter.Start(in, out, ctx, errc)
return out, errc
}

// Start is the main function of the EventsChronologicalSorter class, which orders input events from events channels
// and pass forward all ordered events to the output channel after each interval.
// When exits, the sorter will send forward all buffered events in ordered matter.
func (sorter *EventsChronologicalSorter) Start(in <-chan *external.Event, out chan<- *external.Event,
ctx gocontext.Context, errc chan error) {
sorter.errorChan = errc
defer close(out)
defer close(errc)
ticker := time.NewTicker(eventsPassingInterval)
for {
select {
case newEvent := <-in:
if newEvent == nil {
sorter.sendEvents(out, math.MaxInt64)
return
}
sorter.addEvent(newEvent)
case <-ticker.C:
sorter.updateSavedTimestamps()
if len(sorter.extractionSavedTimestamps) > intervalsAmountThresholdForDelay {
extractionTimestamp := sorter.extractionSavedTimestamps[0]
sorter.extractionSavedTimestamps = sorter.extractionSavedTimestamps[1:]
go sorter.sendEvents(out, extractionTimestamp)
}

case <-ctx.Done():
sorter.sendEvents(out, math.MaxInt64)
return
}
}
}

// addEvent add a new event to the appropriate place in queue according to its timestamp
func (sorter *EventsChronologicalSorter) addEvent(newEvent *external.Event) {
cq := &sorter.cpuEventsQueues[newEvent.ProcessorID]
err := cq.InsertByTimestamp(newEvent)
if err != nil {
sorter.errorChan <- err
}
cq.IsUpdated = true
}

// sendEvents send to output channel all events up to given timestamp
func (sorter *EventsChronologicalSorter) sendEvents(outputChan chan<- *external.Event, extractionMaxTimestamp int) {
sorter.outputChanMutex.Lock()
defer sorter.outputChanMutex.Unlock()
for {
mostDelayingQueue, err := sorter.getMostDelayingEventCPUQueue()
if err != nil || mostDelayingQueue.PeekHead().Timestamp > extractionMaxTimestamp {
break
}
extractionEvent, err := mostDelayingQueue.Get()
if err != nil {
sorter.errorChan <- err
if extractionEvent == nil {
mostDelayingQueue.Empty()
continue
}
}
outputChan <- extractionEvent
}
}

// updateSavedTimestamps add current most delaying timestamp to saved list
func (sorter *EventsChronologicalSorter) updateSavedTimestamps() {
mostDelayingLastEventTimestamp, err := sorter.getMostDelayedLastCPUEventTimestamp()
if err != nil { // An error means no new event was received since last update
if len(sorter.extractionSavedTimestamps) > 0 {
mostDelayingLastEventTimestamp = sorter.extractionSavedTimestamps[len(sorter.extractionSavedTimestamps)-1]
} else {
mostDelayingLastEventTimestamp = 0
}
}
sorter.extractionSavedTimestamps = append(sorter.extractionSavedTimestamps, mostDelayingLastEventTimestamp)
}

// getMostDelayingEventCPUQueue search for the CPU queue which contains the oldest event
// Return nil if no valid queue found
func (sorter *EventsChronologicalSorter) getMostDelayingEventCPUQueue() (*cpuEventsQueue, error) {
var mostDelayingEventQueue *cpuEventsQueue = nil
for i := 0; i < len(sorter.cpuEventsQueues); i++ {
cq := &sorter.cpuEventsQueues[i]
if cq.PeekHead() != nil &&
(mostDelayingEventQueue == nil ||
cq.PeekHead().Timestamp < mostDelayingEventQueue.PeekHead().Timestamp) {
mostDelayingEventQueue = cq
}
}
if mostDelayingEventQueue == nil {
return nil, fmt.Errorf("no queue with events found")
}
return mostDelayingEventQueue, nil
}

// getMostDelayedLastCPUEventTimestamp search for the CPU queue with the oldest last inserted event which was updated since
// last check
// Queues which were not updated since last check are ignored to prevent events starvation if a CPU is not active
func (sorter *EventsChronologicalSorter) getMostDelayedLastCPUEventTimestamp() (int, error) {
var mostDelayingEventQueue *cpuEventsQueue = nil
for i := 0; i < len(sorter.cpuEventsQueues); i++ {
cq := &sorter.cpuEventsQueues[i]
queueTail := cq.PeekTail()
if queueTail != nil &&
cq.IsUpdated == true &&
(mostDelayingEventQueue == nil ||
queueTail.Timestamp < mostDelayingEventQueue.PeekTail().Timestamp) {
mostDelayingEventQueue = &sorter.cpuEventsQueues[i]
}
cq.IsUpdated = false // Mark that the values of the queue were checked from previous time
}
if mostDelayingEventQueue == nil {
return 0, fmt.Errorf("no valid CPU events queue was updated since last interval")
}
return mostDelayingEventQueue.PeekTail().Timestamp, nil
}
3 changes: 3 additions & 0 deletions tracee-ebpf/tracee/events_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (t *Tracee) handleEvents(ctx gocontext.Context) {
eventsChan, errc := t.decodeEvents(ctx)
errcList = append(errcList, errc)

eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan)
errcList = append(errcList, errc)

// Sink pipeline stage.
errc = t.processEvents(ctx, eventsChan)
errcList = append(errcList, errc)
Expand Down
8 changes: 8 additions & 0 deletions tracee-ebpf/tracee/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aquasecurity/libbpfgo/helpers"
"github.com/aquasecurity/tracee/pkg/bucketscache"
"github.com/aquasecurity/tracee/pkg/containers"
"github.com/aquasecurity/tracee/pkg/events/sorting"
"github.com/aquasecurity/tracee/pkg/external"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
Expand Down Expand Up @@ -175,6 +176,7 @@ type Tracee struct {
ngIfacesIndex map[int]int
containers *containers.Containers
processTree *ProcessTree
eventsSorter *sorting.EventsChronologicalSorter
}

type counter int32
Expand Down Expand Up @@ -385,6 +387,12 @@ func New(cfg Config) (*Tracee, error) {
t.Close()
return nil, fmt.Errorf("error creating process tree: %v", err)
}

t.eventsSorter, err = sorting.InitEventSorter()
if err != nil {
return nil, err
}

return t, nil
}

Expand Down

0 comments on commit f686559

Please sign in to comment.