Skip to content

Commit

Permalink
Collect k8s endpoints, ingress, and services in kube_inventory plugin (
Browse files Browse the repository at this point in the history
  • Loading branch information
glinton authored and danielnelson committed Jul 19, 2019
1 parent bdb4598 commit 877c423
Show file tree
Hide file tree
Showing 10 changed files with 709 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions plugins/inputs/kube_inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/apps/v1beta2"
"github.com/ericchiang/k8s/apis/core/v1"
v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1"

"github.com/influxdata/telegraf/internal/tls"
)
Expand Down Expand Up @@ -61,6 +62,20 @@ func (c *client) getDeployments(ctx context.Context) (*v1beta1.DeploymentList, e
return list, c.List(ctx, c.namespace, list)
}

func (c *client) getEndpoints(ctx context.Context) (*v1.EndpointsList, error) {
list := new(v1.EndpointsList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}

func (c *client) getIngress(ctx context.Context) (*v1beta1EXT.IngressList, error) {
list := new(v1beta1EXT.IngressList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}

func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) {
list := new(v1.NodeList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
Expand Down Expand Up @@ -89,6 +104,13 @@ func (c *client) getPods(ctx context.Context) (*v1.PodList, error) {
return list, c.List(ctx, c.namespace, list)
}

func (c *client) getServices(ctx context.Context) (*v1.ServiceList, error) {
list := new(v1.ServiceList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}

func (c *client) getStatefulSets(ctx context.Context) (*v1beta1.StatefulSetList, error) {
list := new(v1beta1.StatefulSetList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/kube_inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/ericchiang/k8s/util/intstr"
"github.com/influxdata/telegraf/internal/tls"
)

Expand All @@ -27,6 +28,13 @@ func toBoolPtr(b bool) *bool {
return &b
}

func toIntStrPtrS(s string) *intstr.IntOrString {
return &intstr.IntOrString{StrVal: &s}
}

func toIntStrPtrI(i int32) *intstr.IntOrString {
return &intstr.IntOrString{IntVal: &i}
}
func TestNewClient(t *testing.T) {
_, err := newClient("https://127.0.0.1:443/", "default", "abc123", time.Second, tls.ClientConfig{})
if err != nil {
Expand Down
82 changes: 82 additions & 0 deletions plugins/inputs/kube_inventory/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package kube_inventory

import (
"context"
"strings"
"time"

"github.com/ericchiang/k8s/apis/core/v1"

"github.com/influxdata/telegraf"
)

func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getEndpoints(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, i := range list.Items {
if err = ki.gatherEndpoint(*i, acc); err != nil {
acc.AddError(err)
return
}
}
}

func (ki *KubernetesInventory) gatherEndpoint(e v1.Endpoints, acc telegraf.Accumulator) error {
if e.Metadata.CreationTimestamp.GetSeconds() == 0 && e.Metadata.CreationTimestamp.GetNanos() == 0 {
return nil
}

fields := map[string]interface{}{
"created": time.Unix(e.Metadata.CreationTimestamp.GetSeconds(), int64(e.Metadata.CreationTimestamp.GetNanos())).UnixNano(),
"generation": e.Metadata.GetGeneration(),
}

tags := map[string]string{
"endpoint_name": e.Metadata.GetName(),
"namespace": e.Metadata.GetNamespace(),
}

for _, endpoint := range e.GetSubsets() {
for _, readyAddr := range endpoint.GetAddresses() {
fields["ready"] = true

tags["hostname"] = readyAddr.GetHostname()
tags["node_name"] = readyAddr.GetNodeName()
if readyAddr.TargetRef != nil {
tags[strings.ToLower(readyAddr.GetTargetRef().GetKind())] = readyAddr.GetTargetRef().GetName()
}

for _, port := range endpoint.GetPorts() {
fields["port"] = port.GetPort()

tags["port_name"] = port.GetName()
tags["port_protocol"] = port.GetProtocol()

acc.AddFields(endpointMeasurement, fields, tags)
}
}
for _, notReadyAddr := range endpoint.GetNotReadyAddresses() {
fields["ready"] = false

tags["hostname"] = notReadyAddr.GetHostname()
tags["node_name"] = notReadyAddr.GetNodeName()
if notReadyAddr.TargetRef != nil {
tags[strings.ToLower(notReadyAddr.GetTargetRef().GetKind())] = notReadyAddr.GetTargetRef().GetName()
}

for _, port := range endpoint.GetPorts() {
fields["port"] = port.GetPort()

tags["port_name"] = port.GetName()
tags["port_protocol"] = port.GetProtocol()

acc.AddFields(endpointMeasurement, fields, tags)
}
}
}

return nil
}
194 changes: 194 additions & 0 deletions plugins/inputs/kube_inventory/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package kube_inventory

import (
"testing"
"time"

"github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/influxdata/telegraf/testutil"
)

func TestEndpoint(t *testing.T) {
cli := &client{}

now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())

tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no endpoints",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/endpoints/": &v1.EndpointsList{},
},
},
hasError: false,
},
{
name: "collect ready endpoints",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/endpoints/": &v1.EndpointsList{
Items: []*v1.Endpoints{
{
Subsets: []*v1.EndpointSubset{
{
Addresses: []*v1.EndpointAddress{
{
Hostname: toStrPtr("storage-6"),
NodeName: toStrPtr("b.storage.internal"),
TargetRef: &v1.ObjectReference{
Kind: toStrPtr("pod"),
Name: toStrPtr("storage-6"),
},
},
},
Ports: []*v1.EndpointPort{
{
Name: toStrPtr("server"),
Protocol: toStrPtr("TCP"),
Port: toInt32Ptr(8080),
},
},
},
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(12),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("storage"),
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"ready": true,
"port": int32(8080),
"generation": int64(12),
"created": now.UnixNano(),
},
Tags: map[string]string{
"endpoint_name": "storage",
"namespace": "ns1",
"hostname": "storage-6",
"node_name": "b.storage.internal",
"port_name": "server",
"port_protocol": "TCP",
"pod": "storage-6",
},
},
},
},
hasError: false,
},
{
name: "collect notready endpoints",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/endpoints/": &v1.EndpointsList{
Items: []*v1.Endpoints{
{
Subsets: []*v1.EndpointSubset{
{
NotReadyAddresses: []*v1.EndpointAddress{
{
Hostname: toStrPtr("storage-6"),
NodeName: toStrPtr("b.storage.internal"),
TargetRef: &v1.ObjectReference{
Kind: toStrPtr("pod"),
Name: toStrPtr("storage-6"),
},
},
},
Ports: []*v1.EndpointPort{
{
Name: toStrPtr("server"),
Protocol: toStrPtr("TCP"),
Port: toInt32Ptr(8080),
},
},
},
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(12),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("storage"),
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"ready": false,
"port": int32(8080),
"generation": int64(12),
"created": now.UnixNano(),
},
Tags: map[string]string{
"endpoint_name": "storage",
"namespace": "ns1",
"hostname": "storage-6",
"node_name": "b.storage.internal",
"port_name": "server",
"port_protocol": "TCP",
"pod": "storage-6",
},
},
},
},
hasError: false,
},
}

for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, endpoint := range ((v.handler.responseMap["/endpoints/"]).(*v1.EndpointsList)).Items {
err := ks.gatherEndpoint(*endpoint, acc)
if err != nil {
t.Errorf("Failed to gather endpoint - %s", err.Error())
}
}

err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}
Loading

0 comments on commit 877c423

Please sign in to comment.