Skip to content

Commit

Permalink
Instrument command API run requests (newrelic#256)
Browse files Browse the repository at this point in the history
* fix: replace command API ACK mechanism from ID to Hash

* fix: ACK each time & skip handling cmds already ACKed

* feat: instrument cmd-api run/stop requests

* feat: rename event field `stop_mode` to `cmd_stop_mode`

* feat: prefix event summary value with cmdapi.

* feat: update event summary value to "cmd-api"

* fix: stop integration handler command-subscription

* feat: log integration args

* fix: fixes CI test skipping goroutine-leak weirdly failing on sensitive-obfuscation regex

* fix: run once termination

* fix: fixes CI test skipping goroutine-leak weirdly failing on sensitive-obfuscation regex

* fix: windows CI

* fix: CI commenting debug args field
  • Loading branch information
varas authored Dec 1, 2020
1 parent ebdc814 commit 79cad20
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 79 deletions.
57 changes: 29 additions & 28 deletions cmd/newrelic-infra/newrelic-infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,40 +231,13 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
ffManager := feature_flags.NewManager(c.Features)
il := newInstancesLookup(integrationCfg)

// queues integration run requests
definitionQ := make(chan integration.Definition, 100)

tracker := stoppable.NewTracker()

// Command channel handlers
backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch
boHandler := ccBackoff.NewHandler(backoffSecsC)
ffHandle := fflag.NewHandler(c, ffManager, wlog.WithComponent("FFHandler"))
ffHandler := cmdchannel.NewCmdHandler("set_feature_flag", ffHandle.Handle)
riHandler := runintegration.NewHandler(definitionQ, il, wlog.WithComponent("runintegration.Handler"))
siHandler := stopintegration.NewHandler(tracker, wlog.WithComponent("stopintegration.Handler"))
// Command channel service
ccService := service.NewService(
caClient,
c.CommandChannelIntervalSec,
backoffSecsC,
boHandler,
ffHandler,
riHandler,
siHandler,
)
initCmdResponse, err := ccService.InitialFetch(context.Background())
if err != nil {
aslog.WithError(err).Warn("Commands initial fetch failed.")
}

fatal := func(err error, message string) {
aslog.WithError(err).Error(message)
os.Exit(1)
}

aslog.Info("Checking network connectivity...")
err = waitForNetwork(c.CollectorURL, c.StartupConnectionTimeout, c.StartupConnectionRetries, transport)
err := waitForNetwork(c.CollectorURL, c.StartupConnectionTimeout, c.StartupConnectionRetries, transport)
if err != nil {
fatal(err, "Can't reach the New Relic collector.")
}
Expand Down Expand Up @@ -317,6 +290,12 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
return err
}

// queues integration run requests
definitionQ := make(chan integration.Definition, 100)

// track stoppable integrations
tracker := stoppable.NewTracker()

var dmEmitter dm.Emitter
if enabled, exists := ffManager.GetFeatureFlag(fflag.FlagDMRegisterEnable); exists && enabled {
dmEmitter = dm.NewEmitter(agt.GetContext(), dmSender, registerClient)
Expand All @@ -326,6 +305,28 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, tracker)

// Command channel handlers
backoffSecsC := make(chan int, 1) // 1 won't block on initial cmd-channel fetch
boHandler := ccBackoff.NewHandler(backoffSecsC)
ffHandle := fflag.NewHandler(c, ffManager, wlog.WithComponent("FFHandler"))
ffHandler := cmdchannel.NewCmdHandler("set_feature_flag", ffHandle.Handle)
riHandler := runintegration.NewHandler(definitionQ, il, dmEmitter, wlog.WithComponent("runintegration.Handler"))
siHandler := stopintegration.NewHandler(tracker, il, dmEmitter, wlog.WithComponent("stopintegration.Handler"))
// Command channel service
ccService := service.NewService(
caClient,
c.CommandChannelIntervalSec,
backoffSecsC,
boHandler,
ffHandler,
riHandler,
siHandler,
)
initCmdResponse, err := ccService.InitialFetch(context.Background())
if err != nil {
aslog.WithError(err).Warn("Commands initial fetch failed.")
}

// log-forwarder
fbIntCfg := v4.FBSupervisorConfig{
FluentBitExePath: c.FluentBitExePath,
Expand Down
42 changes: 30 additions & 12 deletions internal/agent/cmdchannel/runintegration/runintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/fwrequest"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/trace"
)
Expand All @@ -29,11 +33,11 @@ type RunIntArgs struct {

// Hash hashes the run-integration request, so the integration can later be requested to stop using the same arguments.
func (a *RunIntArgs) Hash() string {
return fmt.Sprintf("%s#%s", strings.TrimSpace(a.IntegrationName), strings.TrimSpace(a.IntegrationName))
return fmt.Sprintf("%s#%v", strings.TrimSpace(a.IntegrationName), a.IntegrationArgs)
}

// NewHandler creates a cmd-channel handler for run-integration requests.
func NewHandler(definitionQ chan<- integration.Definition, il integration.InstancesLookup, logger log.Entry) *cmdchannel.CmdHandler {
func NewHandler(definitionQ chan<- integration.Definition, il integration.InstancesLookup, dmEmitter dm.Emitter, logger log.Entry) *cmdchannel.CmdHandler {
handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) {
trace.CmdReq("run integration request received")
var args RunIntArgs
Expand All @@ -47,32 +51,46 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
return
}

def, err := integration.NewDefinition(newConfigFromCmdChannelRunInt(args), il, nil, nil)
def, err := integration.NewDefinition(NewConfigFromCmdChannelRunInt(args), il, nil, nil)
if err != nil {
logger.
WithField("cmd_id", cmd.ID).
WithField("cmd_name", cmd.Name).
WithField("cmd_args", string(cmd.Args)).
WithField("cmd_args_name", args.IntegrationName).
WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs)).
WithError(err).
Warn("cannot create handler for cmd channel run_integration requests")
LogDecorated(logger, cmd, args).WithError(err).Warn("cannot create handler for cmd channel run_integration requests")
return
}
def.CmdChannelHash = args.Hash()

definitionQ <- def

ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
ev["cmd_stop_hash"] = args.Hash()
NotifyPlatform(dmEmitter, def, ev)

return
}

return cmdchannel.NewCmdHandler("run_integration", handleF)
}

// NotifyPlatform reports a command-API event through dmEmitter.
// It wraps ev in an event dataset stamped with the current time (Unix nanoseconds),
// builds a protocol payload holding that single dataset and sends it, attached to the
// integration definition def, as a forward request.
// The stop-integration handler calls this helper as well, so its events are sent under the
// same "cmdapi.runintegration" payload.
// NOTE(review): "cmdapi.runintegration" and "1" look like the payload name and version — confirm
// against protocol.NewData.
func NotifyPlatform(dmEmitter dm.Emitter, def integration.Definition, ev protocol.EventData) {
// timestamp the event at emission time
ds := protocol.NewEventDataset(time.Now().UnixNano(), ev)
data := protocol.NewData("cmdapi.runintegration", "1", []protocol.Dataset{ds})
dmEmitter.Send(fwrequest.NewFwRequest(def, nil, nil, data))
}

// NewConfigFromCmdChannelRunInt creates an integration config from a command-channel run-integration request.
func newConfigFromCmdChannelRunInt(args RunIntArgs) config.ConfigEntry {
func NewConfigFromCmdChannelRunInt(args RunIntArgs) config.ConfigEntry {
// executable would be looked up by integration name
return config.ConfigEntry{
InstanceName: args.IntegrationName,
CLIArgs: args.IntegrationArgs,
Interval: "0",
}
}

// LogDecorated returns logger enriched with fields describing a command-channel request:
// the command ID and name, the raw command arguments as a string, and the parsed
// integration name and integration arguments.
// It is exported so the stop-integration handler can decorate its log entries the same way.
func LogDecorated(logger log.Entry, cmd commandapi.Command, args RunIntArgs) log.Entry {
return logger.
WithField("cmd_id", cmd.ID).
WithField("cmd_name", cmd.Name).
WithField("cmd_args", string(cmd.Args)).
WithField("cmd_args_name", args.IntegrationName).
WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs))
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,7 +20,7 @@ var (
)

func TestHandle_returnsErrorOnMissingIntegrationName(t *testing.T) {
h := NewHandler(make(chan integration.Definition, 1), integration.ErrLookup, l)
h := NewHandler(make(chan integration.Definition, 1), integration.ErrLookup, dm.NewNoopEmitter(), l)

cmdArgsMissingName := commandapi.Command{
Args: []byte(`{ "integration_args": ["foo", "bar"] }`),
Expand All @@ -39,7 +40,7 @@ func TestHandle_queuesIntegrationToBeRun(t *testing.T) {
return "/path/to/nri-foo", nil
},
}
h := NewHandler(defQueue, il, l)
h := NewHandler(defQueue, il, dm.NewNoopEmitter(), l)

cmd := commandapi.Command{
Args: []byte(`{ "integration_name": "nri-foo", "integration_args": ["bar", "baz"] }`),
Expand Down
5 changes: 3 additions & 2 deletions internal/agent/cmdchannel/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
http2 "github.com/newrelic/infrastructure-agent/pkg/backend/http"
"github.com/newrelic/infrastructure-agent/pkg/entity"
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -154,7 +155,7 @@ func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) {
return "/path/to/nri-foo", nil
},
}
h := runintegration.NewHandler(defQueue, il, l)
h := runintegration.NewHandler(defQueue, il, dm.NewNoopEmitter(), l)

s := NewService(cmdchanneltest.SuccessClient(serializedCmds), 1, make(chan int, 1), h)

Expand Down Expand Up @@ -284,7 +285,7 @@ func TestSrv_Run_HandlesRunIntegrationAndACKs(t *testing.T) {
return "/path/to/nri-foo", nil
},
}
h := runintegration.NewHandler(defQueue, il, l)
h := runintegration.NewHandler(defQueue, il, dm.NewNoopEmitter(), l)

cmd := `
{
Expand Down
48 changes: 32 additions & 16 deletions internal/agent/cmdchannel/stopintegration/stopintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ package stopintegration
import (
"context"
"encoding/json"
"fmt"
"runtime"
"time"

"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel"
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/trace"
"github.com/shirou/gopsutil/process"
Expand All @@ -23,7 +24,7 @@ const (
)

// NewHandler creates a cmd-channel handler for stop-integration requests.
func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler {
func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler {
handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) {
if runtime.GOOS == "windows" {
return cmdchannel.ErrOSNotSupported
Expand All @@ -47,37 +48,46 @@ func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler
// integration isn't running
if pidC == nil {
if tracked {
logDecorated(l, cmd, args).Debug("integration is not running")
runintegration.LogDecorated(l, cmd, args).Debug("integration is not running")
} else {
logDecorated(l, cmd, args).Warn("cannot stop non tracked integration")
runintegration.LogDecorated(l, cmd, args).Warn("cannot stop non tracked integration")
}
return nil
}

p, err := process.NewProcess(int32(<-pidC))
if err != nil {
logDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process")
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process")
return
}

stopModeUsed := "error"
// request graceful stop (SIGTERM)
err = p.TerminateWithContext(ctx)
if err != nil {
logDecorated(l, cmd, args).WithError(err).Debug("cannot SIGTERM process")
runintegration.LogDecorated(l, cmd, args).WithError(err).Debug("cannot SIGTERM process")
} else {
// wait grace period, blocking is fine as cmd handlers run within their own goroutines.
time.Sleep(terminationGracePeriod)
}

isRunning, err := p.IsRunningWithContext(ctx)
if err != nil {
logDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process running state")
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process running state")
} else {
stopModeUsed = "sigterm"
}

// force termination (SIGKILL)
if isRunning || err != nil {
stopped := tracker.Kill(args.Hash())
logDecorated(l, cmd, args).WithField("stopped", stopped).Debug("integration force quit")
runintegration.LogDecorated(l, cmd, args).WithField("stopped", stopped).Debug("integration force quit")
stopModeUsed = "sigkill"
}

// notify platform
if err = notifyPlatform(dmEmitter, il, cmd, args, stopModeUsed); err != nil {
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot notify platform about command")
}

// no further error handling required
Expand All @@ -86,14 +96,20 @@ func NewHandler(tracker *stoppable.Tracker, l log.Entry) *cmdchannel.CmdHandler
return
}

return cmdchannel.NewCmdHandler("run_integration", handleF)
return cmdchannel.NewCmdHandler("stop_integration", handleF)
}

func logDecorated(logger log.Entry, cmd commandapi.Command, args runintegration.RunIntArgs) log.Entry {
return logger.
WithField("cmd_id", cmd.ID).
WithField("cmd_name", cmd.Name).
WithField("cmd_args", string(cmd.Args)).
WithField("cmd_args_name", args.IntegrationName).
WithField("cmd_args_args", fmt.Sprintf("%+v", args.IntegrationArgs))
func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd commandapi.Command, args runintegration.RunIntArgs, stopModeUsed string) error {
def, err := integration.NewDefinition(runintegration.NewConfigFromCmdChannelRunInt(args), il, nil, nil)
if err != nil {
return err
}

def.CmdChannelHash = args.Hash()
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
ev["cmd_stop_hash"] = args.Hash()
ev["cmd_stop_mode"] = stopModeUsed
runintegration.NotifyPlatform(dmEmitter, def, ev)

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel"
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/shirou/gopsutil/process"
"github.com/stretchr/testify/assert"
Expand All @@ -28,7 +30,7 @@ func TestHandle_returnsErrorOnMissingName(t *testing.T) {
t.Skip("CC stop-intergation is not supported on Windows")
}

h := NewHandler(stoppable.NewTracker(), l)
h := NewHandler(stoppable.NewTracker(), integration.ErrLookup, dm.NewNoopEmitter(), l)

cmdArgsMissingPID := commandapi.Command{
Args: []byte(`{ "integration_args": ["foo"] }`),
Expand All @@ -45,7 +47,7 @@ func TestHandle_signalStopProcess(t *testing.T) {

// Given a handler with a stoppables tracker
tracker := stoppable.NewTracker()
h := NewHandler(tracker, l)
h := NewHandler(tracker, integration.ErrLookup, dm.NewNoopEmitter(), l)

// When a process context is tracked
ctx := context.Background()
Expand Down
7 changes: 2 additions & 5 deletions internal/integrations/v4/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,11 @@ func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceiv
go func() {
cmd := r.buildCommand(commandCtx)

//argsS := make([]string, len(cmd.Args))
//copy(argsS, cmd.Args)
//args := helpers.ObfuscateSensitiveDataFromArray(argsS)

illog.
WithField("command", r.Command).
WithField("path", cmd.Path).
//WithField("args", args).
// TODO: creates weird failure on leaktest
//WithField("args", helpers.ObfuscateSensitiveDataFromArray(cmd.Args)).
WithField("env", helpers.ObfuscateSensitiveDataFromArray(cmd.Env)).
Debug("Running command.")

Expand Down
2 changes: 1 addition & 1 deletion internal/integrations/v4/integration/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Definition struct {
ConfigTemplate []byte // external configuration file, if provided
InventorySource ids.PluginID
WhenConditions []when.Condition
CmdChannelHash string // not empty: generated by command-channel "run_integration", contains name+args hash
CmdChannelHash string // not empty: generated by command-channel "run/stop_integration", contains name+args hash
runnable executor.Executor
newTempFile func(template []byte) (string, error)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/integrations/v4/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func (d *Definition) fromName(te config2.ConfigEntry, lookup InstancesLookup) er
// - If a wrong string is provided, it returns the default interval and logs a warning message
// - If the provided interval is lower than the minimum allowed, it logs a warning message and returns the minimum
func getInterval(duration string) time.Duration {
// zero value disables interval
if duration == "0" {
return 0
}

if duration == "" {
return defaultIntegrationInterval
}
Expand Down
Loading

0 comments on commit 79cad20

Please sign in to comment.