Skip to content

Commit

Permalink
Speed up RITA parser (activecm#654)
Browse files Browse the repository at this point in the history
* Refactor import command dependencies so I can split out the parsing of the different file types

* Split parsing for different file types into their own files. Fix bug with open connections where the host map's counters for unexpected protocol port service tuples weren't being incremented

* Add time taken to parse display, add comments for enabling profiling of parsing

* Change strings.Split to strings.Index in parseTSVField to stop allocated extra strings, function runs ~1.2x faster now, and GC is doing a small bit better

* Use strconv.Atoi instead of strconv.ParseInt since it has a shortcut for simply formatted integers

* Map from each Zeek field's index in the header to the parse struct field offsets using an array. We previously mapped from each Zeek field's name to the offsets using a hashmap. This took a lot of time since the code was executed a lot.

* Rely on system gzip/ pigz when possible instead of golang gzip. Docker specifically uses pigz for this purpose.

* Switch from standard json lib to json-iter

* convert unique ip sets over to hashmaps from slices

* fix bug in gzip changes where subprocesses were not properly closed

* Change batching limit such that batches are limited to the maximum of either 4GB (as before) or half of system RAM. Note that RAM usage is much lower than the batch limit since we don't store every line we read.

* linter fixes

* Replace string sets backed by string slices with maps

* Cache IPv4 format conversions perfomed by the golang library

* provide a fast path for creating unique ip objects without agent info

* (activecm#684) Fix host collection connection counts are undercounted in the new parser

* Fix bug where UniqueIPSets were not being properly used after converting the set representation

* Fix bug in string sets where they were not being properly initialized

* CI kick

* replace spaces in log fields with underscores

Co-authored-by: Logan L <[email protected]>
  • Loading branch information
Zalgo2462 and Logan L authored Aug 20, 2021
1 parent 2e63f03 commit 13b2e77
Show file tree
Hide file tree
Showing 55 changed files with 2,312 additions and 1,768 deletions.
21 changes: 17 additions & 4 deletions commands/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ func (i *Importer) run() error {
}
i.res.Config.S.Rolling = rollingCfg

importer := parser.NewFSImporter(i.res, i.threads, i.threads, i.importFiles)
importer := parser.NewFSImporter(i.res)
if len(importer.GetInternalSubnets()) == 0 {
return cli.NewExitError("Internal subnets are not defined. Please set the InternalSubnets section of the config file.", -1)
}

indexedFiles := importer.CollectFileDetails()
indexedFiles := importer.CollectFileDetails(i.importFiles, i.threads)
// if no compatible files for import were found, exit
if len(indexedFiles) == 0 {
return cli.NewExitError("No compatible log files found", -1)
Expand All @@ -259,7 +259,20 @@ func (i *Importer) run() error {
fmt.Printf("\t[+] Non-rolling database %v will be converted to rolling\n", i.targetDatabase)
}

importer.Run(indexedFiles)
/*
// Uncomment these lines to enable CPU profiling
f, err := os.Create("./cpu.pprof")
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
defer f.Close() // error handling omitted for example
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
*/

importer.Run(indexedFiles, i.threads)

i.res.Log.Infof("Finished importing %v\n", i.importFiles)

Expand Down Expand Up @@ -288,7 +301,7 @@ func (i *Importer) handleDeleteOldData() error {

// Remove the analysis results for the chunk
targetChunk := i.res.Config.S.Rolling.CurrentChunk
removerRepo := remover.NewMongoRemover(i.res)
removerRepo := remover.NewMongoRemover(i.res.DB, i.res.Config, i.res.Log)
err := removerRepo.Remove(targetChunk)
if err != nil {
return err
Expand Down
31 changes: 3 additions & 28 deletions commands/show-long-connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/activecm/rita/pkg/uconn"
"github.com/activecm/rita/resources"
"github.com/activecm/rita/util"
"github.com/olekukonko/tablewriter"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -64,32 +65,6 @@ func init() {
bootstrapCommands(command)
}

const (
day = time.Minute * 60 * 24
year = 365 * day
)

// https://gist.github.com/harshavardhana/327e0577c4fed9211f65#gistcomment-2557682
func duration(d time.Duration) string {
if d < day {
return d.String()
}

var b strings.Builder

if d >= year {
years := d / year
fmt.Fprintf(&b, "%dy", years)
d -= years * year
}

days := d / day
d -= days * day
fmt.Fprintf(&b, "%dd%s", days, d)

return b.String()
}

func showConns(connResults []uconn.LongConnResult, delim string, showNetNames bool) error {

var headerFields []string
Expand Down Expand Up @@ -162,15 +137,15 @@ func showConnsHuman(connResults []uconn.LongConnResult, showNetNames bool) error
result.SrcIP,
result.DstIP,
strings.Join(result.Tuples, " "),
duration(time.Duration(int(result.MaxDuration * float64(time.Second)))),
util.FormatDuration(time.Duration(int(result.MaxDuration * float64(time.Second)))),
state,
}
} else {
row = []string{
result.SrcIP,
result.DstIP,
strings.Join(result.Tuples, " "),
duration(time.Duration(int(result.MaxDuration * float64(time.Second)))),
util.FormatDuration(time.Duration(int(result.MaxDuration * float64(time.Second)))),
state,
}
}
Expand Down
45 changes: 40 additions & 5 deletions database/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/activecm/rita/config"
fpt "github.com/activecm/rita/parser/fileparsetypes"
"github.com/activecm/rita/parser/files"
"github.com/blang/semver"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
Expand Down Expand Up @@ -525,10 +525,10 @@ func (m *MetaDB) GetAnalyzedDatabases() []string {
// GetFiles gets a list of all IndexedFile objects associated with the given database.
// If successful return a list of files from the database. On failure return an empty
// list of files and generate a log message.
func (m *MetaDB) GetFiles(database string) ([]fpt.IndexedFile, error) {
func (m *MetaDB) GetFiles(database string) ([]files.IndexedFile, error) {
m.lock.Lock()
defer m.lock.Unlock()
var toReturn []fpt.IndexedFile
var toReturn []files.IndexedFile

ssn := m.dbHandle.Copy()
defer ssn.Close()
Expand All @@ -544,8 +544,8 @@ func (m *MetaDB) GetFiles(database string) ([]fpt.IndexedFile, error) {
return toReturn, nil
}

//AddParsedFiles adds indexed files to the files the metaDB using the bulk API
func (m *MetaDB) AddParsedFiles(files []*fpt.IndexedFile) error {
//AddNewFilesToIndex adds indexed files to the files the metaDB using the bulk API
func (m *MetaDB) AddNewFilesToIndex(files []*files.IndexedFile) error {
m.lock.Lock()
defer m.lock.Unlock()
if len(files) == 0 {
Expand Down Expand Up @@ -574,6 +574,41 @@ func (m *MetaDB) AddParsedFiles(files []*fpt.IndexedFile) error {
return nil
}

//FilterOutPreviouslyIndexedFiles checks all indexedFiles passed in to ensure
//that they have not previously been imported into the same database.
//The files are compared based on their hashes (md5 of first 15000 bytes)
//and the database they are slated to be imported into.
func (m *MetaDB) FilterOutPreviouslyIndexedFiles(indexedFiles []*files.IndexedFile,
targetDatabase string) []*files.IndexedFile {

var toReturn []*files.IndexedFile
oldFiles, err := m.GetFiles(targetDatabase)
if err != nil {
m.log.WithFields(log.Fields{
"error": err.Error(),
}).Error("Could not obtain a list of previously parsed files")
}

for _, newFile := range indexedFiles {
have := false
for _, oldFile := range oldFiles {
if oldFile.Hash == newFile.Hash {
m.log.WithFields(log.Fields{
"path": newFile.Path,
"target_database": newFile.TargetDatabase,
}).Warning("Refusing to import file into the same database twice")
have = true
break
}
}

if !have {
toReturn = append(toReturn, newFile)
}
}
return toReturn
}

//RemoveFilesByChunk removes FilesTable entries for a given database chunk.
//This helps provide the ability to re-import a given chunk.
func (m *MetaDB) RemoveFilesByChunk(database string, cid int) error {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ require (
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/safebrowsing v0.0.0-20190214191829-0feabcc2960b // indirect
github.com/google/uuid v1.1.2
github.com/json-iterator/go v1.1.11
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.2-0.20190214164707-93462a5dfaa6
github.com/pbnjay/memory v0.0.0-20201129165224-b12e5d931931
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.3.0
github.com/skratchdot/open-golang v0.0.0-20190104022628-a2dfa6d0dab6
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/safebrowsing v0.0.0-20171128203709-fe6951d7ef01/go.mod h1:5s5M4BFXyqfUstbiDH1ClnS7VmZmDqUaY/X0Rqbfw3o=
github.com/google/safebrowsing v0.0.0-20190214191829-0feabcc2960b h1:VnwTdca7ctu+Z5+ljDDtNUPVJmDbCOkWvQONMdSf+qs=
github.com/google/safebrowsing v0.0.0-20190214191829-0feabcc2960b/go.mod h1:5s5M4BFXyqfUstbiDH1ClnS7VmZmDqUaY/X0Rqbfw3o=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -37,10 +40,16 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.2-0.20190214164707-93462a5dfaa6 h1:W1ga1lGmzN+6EO7j79vMYv40YO/rE2zOYDvMbB7udmc=
github.com/olekukonko/tablewriter v0.0.2-0.20190214164707-93462a5dfaa6/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/pbnjay/memory v0.0.0-20201129165224-b12e5d931931 h1:EeWknjeRU+R3O4ghG7XZCpgSfJNStZyEP8aWyQwJM8s=
github.com/pbnjay/memory v0.0.0-20201129165224-b12e5d931931/go.mod h1:RMU2gJXhratVxBDTFeOdNhd540tG57lt9FIUV0YLvIQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
Expand Down
Loading

0 comments on commit 13b2e77

Please sign in to comment.