From 79cad202596a6d5eaabc4acdd3300d51c2a108f5 Mon Sep 17 00:00:00 2001 From: jhvaras Date: Tue, 1 Dec 2020 15:31:55 +0100 Subject: [PATCH] Instrument command API run requests (#256) * fix: replace command API ACK mechanism from ID to Hash * fix: ACK each time & skip handling cmds already ACKed * feat: instrument cmd-api run/stop requests * feat: rename event field `stop_mode` to `cmd_stop_mode` * feat: prefix event summary value with cmdapi. * feat: update event summary value to "cmd-api" * fix: stop integration handler command-subscription * feat: log integration args * fix: fixes CI test skipping goroutine-leak weirdly failing on sensitive-obfustaion regex * fix: run once termination * fix: fixes CI test skipping goroutine-leak weirdly failing on sensitive-obfustaion regex * fix: windows CI * fix: CI commenting debug args field --- cmd/newrelic-infra/newrelic-infra.go | 57 ++++++++++--------- .../runintegration/runintegration.go | 42 ++++++++++---- .../runintegration/runintegration_test.go | 5 +- .../agent/cmdchannel/service/service_test.go | 5 +- .../stopintegration/stopintegration.go | 48 ++++++++++------ .../stopintegration/stopintegration_test.go | 6 +- internal/integrations/v4/executor/executor.go | 7 +-- .../integrations/v4/integration/definition.go | 2 +- .../v4/integration/integration.go | 5 ++ internal/integrations/v4/runner/runner.go | 6 ++ pkg/backend/commandapi/client.go | 16 ++++++ pkg/integrations/cmdrequest/handler.go | 2 + pkg/integrations/stoppable/tracker_test.go | 4 +- pkg/integrations/v4/dm/emitter.go | 9 ++- pkg/integrations/v4/dm/emitter_no_register.go | 6 +- pkg/integrations/v4/dm/testutils/testutils.go | 16 ++++++ pkg/integrations/v4/protocol/types.go | 24 ++++++++ 17 files changed, 181 insertions(+), 79 deletions(-) create mode 100644 pkg/integrations/v4/dm/testutils/testutils.go diff --git a/cmd/newrelic-infra/newrelic-infra.go b/cmd/newrelic-infra/newrelic-infra.go index 58c0c0596..69135fe52 100644 --- a/cmd/newrelic-infra/newrelic-infra.go +++ b/cmd/newrelic-infra/newrelic-infra.go @@ -231,40 +231,13 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error { ffManager := feature_flags.NewManager(c.Features) il := newInstancesLookup(integrationCfg) - // queues integration run requests - definitionQ := make(chan integration.Definition, 100) - - tracker := stoppable.NewTracker() - - // Command channel handlers - backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch - boHandler := ccBackoff.NewHandler(backoffSecsC) - ffHandle := fflag.NewHandler(c, ffManager, wlog.WithComponent("FFHandler")) - ffHandler := cmdchannel.NewCmdHandler("set_feature_flag", ffHandle.Handle) - riHandler := runintegration.NewHandler(definitionQ, il, wlog.WithComponent("runintegration.Handler")) - siHandler := stopintegration.NewHandler(tracker, wlog.WithComponent("stopintegration.Handler")) - // Command channel service - ccService := service.NewService( - caClient, - c.CommandChannelIntervalSec, - backoffSecsC, - boHandler, - ffHandler, - riHandler, - siHandler, - ) - initCmdResponse, err := ccService.InitialFetch(context.Background()) - if err != nil { - aslog.WithError(err).Warn("Commands initial fetch failed.") - } - fatal := func(err error, message string) { aslog.WithError(err).Error(message) os.Exit(1) } aslog.Info("Checking network connectivity...") - err = waitForNetwork(c.CollectorURL, c.StartupConnectionTimeout, c.StartupConnectionRetries, transport) + err := waitForNetwork(c.CollectorURL, c.StartupConnectionTimeout, c.StartupConnectionRetries, transport) if err != nil { fatal(err, "Can't reach the New Relic collector.") } @@ -317,6 +290,12 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error { return err } + // queues integration run requests + definitionQ := make(chan integration.Definition, 100) + + // track stoppable integrations + tracker := stoppable.NewTracker() + var dmEmitter dm.Emitter if enabled, exists := ffManager.GetFeatureFlag(fflag.FlagDMRegisterEnable); exists && enabled { dmEmitter = dm.NewEmitter(agt.GetContext(), dmSender, registerClient) @@ -326,6 +305,28 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error { integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager) integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, tracker) + // Command channel handlers + backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch + boHandler := ccBackoff.NewHandler(backoffSecsC) + ffHandle := fflag.NewHandler(c, ffManager, wlog.WithComponent("FFHandler")) + ffHandler := cmdchannel.NewCmdHandler("set_feature_flag", ffHandle.Handle) + riHandler := runintegration.NewHandler(definitionQ, il, dmEmitter, wlog.WithComponent("runintegration.Handler")) + siHandler := stopintegration.NewHandler(tracker, il, dmEmitter, wlog.WithComponent("stopintegration.Handler")) + // Command channel service + ccService := service.NewService( + caClient, + c.CommandChannelIntervalSec, + backoffSecsC, + boHandler, + ffHandler, + riHandler, + siHandler, + ) + initCmdResponse, err := ccService.InitialFetch(context.Background()) + if err != nil { + aslog.WithError(err).Warn("Commands initial fetch failed.") + } + // log-forwarder fbIntCfg := v4.FBSupervisorConfig{ FluentBitExePath: c.FluentBitExePath, diff --git a/internal/agent/cmdchannel/runintegration/runintegration.go b/internal/agent/cmdchannel/runintegration/runintegration.go index 079317dda..e2f50a051 100644 --- a/internal/agent/cmdchannel/runintegration/runintegration.go +++ b/internal/agent/cmdchannel/runintegration/runintegration.go @@ -8,11 +8,15 @@ import ( "errors" "fmt" "strings" + "time" "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel" "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" "github.com/newrelic/infrastructure-agent/pkg/backend/commandapi" + "github.com/newrelic/infrastructure-agent/pkg/fwrequest" "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol" "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/newrelic/infrastructure-agent/pkg/trace" ) @@ -29,11 +33,11 @@ type RunIntArgs struct { // Hash hashes the run-integration request, so intergation can be required to stop using same arguments. func (a *RunIntArgs) Hash() string { - return fmt.Sprintf("%s#%s", strings.TrimSpace(a.IntegrationName), strings.TrimSpace(a.IntegrationName)) + return fmt.Sprintf("%s#%v", strings.TrimSpace(a.IntegrationName), a.IntegrationArgs) } // NewHandler creates a cmd-channel handler for run-integration requests. -func NewHandler(definitionQ chan<- integration.Definition, il integration.InstancesLookup, logger log.Entry) *cmdchannel.CmdHandler { +func NewHandler(definitionQ chan<- integration.Definition, il integration.InstancesLookup, dmEmitter dm.Emitter, logger log.Entry) *cmdchannel.CmdHandler { handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) { trace.CmdReq("run integration request received") var args RunIntArgs @@ -47,32 +51,46 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan return } - def, err := integration.NewDefinition(newConfigFromCmdChannelRunInt(args), il, nil, nil) + def, err := integration.NewDefinition(NewConfigFromCmdChannelRunInt(args), il, nil, nil) if err != nil { - logger. - WithField("cmd_id", cmd.ID). - WithField("cmd_name", cmd.Name). - WithField("cmd_args", string(cmd.Args)). - WithField("cmd_args_name", args.IntegrationName). - WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs)). - WithError(err). - Warn("cannot create handler for cmd channel run_integration requests") + LogDecorated(logger, cmd, args).WithError(err).Warn("cannot create handler for cmd channel run_integration requests") return } def.CmdChannelHash = args.Hash() definitionQ <- def + + ev := cmd.Event(args.IntegrationName, args.IntegrationArgs) + ev["cmd_stop_hash"] = args.Hash() + NotifyPlatform(dmEmitter, def, ev) + return } return cmdchannel.NewCmdHandler("run_integration", handleF) } +func NotifyPlatform(dmEmitter dm.Emitter, def integration.Definition, ev protocol.EventData) { + ds := protocol.NewEventDataset(time.Now().UnixNano(), ev) + data := protocol.NewData("cmdapi.runintegration", "1", []protocol.Dataset{ds}) + dmEmitter.Send(fwrequest.NewFwRequest(def, nil, nil, data)) +} + // newConfigFromCmdReq creates an integration config from a command request. -func newConfigFromCmdChannelRunInt(args RunIntArgs) config.ConfigEntry { +func NewConfigFromCmdChannelRunInt(args RunIntArgs) config.ConfigEntry { // executable would be looked up by integration name return config.ConfigEntry{ InstanceName: args.IntegrationName, CLIArgs: args.IntegrationArgs, + Interval: "0", } } + +func LogDecorated(logger log.Entry, cmd commandapi.Command, args RunIntArgs) log.Entry { + return logger. + WithField("cmd_id", cmd.ID). + WithField("cmd_name", cmd.Name). + WithField("cmd_args", string(cmd.Args)). + WithField("cmd_args_name", args.IntegrationName). + WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs)) +} diff --git a/internal/agent/cmdchannel/runintegration/runintegration_test.go b/internal/agent/cmdchannel/runintegration/runintegration_test.go index aff57678e..f9a8477c6 100644 --- a/internal/agent/cmdchannel/runintegration/runintegration_test.go +++ b/internal/agent/cmdchannel/runintegration/runintegration_test.go @@ -9,6 +9,7 @@ import ( "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel" "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" "github.com/newrelic/infrastructure-agent/pkg/backend/commandapi" + dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils" "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,7 +20,7 @@ var ( ) func TestHandle_returnsErrorOnMissingIntegrationName(t *testing.T) { - h := NewHandler(make(chan integration.Definition, 1), integration.ErrLookup, l) + h := NewHandler(make(chan integration.Definition, 1), integration.ErrLookup, dm.NewNoopEmitter(), l) cmdArgsMissingName := commandapi.Command{ Args: []byte(`{ "integration_args": ["foo", "bar"] }`), @@ -39,7 +40,7 @@ func TestHandle_queuesIntegrationToBeRun(t *testing.T) { return "/path/to/nri-foo", nil }, } - h := NewHandler(defQueue, il, l) + h := NewHandler(defQueue, il, dm.NewNoopEmitter(), l) cmd := commandapi.Command{ Args: []byte(`{ "integration_name": "nri-foo", "integration_args": ["bar", "baz"] }`), diff --git a/internal/agent/cmdchannel/service/service_test.go b/internal/agent/cmdchannel/service/service_test.go index 2e42b535c..8f9fcbae1 100644 --- a/internal/agent/cmdchannel/service/service_test.go +++ b/internal/agent/cmdchannel/service/service_test.go @@ -21,6 +21,7 @@ import ( "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" http2 "github.com/newrelic/infrastructure-agent/pkg/backend/http" "github.com/newrelic/infrastructure-agent/pkg/entity" + dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils" "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/stretchr/testify/require" @@ -154,7 +155,7 @@ func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) { return "/path/to/nri-foo", nil }, } - h := runintegration.NewHandler(defQueue, il, l) + h := runintegration.NewHandler(defQueue, il, dm.NewNoopEmitter(), l) s := NewService(cmdchanneltest.SuccessClient(serializedCmds), 1, make(chan int, 1), h) @@ -284,7 +285,7 @@ func TestSrv_Run_HandlesRunIntegrationAndACKs(t *testing.T) { return "/path/to/nri-foo", nil }, } - h := runintegration.NewHandler(defQueue, il, l) + h := runintegration.NewHandler(defQueue, il, dm.NewNoopEmitter(), l) cmd := ` { diff --git a/internal/agent/cmdchannel/stopintegration/stopintegration.go b/internal/agent/cmdchannel/stopintegration/stopintegration.go index eac59b874..cab6bcf7c 100644 --- a/internal/agent/cmdchannel/stopintegration/stopintegration.go +++ b/internal/agent/cmdchannel/stopintegration/stopintegration.go @@ -5,14 +5,15 @@ package stopintegration import ( "context" "encoding/json" - "fmt" "runtime" "time" "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel" "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration" + "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" "github.com/newrelic/infrastructure-agent/pkg/backend/commandapi" "github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm" "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/newrelic/infrastructure-agent/pkg/trace" "github.com/shirou/gopsutil/process" @@ -23,7 +24,7 @@ const ( ) // NewHandler creates a cmd-channel handler for stop-integration requests. -func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler { +func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler { handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) { if runtime.GOOS == "windows" { return cmdchannel.ErrOSNotSupported @@ -47,23 +48,24 @@ func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler // integration isn't running if pidC == nil { if tracked { - logDecorated(l, cmd, args).Debug("integration is not running") + runintegration.LogDecorated(l, cmd, args).Debug("integration is not running") } else { - logDecorated(l, cmd, args).Warn("cannot stop non tracked integration") + runintegration.LogDecorated(l, cmd, args).Warn("cannot stop non tracked integration") } return nil } p, err := process.NewProcess(int32(<-pidC)) if err != nil { - logDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process") + runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process") return } + stopModeUsed := "error" // request graceful stop (SIGTERM) err = p.TerminateWithContext(ctx) if err != nil { - logDecorated(l, cmd, args).WithError(err).Debug("cannot SIGTERM process") + runintegration.LogDecorated(l, cmd, args).WithError(err).Debug("cannot SIGTERM process") } else { // wait grace period, blocking is fine as cmd handlers run within their own goroutines. time.Sleep(terminationGracePeriod) @@ -71,13 +73,21 @@ func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler isRunning, err := p.IsRunningWithContext(ctx) if err != nil { - logDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process running state") + runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process running state") + } else { + stopModeUsed = "sigterm" } // force termination (SIGKILL) if isRunning || err != nil { stopped := tracker.Kill(args.Hash()) - logDecorated(l, cmd, args).WithField("stopped", stopped).Debug("integration force quit") + runintegration.LogDecorated(l, cmd, args).WithField("stopped", stopped).Debug("integration force quit") + stopModeUsed = "sigkill" + } + + // notify platform + if err = notifyPlatform(dmEmitter, il, cmd, args, stopModeUsed); err != nil { + runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot notify platform about command") } // no further error handling required @@ -86,14 +96,20 @@ func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler return } - return cmdchannel.NewCmdHandler("run_integration", handleF) + return cmdchannel.NewCmdHandler("stop_integration", handleF) } -func logDecorated(logger log.Entry, cmd commandapi.Command, args runintegration.RunIntArgs) log.Entry { - return logger. - WithField("cmd_id", cmd.ID). - WithField("cmd_name", cmd.Name). - WithField("cmd_args", string(cmd.Args)). - WithField("cmd_args_name", args.IntegrationName). - WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs)) +func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd commandapi.Command, args runintegration.RunIntArgs, stopModeUsed string) error { + def, err := integration.NewDefinition(runintegration.NewConfigFromCmdChannelRunInt(args), il, nil, nil) + if err != nil { + return err + } + + def.CmdChannelHash = args.Hash() + ev := cmd.Event(args.IntegrationName, args.IntegrationArgs) + ev["cmd_stop_hash"] = args.Hash() + ev["cmd_stop_mode"] = stopModeUsed + runintegration.NotifyPlatform(dmEmitter, def, ev) + + return nil } diff --git a/internal/agent/cmdchannel/stopintegration/stopintegration_test.go b/internal/agent/cmdchannel/stopintegration/stopintegration_test.go index 6c9da1402..cdbf13690 100644 --- a/internal/agent/cmdchannel/stopintegration/stopintegration_test.go +++ b/internal/agent/cmdchannel/stopintegration/stopintegration_test.go @@ -11,8 +11,10 @@ import ( "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel" "github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration" + "github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration" "github.com/newrelic/infrastructure-agent/pkg/backend/commandapi" "github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable" + dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils" "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/shirou/gopsutil/process" "github.com/stretchr/testify/assert" @@ -28,7 +30,7 @@ func TestHandle_returnsErrorOnMissingName(t *testing.T) { t.Skip("CC stop-intergation is not supported on Windows") } - h := NewHandler(stoppable.NewTracker(), l) + h := NewHandler(stoppable.NewTracker(), integration.ErrLookup, dm.NewNoopEmitter(), l) cmdArgsMissingPID := commandapi.Command{ Args: []byte(`{ "integration_args": ["foo"] }`), @@ -45,7 +47,7 @@ func TestHandle_signalStopProcess(t *testing.T) { // Given a handler with an stoppables tracker tracker := stoppable.NewTracker() - h := NewHandler(tracker, l) + h := NewHandler(tracker, integration.ErrLookup, dm.NewNoopEmitter(), l) // When a process context is tracked ctx := context.Background() diff --git a/internal/integrations/v4/executor/executor.go b/internal/integrations/v4/executor/executor.go index 5d1304f62..5486e6e7a 100644 --- a/internal/integrations/v4/executor/executor.go +++ b/internal/integrations/v4/executor/executor.go @@ -45,14 +45,11 @@ func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceiv go func() { cmd := r.buildCommand(commandCtx) - //argsS := make([]string, len(cmd.Args)) - //copy(argsS, cmd.Args) - //args := helpers.ObfuscateSensitiveDataFromArray(argsS) - illog. WithField("command", r.Command). WithField("path", cmd.Path). - //WithField("args", args). + // TODO: creates weird failure on leaktest + //WithField("args", helpers.ObfuscateSensitiveDataFromArray(cmd.Args)). WithField("env", helpers.ObfuscateSensitiveDataFromArray(cmd.Env)). Debug("Running command.") diff --git a/internal/integrations/v4/integration/definition.go b/internal/integrations/v4/integration/definition.go index 6f1407308..ffc899102 100644 --- a/internal/integrations/v4/integration/definition.go +++ b/internal/integrations/v4/integration/definition.go @@ -38,7 +38,7 @@ type Definition struct { ConfigTemplate []byte // external configuration file, if provided InventorySource ids.PluginID WhenConditions []when.Condition - CmdChannelHash string // not empty: generated by command-channel "run_integration", contains name+args hash + CmdChannelHash string // not empty: generated by command-channel "run/stop_integration", contains name+args hash runnable executor.Executor newTempFile func(template []byte) (string, error) } diff --git a/internal/integrations/v4/integration/integration.go b/internal/integrations/v4/integration/integration.go index f4035102d..d6b0aedf8 100644 --- a/internal/integrations/v4/integration/integration.go +++ b/internal/integrations/v4/integration/integration.go @@ -183,6 +183,11 @@ func (d *Definition) fromName(te config2.ConfigEntry, lookup InstancesLookup) er // - If a wrong string is provided, it returns the default interval and logs a warning message // - If the provided integration is lower than the minimum allowed, it logs a warning message and returns the minimum func getInterval(duration string) time.Duration { + // zero value disables interval + if duration == "0" { + return 0 + } + if duration == "" { return defaultIntegrationInterval } diff --git a/internal/integrations/v4/runner/runner.go b/internal/integrations/v4/runner/runner.go index 6ce64cf28..8525b7b57 100644 --- a/internal/integrations/v4/runner/runner.go +++ b/internal/integrations/v4/runner/runner.go @@ -96,6 +96,12 @@ func (r *runner) Run(ctx context.Context, pidWChan chan<- int) { } } + // single run + if r.definition.Interval == 0 { + r.log.Debug("single run finished") + return + } + select { case <-ctx.Done(): r.log.Debug("Integration has been interrupted. Finishing.") diff --git a/pkg/backend/commandapi/client.go b/pkg/backend/commandapi/client.go index 01dc90598..fec2dfc11 100644 --- a/pkg/backend/commandapi/client.go +++ b/pkg/backend/commandapi/client.go @@ -11,6 +11,7 @@ import ( backendhttp "github.com/newrelic/infrastructure-agent/pkg/backend/http" "github.com/newrelic/infrastructure-agent/pkg/entity" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol" ) type Client interface { @@ -25,6 +26,21 @@ type Command struct { Args json.RawMessage `json:"arguments"` } +// Event creates an event from command. +func (c *Command) Event(integrationName string, integrationArgs []string) protocol.EventData { + return protocol.EventData{ + "eventType": "InfrastructureEvent", + "category": "notifications", + "summary": "cmd-api", + "cmd_id": c.ID, + "cmd_hash": c.Hash, + "cmd_name": c.Name, + "cmd_args": string(c.Args), + "cmd_args_name": integrationName, + "cmd_args_args": strings.Join(integrationArgs, " "), + } +} + type client struct { svcURL string licenseKey string diff --git a/pkg/integrations/cmdrequest/handler.go b/pkg/integrations/cmdrequest/handler.go index f7e5f1946..ca506583a 100644 --- a/pkg/integrations/cmdrequest/handler.go +++ b/pkg/integrations/cmdrequest/handler.go @@ -51,6 +51,7 @@ func newConfigFromCmdReq(cr protocol.CmdRequestV1Cmd) config.ConfigEntry { InstanceName: cr.Name, Exec: append([]string{cr.Command}, cr.Args...), Env: cr.Env, + Interval: "0", } } @@ -60,5 +61,6 @@ func newConfigFromCmdReq(cr protocol.CmdRequestV1Cmd) config.ConfigEntry { InstanceName: cr.Name, CLIArgs: cr.Args, Env: cr.Env, + Interval: "0", } } diff --git a/pkg/integrations/stoppable/tracker_test.go b/pkg/integrations/stoppable/tracker_test.go index a8b789628..3c984b776 100644 --- a/pkg/integrations/stoppable/tracker_test.go +++ b/pkg/integrations/stoppable/tracker_test.go @@ -28,10 +28,8 @@ func TestStoppablesTracker_Add(t *testing.T) { assert.True(t, stopped) - // once stopped context is removed from track - _, ok := s.hash2Cancel["foo"] - assert.False(t, ok) + assert.False(t, ok, "once stopped context should had been removed from track") } func TestStoppablesTracker_Kill_WontStopNonTrackedContext(t *testing.T) { diff --git a/pkg/integrations/v4/dm/emitter.go b/pkg/integrations/v4/dm/emitter.go index 2e0123004..5d6a4a902 100644 --- a/pkg/integrations/v4/dm/emitter.go +++ b/pkg/integrations/v4/dm/emitter.go @@ -221,16 +221,15 @@ func (e *emitter) processEntityFwRequest(r fwrequest.EntityFwRequest) { plugin := agent.NewExternalPluginCommon(r.Definition.PluginID(r.Integration.Name), e.agentContext, r.Definition.Name) + emitInventory(&plugin, r.Definition, r.Integration, r.ID(), r.Data, labels) + + emitEvent(&plugin, r.Definition, r.Data, labels, r.ID()) + dmProcessor := IntegrationProcessor{ IntegrationInterval: r.Definition.Interval, IntegrationLabels: labels, IntegrationExtraAnnotations: annos, } - - emitInventory(&plugin, r.Definition, r.Integration, r.ID(), r.Data, labels) - - emitEvent(&plugin, r.Definition, r.Data, labels, r.ID()) - metrics := dmProcessor.ProcessMetrics(r.Data.Metrics, r.Data.Common, r.Data.Entity) if err := e.metricsSender.SendMetricsWithCommonAttributes(r.Data.Common, metrics); err != nil { elog.WithField("entity", r.ID()).WithError(err).Warn("discarding metrics") diff --git a/pkg/integrations/v4/dm/emitter_no_register.go b/pkg/integrations/v4/dm/emitter_no_register.go index 8a8a1925b..6e67b01a0 100644 --- a/pkg/integrations/v4/dm/emitter_no_register.go +++ b/pkg/integrations/v4/dm/emitter_no_register.go @@ -42,7 +42,7 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) { idLookup host.IDLookup, metricsSender MetricsSender, emitter agent.PluginEmitter, - metadata integration.Definition, + definition integration.Definition, integrationMetadata protocol.IntegrationMetadata, dataSet protocol.Dataset, labels map[string]string, @@ -71,7 +71,7 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) { return fmt.Errorf("error renaming entity: %s", err.Error()) } - integrationUser := metadata.ExecutorConfig.User + integrationUser := definition.ExecutorConfig.User if len(dataSet.Inventory) > 0 { inventoryDataSet := legacy.BuildInventoryDataSet( @@ -91,7 +91,7 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) { } dmProcessor := IntegrationProcessor{ - IntegrationInterval: metadata.Interval, + IntegrationInterval: definition.Interval, IntegrationLabels: labels, IntegrationExtraAnnotations: extraAnnotations, } diff --git a/pkg/integrations/v4/dm/testutils/testutils.go b/pkg/integrations/v4/dm/testutils/testutils.go new file mode 100644 index 000000000..660884384 --- /dev/null +++ b/pkg/integrations/v4/dm/testutils/testutils.go @@ -0,0 +1,16 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package dm + +import ( + "github.com/newrelic/infrastructure-agent/pkg/fwrequest" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm" +) + +type NoopEmitter struct{} + +func (e *NoopEmitter) Send(_ fwrequest.FwRequest) {} + +func NewNoopEmitter() dm.Emitter { + return &NoopEmitter{} +} diff --git a/pkg/integrations/v4/protocol/types.go b/pkg/integrations/v4/protocol/types.go index 4be52522e..8413918df 100644 --- a/pkg/integrations/v4/protocol/types.go +++ b/pkg/integrations/v4/protocol/types.go @@ -154,6 +154,30 @@ type MetricData map[string]interface{} // EventData is the data type for single shot events type EventData map[string]interface{} +// NewData creates a payload from code instead of JSON. +func NewData(name, version string, ds []Dataset) DataV4 { + return DataV4{ + PluginProtocolVersion: PluginProtocolVersion{RawProtocolVersion: 4}, + Integration: IntegrationMetadata{ + Name: name, + Version: version, + }, + DataSets: ds, + } +} + +// NewEventDataset creates a dataset with jsut a single event. +func NewEventDataset(ts int64, event EventData) Dataset { + return Dataset{ + Common: Common{ + Timestamp: &ts, + }, + Events: []EventData{ + event, + }, + } +} + // NewEventData create a new event data from builder func func NewEventData(options ...func(EventData)) (EventData, error) { e := EventData{