Skip to content

Commit

Permalink
New DPDK input plugin (influxdata#8883)
Browse files Browse the repository at this point in the history
  • Loading branch information
p-zak authored Apr 28, 2021
1 parent d181b43 commit ff2992e
Show file tree
Hide file tree
Showing 11 changed files with 1,609 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ For documentation on the latest development code see the [documentation index][d
* [docker](./plugins/inputs/docker)
* [docker_log](./plugins/inputs/docker_log)
* [dovecot](./plugins/inputs/dovecot)
* [dpdk](./plugins/inputs/dpdk)
* [aws ecs](./plugins/inputs/ecs) (Amazon Elastic Container Service, Fargate)
* [elasticsearch](./plugins/inputs/elasticsearch)
* [ethtool](./plugins/inputs/ethtool)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/docker"
_ "github.com/influxdata/telegraf/plugins/inputs/docker_log"
_ "github.com/influxdata/telegraf/plugins/inputs/dovecot"
_ "github.com/influxdata/telegraf/plugins/inputs/dpdk"
_ "github.com/influxdata/telegraf/plugins/inputs/ecs"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/ethtool"
Expand Down
200 changes: 200 additions & 0 deletions plugins/inputs/dpdk/README.md

Large diffs are not rendered by default.

263 changes: 263 additions & 0 deletions plugins/inputs/dpdk/dpdk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// +build linux

package dpdk

import (
"encoding/json"
"fmt"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)

// Plugin-wide constants: the description and sample configuration handed back to telegraf,
// default option values, limits applied to user-supplied commands and the names of the
// DPDK list commands queried by the plugin.
const (
// description is the one-line plugin summary returned by Description().
description = "Reads metrics from DPDK applications using v2 telemetry interface."
// sampleConfig is the example TOML configuration returned by SampleConfig().
sampleConfig = `
## Path to DPDK telemetry socket. This shall point to v2 version of DPDK telemetry interface.
# socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2"
## Duration that defines how long the connected socket client will wait for a response before terminating connection.
## This includes both writing to and reading from socket. Since it's local socket access
## to a fast packet processing application, the timeout should be sufficient for most users.
## Setting the value to 0 disables the timeout (not recommended)
# socket_access_timeout = "200ms"
## Enables telemetry data collection for selected device types.
## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status).
## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats).
# device_types = ["ethdev"]
## List of custom, application-specific telemetry commands to query
## The list of available commands depend on the application deployed. Applications can register their own commands
## via telemetry library API http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands
## For e.g. L3 Forwarding with Power Management Sample Application this could be:
## additional_commands = ["/l3fwd-power/stats"]
# additional_commands = []
## Allows turning off collecting data for individual "ethdev" commands.
## Remove "/ethdev/link_status" from list to start getting link status metrics.
[inputs.dpdk.ethdev]
exclude_commands = ["/ethdev/link_status"]
## When running multiple instances of the plugin it's recommended to add a unique tag to each instance to identify
## metrics exposed by an instance of DPDK application. This is useful when multiple DPDK apps run on a single host.
## [inputs.dpdk.tags]
## dpdk_instance = "my-fwd-app"
`
// defaultPathToSocket is the socket path Init() falls back to when socket_path is empty.
defaultPathToSocket = "/var/run/dpdk/rte/dpdk_telemetry.v2"
// defaultAccessTimeout is the socket_access_timeout every new plugin instance starts with.
defaultAccessTimeout = config.Duration(200 * time.Millisecond)
// maxCommandLength bounds an additional command with its parameters stripped:
// validateCommands() rejects lengths greater than or equal to this value.
maxCommandLength = 56
// maxCommandLengthWithParams bounds an additional command including its parameters:
// validateCommands() rejects lengths greater than or equal to this value.
maxCommandLengthWithParams = 1024
// pluginName is the name the plugin is registered under and the name passed to
// acc.AddFields() for every metric it emits.
pluginName = "dpdk"
// ethdevListCommand is queried on every gather to obtain the parameters that the
// "ethdev" commands are combined with.
ethdevListCommand = "/ethdev/list"
// rawdevListCommand is queried on every gather to obtain the parameters that the
// "rawdev" commands are combined with.
rawdevListCommand = "/rawdev/list"
)

// dpdk is the telegraf input plugin that reads metrics from a DPDK application.
// The exported fields are populated from the TOML configuration; the unexported
// fields are set up by Init().
type dpdk struct {
// SocketPath is the path to the DPDK telemetry socket; Init() replaces an empty
// value with defaultPathToSocket.
SocketPath string `toml:"socket_path"`
// AccessTimeout is handed to the socket connector; Init() rejects negative values
// and, per the sample configuration, 0 disables the timeout.
AccessTimeout config.Duration `toml:"socket_access_timeout"`
// DeviceTypes selects the device types to collect from ("ethdev", "rawdev");
// Init() replaces a nil value with ["ethdev"].
DeviceTypes []string `toml:"device_types"`
// EthdevConfig holds the options of the [inputs.dpdk.ethdev] table.
EthdevConfig ethdevConfig `toml:"ethdev"`
// AdditionalCommands lists extra commands to run on every gather; they are
// checked by validateCommands().
AdditionalCommands []string `toml:"additional_commands"`
// Log is the plugin logger; it is excluded from the TOML configuration.
Log telegraf.Logger `toml:"-"`

// connector is created by Init() and used to send commands to the socket.
connector *dpdkConnector
// rawdevCommands are the commands combined with every entry of rawdevListCommand.
rawdevCommands []string
// ethdevCommands are the commands combined with every entry of ethdevListCommand,
// before the exclusion filter is applied.
ethdevCommands []string
// ethdevExcludedCommandsFilter is compiled by Init() from EthdevConfig.EthdevExcludeCommands.
ethdevExcludedCommandsFilter filter.Filter
}

// ethdevConfig holds the options of the "ethdev" configuration table.
type ethdevConfig struct {
// EthdevExcludeCommands lists patterns of "ethdev" commands that must not be queried;
// Init() compiles it into a filter.
EthdevExcludeCommands []string `toml:"exclude_commands"`
}

// init registers a factory for the plugin under pluginName.
func init() {
	inputs.Add(pluginName, func() telegraf.Input {
		// The timeout default is applied at construction time rather than in Init():
		// once Init() runs, an explicit 0 in the configuration can no longer be told
		// apart from an option that was never set.
		return &dpdk{AccessTimeout: defaultAccessTimeout}
	})
}

// SampleConfig returns the example TOML configuration of the plugin.
func (dpdk *dpdk) SampleConfig() string {
return sampleConfig
}

// Description returns the one-line summary of the plugin.
func (dpdk *dpdk) Description() string {
return description
}

// Init validates the configuration, fills in defaults for unset options and
// connects to the DPDK telemetry socket.
//
// It returns an error when the socket path check fails, when an additional
// command is malformed, when the access timeout is negative, when there is
// nothing to read, when the ethdev exclusion filter cannot be compiled or when
// connecting to the socket fails.
func (dpdk *dpdk) Init() error {
	if dpdk.SocketPath == "" {
		dpdk.SocketPath = defaultPathToSocket
		dpdk.Log.Debugf("using default '%v' path for socket_path", defaultPathToSocket)
	}

	// Only a nil list falls back to "ethdev"; a non-nil empty list is kept as is
	// and is caught by the "nothing to read" check below when no additional
	// commands were configured either.
	if dpdk.DeviceTypes == nil {
		dpdk.DeviceTypes = []string{"ethdev"}
	}

	var err error
	if err = isSocket(dpdk.SocketPath); err != nil {
		return err
	}

	dpdk.rawdevCommands = []string{"/rawdev/xstats"}
	dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/link_status"}

	if err = dpdk.validateCommands(); err != nil {
		return err
	}

	if dpdk.AccessTimeout < 0 {
		return fmt.Errorf("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)")
	}

	if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 {
		return fmt.Errorf("plugin was configured with nothing to read")
	}

	dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands)
	if err != nil {
		// %w keeps the compile error in the chain for errors.Is / errors.As.
		return fmt.Errorf("error occurred during filter preparation for ethdev excluded commands - %w", err)
	}

	dpdk.connector = newDpdkConnector(dpdk.SocketPath, dpdk.AccessTimeout)
	initMessage, err := dpdk.connector.connect()
	// The init message is logged whenever one was returned, independently of err.
	if initMessage != nil {
		dpdk.Log.Debugf("Successfully connected to %v running as process with PID %v with len %v",
			initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen)
	}
	return err
}

// validateCommands replaces AdditionalCommands with the result of uniqueValues()
// and verifies that every remaining command is non-empty, starts with '/' and
// stays below the length limits, both with and without its parameters.
// The first violation found is returned as an error.
func (dpdk *dpdk) validateCommands() error {
	dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands)

	for _, cmd := range dpdk.AdditionalCommands {
		switch {
		case len(cmd) == 0:
			return fmt.Errorf("got empty command")
		case cmd[0] != '/':
			return fmt.Errorf("'%v' command should start with '/'", cmd)
		}

		// The shorter limit applies to the command with its parameters stripped.
		bare := stripParams(cmd)
		if len(bare) >= maxCommandLength {
			return fmt.Errorf("'%v' command is too long. It shall be less than %v characters", bare, maxCommandLength)
		}

		if len(cmd) >= maxCommandLengthWithParams {
			return fmt.Errorf("command with parameters '%v' shall be less than %v characters", cmd, maxCommandLengthWithParams)
		}
	}

	return nil
}

// Gather builds the list of commands to run and processes them one after another.
// Failures are reported through acc, so Gather itself always returns nil.
// Commands are never run in parallel within one instance; running several
// instances of the plugin with different settings is the way to parallelize.
func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error {
	// The command list is rebuilt on every call: DPDK may have been restarted since
	// the previous gather and may now expose a different set of metrics.
	for _, cmd := range dpdk.gatherCommands(acc) {
		dpdk.processCommand(acc, cmd)
	}

	return nil
}

// gatherCommands assembles the commands to run in this gather cycle: the
// non-excluded "ethdev" commands and the "rawdev" commands, each expanded with the
// parameters returned by the matching list command, followed by the additional
// commands. Errors raised while fetching parameters are added to acc and do not
// stop the remaining commands from being collected. The combined list is passed
// through uniqueValues() before it is returned.
func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator) []string {
	var all []string

	if choice.Contains("ethdev", dpdk.DeviceTypes) {
		enabled := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter)
		expanded, err := dpdk.appendCommandsWithParamsFromList(ethdevListCommand, enabled)
		if err != nil {
			acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", ethdevListCommand, err))
		}

		all = append(all, expanded...)
	}

	if choice.Contains("rawdev", dpdk.DeviceTypes) {
		expanded, err := dpdk.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands)
		if err != nil {
			acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", rawdevListCommand, err))
		}

		all = append(all, expanded...)
	}

	return uniqueValues(append(all, dpdk.AdditionalCommands...))
}

// appendCommandsWithParamsFromList sends listCommand, converts the response into
// a list of parameters and returns every entry of commands combined with every
// parameter. The result is ordered by command first, then by parameter.
// If the list command fails or its response cannot be converted, nil and the
// error are returned.
func (dpdk *dpdk) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) {
	rawList, err := dpdk.connector.getCommandResponse(listCommand)
	if err != nil {
		return nil, err
	}

	ids, err := jsonToArray(rawList, listCommand)
	if err != nil {
		return nil, err
	}

	// The final size is known up front, so the slice never has to grow.
	expanded := make([]string, 0, len(commands)*len(ids))
	for i := range commands {
		for _, id := range ids {
			expanded = append(expanded, commandWithParams(commands[i], id))
		}
	}

	return expanded, nil
}

// processCommand sends commandWithParams to DPDK, parses the JSON response and
// adds the flattened values to acc as one metric named pluginName, tagged with
// the command ("command") and its parameters ("params").
//
// Nothing is returned: a failed request, an unparsable response, an empty value
// or a flattening failure is reported through acc.AddError and no metric is added.
func (dpdk *dpdk) processCommand(acc telegraf.Accumulator, commandWithParams string) {
	buf, err := dpdk.connector.getCommandResponse(commandWithParams)
	if err != nil {
		acc.AddError(err)
		return
	}

	var parsedResponse map[string]interface{}
	if err = json.Unmarshal(buf, &parsedResponse); err != nil {
		// %w keeps the decoding error in the chain for errors.Is / errors.As.
		acc.AddError(fmt.Errorf("failed to unmarshal json response from %v command - %w", commandWithParams, err))
		return
	}

	// The payload is looked up under the command name with its parameters stripped.
	command := stripParams(commandWithParams)
	value := parsedResponse[command]
	if isEmpty(value) {
		acc.AddError(fmt.Errorf("got empty json on '%v' command", commandWithParams))
		return
	}

	jf := jsonparser.JSONFlattener{}
	if err = jf.FullFlattenJSON("", value, true, true); err != nil {
		acc.AddError(fmt.Errorf("failed to flatten response - %w", err))
		return
	}

	acc.AddFields(pluginName, jf.Fields, map[string]string{
		"command": command,
		"params":  getParams(commandWithParams),
	})
}
Loading

0 comments on commit ff2992e

Please sign in to comment.