Skip to content

Commit

Permalink
events/sorting: introduce queue & cpu-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonZivony authored and rafaeldtinoco committed Feb 2, 2022
1 parent 5612751 commit d9135a8
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 7 deletions.
56 changes: 56 additions & 0 deletions pkg/events/sorting/cpu_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sorting

import (
"fmt"

"github.com/aquasecurity/tracee/pkg/external"
)

// Events queue with the ability to follow if it was updated since last check and insertion by time specific for CPU
// queue ordering
type cpuEventsQueue struct {
eventsQueue
IsUpdated bool
}

// InsertByTimestamp insert new event to the queue in the right position according to its timestamp
func (cq *cpuEventsQueue) InsertByTimestamp(newEvent *external.Event) error {
newNode := cq.pool.Alloc(newEvent)

cq.mutex.Lock()
defer cq.mutex.Unlock()
if cq.tail != nil &&
cq.tail.event.Timestamp > newEvent.Timestamp {
// We have a fresh event with a timestamp older than the last event received in this cpu's queue.
// This can only happen if this fresh event is a syscall event (for which we take the entry timestamp) which
// called some internal kernel functions (that are also traced). Insert the syscall event before these other
// events
insertLocation := cq.tail
for insertLocation.next != nil {
if insertLocation.next.event.Timestamp < newEvent.Timestamp {
break
}
if insertLocation.next == insertLocation {
return fmt.Errorf("encountered node with self reference at next")
}
insertLocation = insertLocation.next
}
cq.insertAfter(newNode, insertLocation)
} else {
cq.put(newNode)
}
return nil
}

// insertAfter insert new event to the queue after another node
// This is useful if new node place is not at the end of the queue but before it
func (cq *cpuEventsQueue) insertAfter(newNode *eventNode, baseEvent *eventNode) {
if baseEvent.next != nil {
baseEvent.next.previous = newNode
}
newNode.previous = baseEvent
newNode.next, baseEvent.next = baseEvent.next, newNode
if cq.head == baseEvent {
cq.head = newNode
}
}
7 changes: 0 additions & 7 deletions pkg/events/sorting/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@ import (
"github.com/aquasecurity/tracee/pkg/external"
)

type eventNode struct {
event *external.Event
previous *eventNode
next *eventNode
isAllocated bool
}

const poolFreeingPart = 2
const minPoolFreeingSize = 200

Expand Down
108 changes: 108 additions & 0 deletions pkg/events/sorting/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sorting

import (
"fmt"
"sync"

"github.com/aquasecurity/tracee/pkg/external"
)

type eventNode struct {
event *external.Event
previous *eventNode
next *eventNode
isAllocated bool
}

// A double linked list used to store events in LIFO order
type eventsQueue struct {
pool eventsPool
tail *eventNode
head *eventNode
mutex sync.Mutex
}

// Put insert new event to the double linked list
func (eq *eventsQueue) Put(newEvent *external.Event) {
newNode := eq.pool.Alloc(newEvent)
eq.mutex.Lock()
defer eq.mutex.Unlock()
eq.put(newNode)
}

// Get remove the node at the head of the queue and return it
// Might return error with a valid event in case of internal pool error, for the user to know that an error occurred
// but was contained
func (eq *eventsQueue) Get() (*external.Event, error) {
eq.mutex.Lock()
defer eq.mutex.Unlock()
if eq.head == nil {
if eq.tail != nil {
return nil, fmt.Errorf("BUG: TAIL without a HEAD")
}
return nil, nil
}
headNode := eq.head
if headNode == eq.tail {
if headNode.next != nil || headNode.previous != nil {
return nil, fmt.Errorf("BUG: last existing node still conneced")
}
eq.tail = nil
eq.head = nil
} else {
if headNode.previous == nil {
return nil, fmt.Errorf("BUG: not TAIL lacking previous")
}
if headNode.next != nil {
return nil, fmt.Errorf("BUG: HEAD has next")
}
headNode.previous.next = nil
eq.head = headNode.previous
}
headNode.previous = nil
headNode.next = nil
extractedEvent := headNode.event
err := eq.pool.Free(headNode)
if err != nil {
eq.pool.Reset()
return extractedEvent, fmt.Errorf("error in queue's node freeing - %v", err)
}
return extractedEvent, nil
}

func (eq *eventsQueue) PeekHead() *external.Event {
eq.mutex.Lock()
defer eq.mutex.Unlock()
if eq.head == nil {
return nil
}
return eq.head.event
}

func (eq *eventsQueue) PeekTail() *external.Event {
eq.mutex.Lock()
defer eq.mutex.Unlock()
if eq.tail == nil {
return nil
}
return eq.tail.event
}

func (eq *eventsQueue) Empty() {
eq.mutex.Lock()
defer eq.mutex.Unlock()
eq.head = nil
eq.tail = nil
}

// Put insert new event to the double linked list
func (eq *eventsQueue) put(newNode *eventNode) {
if eq.tail != nil {
newNode.next = eq.tail
eq.tail.previous = newNode
}
if eq.head == nil {
eq.head = newNode
}
eq.tail = newNode
}

0 comments on commit d9135a8

Please sign in to comment.