Skip to content

Commit

Permalink
✨ Operator endpoints for cluster insights (#1312)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatiasFrank authored Nov 1, 2024
1 parent bb2f76a commit 9822b61
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/rig-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"connectrpc.com/grpcreflect"
"github.com/go-logr/logr"
"github.com/rigdev/rig-go-api/operator/api/v1/capabilities/capabilitiesconnect"
"github.com/rigdev/rig-go-api/operator/api/v1/cluster/clusterconnect"
"github.com/rigdev/rig-go-api/operator/api/v1/pipeline/pipelineconnect"
"github.com/rigdev/rig/cmd/rig-operator/apichecker"
"github.com/rigdev/rig/cmd/rig-operator/certgen"
Expand All @@ -23,10 +24,12 @@ import (
"github.com/rigdev/rig/pkg/build"
"github.com/rigdev/rig/pkg/controller/plugin"
"github.com/rigdev/rig/pkg/handler/api/capabilities"
"github.com/rigdev/rig/pkg/handler/api/cluster"
"github.com/rigdev/rig/pkg/handler/api/pipeline"
"github.com/rigdev/rig/pkg/manager"
"github.com/rigdev/rig/pkg/scheme"
svccapabilities "github.com/rigdev/rig/pkg/service/capabilities"
svccluster "github.com/rigdev/rig/pkg/service/cluster"
"github.com/rigdev/rig/pkg/service/config"
"github.com/rigdev/rig/pkg/service/objectstatus"
svcpipeline "github.com/rigdev/rig/pkg/service/pipeline"
Expand Down Expand Up @@ -132,6 +135,8 @@ func run(cmd *cobra.Command, _ []string) error {
svcpipeline.NewService,
objectstatus.NewService,
pipeline.NewHandler,
svccluster.New,
cluster.NewHandler,
manager.New,
),
fx.Invoke(
Expand All @@ -143,10 +148,12 @@ func run(cmd *cobra.Command, _ []string) error {
sh fx.Shutdowner,
cap capabilitiesconnect.ServiceHandler,
pip pipelineconnect.ServiceHandler,
cluster clusterconnect.ServiceHandler,
) {
mux := http.NewServeMux()
mux.Handle(capabilitiesconnect.NewServiceHandler(cap))
mux.Handle(pipelineconnect.NewServiceHandler(pip))
mux.Handle(clusterconnect.NewServiceHandler(cluster))
mux.Handle(grpcreflect.NewHandlerV1(grpcreflect.NewStaticReflector(
capabilitiesconnect.ServiceName,
pipelineconnect.ServiceName,
Expand Down
9 changes: 9 additions & 0 deletions deploy/charts/rig-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rules:
- secrets
- pods
- events
- nodes
verbs:
- get
- list
Expand Down Expand Up @@ -153,6 +154,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- metrics.k8s.io
resources:
- nodes
verbs:
- get
- list
- watch
{{- range .Values.rbac.rules }}
- apiGroups:
{{- range .apiGroups }}
Expand Down
48 changes: 48 additions & 0 deletions pkg/handler/api/cluster/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cluster

import (
"context"

"connectrpc.com/connect"
api_cluster "github.com/rigdev/rig-go-api/operator/api/v1/cluster"
"github.com/rigdev/rig-go-api/operator/api/v1/cluster/clusterconnect"
"github.com/rigdev/rig/pkg/service/cluster"
)

func NewHandler(
cluster cluster.Service,
) clusterconnect.ServiceHandler {
return &handler{
cluster: cluster,
}
}

type handler struct {
cluster cluster.Service
}

func (h *handler) GetNodes(
ctx context.Context, _ *connect.Request[api_cluster.GetNodesRequest],
) (*connect.Response[api_cluster.GetNodesResponse], error) {
nodes, err := h.cluster.GetNodes(ctx)
if err != nil {
return nil, err
}

return connect.NewResponse(&api_cluster.GetNodesResponse{
Nodes: nodes,
}), nil
}

func (h *handler) GetNodePods(
ctx context.Context, req *connect.Request[api_cluster.GetNodePodsRequest],
) (*connect.Response[api_cluster.GetNodePodsResponse], error) {
pods, err := h.cluster.GetNodePods(ctx, req.Msg.GetNodeName())
if err != nil {
return nil, err
}

return connect.NewResponse(&api_cluster.GetNodePodsResponse{
Pods: pods,
}), nil
}
105 changes: 105 additions & 0 deletions pkg/service/cluster/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cluster

import (
"context"
"fmt"
"maps"
"slices"

"github.com/rigdev/rig-go-api/model"
api_cluster "github.com/rigdev/rig-go-api/operator/api/v1/cluster"
"github.com/rigdev/rig/pkg/pipeline"
corev1 "k8s.io/api/core/v1"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Service interface {
GetNodes(ctx context.Context) ([]*api_cluster.Node, error)
GetNodePods(ctx context.Context, nodeName string) ([]*api_cluster.Pod, error)
}

func New(client client.Client) Service {
return &service{
client: client,
}
}

type service struct {
client client.Client
}

func (s *service) GetNodes(ctx context.Context) ([]*api_cluster.Node, error) {
listReq := corev1.NodeList{}
if err := s.client.List(ctx, &listReq); err != nil {
return nil, fmt.Errorf("failed to list nodes: %w", err)
}

nodes := map[string]*api_cluster.Node{}

for _, node := range listReq.Items {
nodes[node.GetName()] = &api_cluster.Node{
NodeName: node.GetName(),
Allocateable: &model.Resources{
CpuMillis: uint64(node.Status.Allocatable.Cpu().MilliValue()),
MemoryBytes: uint64(node.Status.Allocatable.Memory().Value()),
},
MaxPods: uint64(node.Status.Allocatable.Pods().Value()),
}
}

list := metricsv1beta1.NodeMetricsList{}
if err := s.client.List(ctx, &list); err != nil {
return nil, fmt.Errorf("failed to list node metrics: %w", err)
}

for _, node := range list.Items {
n, ok := nodes[node.GetName()]
if !ok {
n = &api_cluster.Node{
NodeName: node.GetName(),
Allocateable: &model.Resources{},
Usage: &model.Resources{},
MaxPods: 0,
}
nodes[node.GetName()] = n
}
n.Usage = &model.Resources{
CpuMillis: uint64(node.Usage.Cpu().MilliValue()),
MemoryBytes: uint64(node.Usage.Memory().Value()),
}
}

keys := slices.Sorted((maps.Keys(nodes)))
var res []*api_cluster.Node
for _, k := range keys {
res = append(res, nodes[k])
}
return res, nil
}

func (s *service) GetNodePods(ctx context.Context, nodeName string) ([]*api_cluster.Pod, error) {
listReq := corev1.PodList{}
if err := s.client.List(ctx, &listReq, client.MatchingFields{
"spec.nodeName": nodeName,
}); err != nil {
return nil, err
}

var res []*api_cluster.Pod
for _, pod := range listReq.Items {
req := &model.Resources{}
for _, c := range pod.Spec.Containers {
req.CpuMillis += uint64(c.Resources.Requests.Cpu().MilliValue())
req.MemoryBytes += uint64(c.Resources.Requests.Memory().Value())
}
res = append(res, &api_cluster.Pod{
PodName: pod.GetName(),
Namespace: pod.GetNamespace(),
Requested: req,
CapsuleName: pod.Labels[pipeline.LabelCapsule],
})
}

return res, nil
}

0 comments on commit 9822b61

Please sign in to comment.