Skip to content

Commit

Permalink
feat: Add config Protocol (newrelic#472)
Browse files Browse the repository at this point in the history
* feat: Add config protocol package

* Add config context

* integrate config protocol into manager

* feat: add recurisive spawn test

* test: add v4 payload test

* feat: implement builder pattern to support multiple versiconfig protocol versions

* feat: implement builder pattern to support multiple versiconfig protocol versions

* feat: add performance tests & making use of json unmarshal instead of the regExp pattern

* fix: return config entry array

* test: skip manager test if windows

* feat: add validate function to cfg protocol

* core: move protocol example to v1

* test: add protocol validate ut

* core: improve log naming

* test: add handlelines UT

* Implement config repository, by adding the below features:

- It provides a cache implementation to skip duplicated definitions per config.
- Children processes  are kiled when the parent is terminated.

* feat: add support for update the set of definitions for a cofnig

* feat: add support for identify which definition must be deleted

* feat: terminate integrations from handler

* fix: hash the identifiers component of the definition

* core: fix import order

* test: add handler UT

* fix: race condition on cache

* refactor: cache interface func

* test: fix test race condition

* test: skip runner test for windows

* refactor: rename YAMLConfig variable

* doc: adds description to cache package

Co-authored-by: ivancorrales <[email protected]>
  • Loading branch information
gsanchezgavier and ivancorrales authored May 20, 2021
1 parent baf8142 commit 2d9ceb2
Show file tree
Hide file tree
Showing 29 changed files with 1,174 additions and 102 deletions.
9 changes: 7 additions & 2 deletions cmd/newrelic-infra/newrelic-infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/v3legacy"
"github.com/newrelic/infrastructure-agent/internal/socketapi"
"github.com/newrelic/infrastructure-agent/pkg/integrations/configrequest"
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
"github.com/newrelic/infrastructure-agent/pkg/plugins"
"github.com/sirupsen/logrus"
Expand All @@ -47,7 +48,7 @@ import (
"github.com/newrelic/infrastructure-agent/pkg/helpers"
"github.com/newrelic/infrastructure-agent/pkg/helpers/recover"
"github.com/newrelic/infrastructure-agent/pkg/integrations/legacy"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4"
v4 "github.com/newrelic/infrastructure-agent/pkg/integrations/v4"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/emitter"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/logs"
Expand Down Expand Up @@ -287,6 +288,10 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {

// queues integration run requests
definitionQ := make(chan integration.Definition, 100)
// queues config entries requests
configEntryQ := make(chan configrequest.Entry, 100)
// queues integration terminated definitions
terminateDefinitionQ := make(chan string, 100)

emitterWithRegister := dm.NewEmitter(agt.GetContext(), dmSender, registerClient)
nonRegisterEmitter := dm.NewNonRegisterEmitter(agt.GetContext(), dmSender)
Expand All @@ -297,7 +302,7 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
tracker := track.NewTracker(dmEmitter)

integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, tracker)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, terminateDefinitionQ, configEntryQ, tracker)

// Command channel handlers
backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ github.com/sirupsen/logrus v1.6.1-0.20200528085638-6699a89a232f/go.mod h1:yWOB1S
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down Expand Up @@ -137,8 +136,6 @@ golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3-0.20190829152558-3d0f7978add9 h1:SiG/YZHGpKRm5dMlAIMyiH/U4tSjw72M90ryHlc/rto=
golang.org/x/text v0.3.3-0.20190829152558-3d0f7978add9/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
Expand Down
122 changes: 122 additions & 0 deletions internal/integrations/v4/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2020 New Relic Corporation. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package cache

import (
"sync"

"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
)

// Cache stores integrations definitions grouped by a Config Name.
type Cache interface {
GetDefinitions(cfgName string) []integration.Definition
ListConfigNames() []string
ApplyConfig(cfgDefinitions *ConfigDefinitions) []string
TakeConfig(cfgName string) *ConfigDefinitions
}

type ConfigDefinitions struct {
cfgName string
added map[string]integration.Definition
current map[string]struct{}
}

func (cfgDefinition *ConfigDefinitions) Add(def integration.Definition) bool {
dh := def.Hash()
cfgDefinition.added[dh] = def
_, ok := cfgDefinition.current[dh]
return !ok
}

// cache implements Cache to store integrations definitions by config protocols request names
type cache struct {
lock sync.RWMutex
hashes map[string]map[string]struct{}
definitions map[string]integration.Definition
}

// CreateCache initialize and return an empty cache
func CreateCache() Cache {
return &cache{
hashes: make(map[string]map[string]struct{}),
definitions: make(map[string]integration.Definition),
}
}

// addDefinition adds a integration definition to a cfg name group, returns false if already exists.
func (c *cache) addDefinition(cfgName string, definition integration.Definition) bool {
hash := definition.Hash()
if _, ok := c.hashes[cfgName][hash]; ok {
return false
}
if _, ok := c.hashes[cfgName]; !ok {
c.hashes[cfgName] = make(map[string]struct{})
}
c.hashes[cfgName][hash] = struct{}{}
c.definitions[hash] = definition
return true
}

// ListConfigNames returns a list of config names
func (c *cache) ListConfigNames() []string {
c.lock.RLock()
defer c.lock.RUnlock()
output := make([]string, len(c.hashes))
i := 0
for cfgName := range c.hashes {
output[i] = cfgName
i++
}
return output
}

func (c *cache) getHashes(cfgName string) map[string]struct{} {
return c.hashes[cfgName]
}

// GetDefinitions returns a list of integration definitions for a particular config name.
func (c *cache) GetDefinitions(cfgName string) []integration.Definition {
c.lock.RLock()
defer c.lock.RUnlock()
cfg := c.hashes[cfgName]
output := make([]integration.Definition, len(cfg))
i := 0
for hash := range cfg {
output[i] = c.definitions[hash]
i++
}
return output
}

// ApplyConfig sync the integrations definitions for a particular config name with the added definitions in cfgDefinitions.
// returns a list of removed definitions for the config name.
func (c *cache) ApplyConfig(cfgDefinitions *ConfigDefinitions) []string {
c.lock.Lock()
defer c.lock.Unlock()
toBeDeleted := make([]string, 0)
for hash, definition := range cfgDefinitions.added {
if _, ok := c.hashes[cfgDefinitions.cfgName][hash]; !ok {
c.addDefinition(cfgDefinitions.cfgName, definition)
}
}
for hash := range cfgDefinitions.current {
if _, ok := cfgDefinitions.added[hash]; !ok {
delete(c.definitions, hash)
delete(c.hashes[cfgDefinitions.cfgName], hash)
toBeDeleted = append(toBeDeleted, hash)
}
}
return toBeDeleted
}

// TakeConfig returns a ConfigDefinitions initialized for a particular config name
func (c *cache) TakeConfig(cfgName string) *ConfigDefinitions {
c.lock.RLock()
defer c.lock.RUnlock()
return &ConfigDefinitions{
cfgName: cfgName,
added: make(map[string]integration.Definition),
current: c.getHashes(cfgName),
}
}
77 changes: 77 additions & 0 deletions internal/integrations/v4/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cache

import (
"testing"

"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createIntegrationDefinition(name string) integration.Definition {
return integration.Definition{
Name: name,
}
}

func Test_createCache(t *testing.T) {
c := CreateCache()
assert.NotNil(t, c)
assert.Empty(t, c.ListConfigNames())
assert.Empty(t, c.GetDefinitions("cfg"))

def := createIntegrationDefinition("def")
initialCfg := &ConfigDefinitions{
cfgName: "cfg",
added: map[string]integration.Definition{
def.Hash(): def,
},
}

assert.Empty(t, c.ApplyConfig(initialCfg))
definitions := c.GetDefinitions("cfg")
require.Len(t, definitions, 1)
assert.Equal(t, "def", definitions[0].Name)

cd := c.TakeConfig("cfg")
for hash := range cd.current {
assert.Equal(t, def.Hash(), hash)
}

}

func Test_definitionsChange(t *testing.T) {
c := CreateCache()
def := createIntegrationDefinition("def")
def2 := createIntegrationDefinition("def2")
def3 := createIntegrationDefinition("def3")
def4 := createIntegrationDefinition("def4")

initialCfg := &ConfigDefinitions{
cfgName: "cfg1",
added: map[string]integration.Definition{
def.Hash(): def,
def2.Hash(): def2,
def3.Hash(): def3,
},
}
assert.Empty(t, c.ApplyConfig(initialCfg))
definitionsList := c.GetDefinitions("cfg1")
for _, def := range c.GetDefinitions("cfg1") {
assert.Contains(t, []string{"def", "def2", "def3"}, def.Name)
}
assert.Len(t, definitionsList, 3)

cfgDefinitions := c.TakeConfig("cfg1")
assert.False(t, cfgDefinitions.Add(def))
assert.False(t, cfgDefinitions.Add(def2))
assert.True(t, cfgDefinitions.Add(def4))

toBeDeleted := c.ApplyConfig(cfgDefinitions)
definitionsList = c.GetDefinitions("cfg1")
assert.Len(t, definitionsList, 3)
for _, def := range c.GetDefinitions("cfg1") {
assert.Contains(t, []string{"def", "def2", "def4"}, def.Name)
}
assert.Equal(t, []string{def3.Hash()}, toBeDeleted)
}
4 changes: 4 additions & 0 deletions internal/integrations/v4/fixtures/echo_from_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env sh

echo $STDOUT_STRING
>&2 echo $STDERR_STRING
1 change: 1 addition & 0 deletions internal/integrations/v4/fixtures/fixtures_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ var (
FileContentsCmd = testhelp.Script("../fixtures/filecontents.sh")
FileContentsWithArgCmd = testhelp.Script("../fixtures/filecontents_witharg.sh")
FileContentsFromEnvCmd = testhelp.Script("../fixtures/filecontents_fromenv.sh")
EchoFromEnv = testhelp.Script("../fixtures/echo_from_env.sh")
)
1 change: 1 addition & 0 deletions internal/integrations/v4/fixtures/fixtures_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ var (
// at the moment, unsupported, as they use env vars with Powershell. Left here to avoid compile errors
FileContentsCmd = testhelp.Script("unsupported-test-case")
FileContentsFromEnvCmd = testhelp.Script("unsupported-test-case")
EchoFromEnv = testhelp.Script("unsupported-test-case")
)
20 changes: 20 additions & 0 deletions internal/integrations/v4/integration/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package integration

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -44,6 +45,25 @@ type Definition struct {
newTempFile func(template []byte) (string, error)
}

func (d *Definition) Hash() string {
h := sha256.New()
identifier := fmt.Sprintf("%v%v%v%v%v%v%v%v%v%v%v",
d.Name,
d.Labels,
d.ExecutorConfig,
d.Interval,
d.Timeout,
d.ConfigTemplate,
d.InventorySource,
d.WhenConditions,
d.runnable.Args,
d.runnable.Cfg,
d.runnable.Command,
)
h.Write([]byte(identifier))
return fmt.Sprintf("%x", h.Sum(nil))
}

func (d *Definition) TimeoutEnabled() bool {
return d.Timeout > 0
}
Expand Down
20 changes: 20 additions & 0 deletions internal/integrations/v4/integration/definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,23 @@ func TestRun_RemoveExternalConfig(t *testing.T) {
}
})
}

func TestDefinition_Hash(t *testing.T) {
def := Definition{
Name: "def",
ConfigTemplate: []byte("a"),
}
assert.NotNil(t, def)
def2 := Definition{
Name: "def",
ConfigTemplate: []byte("b"),
}
assert.NotNil(t, def2)
assert.NotEqual(t, def.Hash(), def2.Hash())
def3 := Definition{
Name: "def",
ConfigTemplate: []byte("b"),
}
assert.NotNil(t, def3)
assert.Equal(t, def3.Hash(), def2.Hash())
}
13 changes: 9 additions & 4 deletions internal/integrations/v4/runner/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/databind/pkg/databind"
"github.com/newrelic/infrastructure-agent/pkg/integrations/cmdrequest"
"github.com/newrelic/infrastructure-agent/pkg/integrations/configrequest"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/emitter"
)

Expand All @@ -24,8 +25,10 @@ type Group struct {
emitter emitter.Emitter
// for testing purposes, allows defining which action to take when an execution
// error is received. If unset, it will be runner.logErrors
handleErrorsProvide func() runnerErrorHandler
cmdReqHandle cmdrequest.HandleFn
handleErrorsProvide func() runnerErrorHandler
cmdReqHandle cmdrequest.HandleFn
configHandle configrequest.HandleFn
terminateDefinitionQ chan string
}

type runnerErrorHandler func(ctx context.Context, errs <-chan error)
Expand All @@ -38,10 +41,12 @@ func NewGroup(
passthroughEnv []string,
emitter emitter.Emitter,
cmdReqHandle cmdrequest.HandleFn,
configHandle configrequest.HandleFn,
cfgPath string,
terminateDefinitionQ chan string,
) (g Group, c FeaturesCache, err error) {

g, c, err = loadFn(il, passthroughEnv, cfgPath, cmdReqHandle)
g, c, err = loadFn(il, passthroughEnv, cfgPath, cmdReqHandle, configHandle, terminateDefinitionQ)
if err != nil {
return
}
Expand All @@ -55,7 +60,7 @@ func NewGroup(
// provided context
func (g *Group) Run(ctx context.Context) (hasStartedAnyOHI bool) {
for _, integr := range g.integrations {
go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle).Run(ctx, nil, nil)
go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle, g.configHandle, g.terminateDefinitionQ).Run(ctx, nil, nil)
hasStartedAnyOHI = true
}

Expand Down
Loading

0 comments on commit 2d9ceb2

Please sign in to comment.