Skip to content

Commit

Permalink
feat: add dynamic tagging to gnmi plugin (influxdata#7484)
Browse files Browse the repository at this point in the history
  • Loading branch information
bewing authored Feb 7, 2022
1 parent 7f2b9c9 commit 7e769d7
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 2 deletions.
16 changes: 14 additions & 2 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,23 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR (64-

## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"

#[[inputs.gnmi.subscription]]
# name = "descr"
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state/description"
# subscription_mode = "on_change"

## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
# tag_only = true
```

## Example Output

```shell
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115,descr/description=Foo in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000
ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115,descr/description=Bar out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000
```
46 changes: 46 additions & 0 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type GNMI struct {
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
// Lookup/device+name/key/value
lookup map[string]map[string]map[string]interface{}

Log telegraf.Logger
}
Expand All @@ -73,6 +75,9 @@ type Subscription struct {
// Duplicate suppression
SuppressRedundant bool `toml:"suppress_redundant"`
HeartbeatInterval config.Duration `toml:"heartbeat_interval"`

// Mark this subscription as a tag-only lookup source, not emitting any metric
TagOnly bool `toml:"tag_only"`
}

// Start the http listener service
Expand All @@ -83,6 +88,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
var request *gnmiLib.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
c.lookup = make(map[string]map[string]map[string]interface{})

// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
Expand Down Expand Up @@ -133,6 +139,11 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
c.internalAliases[longPath] = name
c.internalAliases[shortPath] = name
}

if subscription.TagOnly {
// Create the top-level lookup for this tag
c.lookup[name] = make(map[string]map[string]interface{})
}
}
for alias, encodingPath := range c.Aliases {
c.internalAliases[encodingPath] = alias
Expand Down Expand Up @@ -297,6 +308,29 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
}
}

// Update tag lookups and discard rest of update
subscriptionKey := tags["source"] + "/" + tags["name"]
if _, ok := c.lookup[name]; ok {
// We are subscribed to this, so add the fields to the lookup-table
if _, ok := c.lookup[name][subscriptionKey]; !ok {
c.lookup[name][subscriptionKey] = make(map[string]interface{})
}
for k, v := range fields {
c.lookup[name][subscriptionKey][path.Base(k)] = v
}
// Do not process the data further as we only subscribed here for the lookup table
continue
}

// Apply lookups if present
for subscriptionName, values := range c.lookup {
if annotations, ok := values[subscriptionKey]; ok {
for k, v := range annotations {
tags[subscriptionName+"/"+k] = v.(string)
}
}
}

// Group metrics
for k, v := range fields {
key := k
Expand Down Expand Up @@ -559,6 +593,18 @@ const sampleConfig = `
## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"
#[[inputs.gnmi.subscription]]
# name = "descr"
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state/description"
# subscription_mode = "on_change"
## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
# tag_only = true
`

// SampleConfig of plugin
Expand Down
120 changes: 120 additions & 0 deletions plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,126 @@ func TestNotification(t *testing.T) {
),
},
},
{
name: "tagged update pair",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: config.Duration(1 * time.Second),
Subscriptions: []Subscription{
{
Name: "oc-intf-desc",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/description",
SubscriptionMode: "on_change",
TagOnly: true,
},
{
Name: "oc-intf-counters",
Origin: "openconfig-interfaces",
Path: "/interfaces/interface/state/counters",
SubscriptionMode: "sample",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmiLib.GNMI_SubscribeServer) error {
tagResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236571000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "description",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"},
},
},
},
},
},
}
if err := server.Send(tagResponse); err != nil {
return err
}
if err := server.Send(&gnmiLib.SubscribeResponse{Response: &gnmiLib.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
return err
}
taggedResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236572000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "interfaces",
},
{
Name: "interface",
Key: map[string]string{"name": "Ethernet1"},
},
{
Name: "state",
},
{
Name: "counters",
},
{
Name: "in-broadcast-pkts",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_IntVal{IntVal: 42},
},
},
},
},
},
}
return server.Send(taggedResponse)
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"oc-intf-counters",
map[string]string{
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc/description": "foo",
},
map[string]interface{}{
"in_broadcast_pkts": 42,
},
time.Unix(0, 0),
),
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit 7e769d7

Please sign in to comment.