Skip to content

Commit

Permalink
perf: reduce unnecessary K8s calls for CRDs during reconciliation (ar…
Browse files Browse the repository at this point in the history
…goproj#3246)

* reduce K8s calls for CRDs during reconciliation
* additional metric labels to k8s API requests (server, verb, kind, namespace)
  • Loading branch information
jessesuen authored Mar 18, 2020
1 parent 127f50d commit 4bbce1c
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 42 deletions.
6 changes: 4 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ start-e2e: cli
ARGOCD_TLS_DATA_PATH=/tmp/argo-e2e/app/config/tls \
ARGOCD_E2E_DISABLE_AUTH=false \
ARGOCD_ZJWT_FEATURE_FLAG=always \
goreman start
goreman start ${ARGOCD_START}

# Cleans VSCode debug.test files from sub-dirs to prevent them from being included in packr boxes
.PHONY: clean-debug
Expand All @@ -215,7 +215,7 @@ start:
kubectl create ns argocd || true
kubens argocd
ARGOCD_ZJWT_FEATURE_FLAG=always \
goreman start
goreman start ${ARGOCD_START}

.PHONY: pre-commit
pre-commit: dep-ensure codegen build lint test
Expand Down
3 changes: 2 additions & 1 deletion cmd/argocd-application-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func newCommand() *cobra.Command {
kubectlParallelismLimit)
errors.CheckError(err)

log.Infof("Application Controller (version: %s) starting (namespace: %s)", common.GetVersion(), namespace)
vers := common.GetVersion()
log.Infof("Application Controller (version: %s, built: %s) starting (namespace: %s)", vers.Version, vers.BuildDate, namespace)
stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")
Expand Down
35 changes: 31 additions & 4 deletions common/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package common

import (
"os"
"strconv"
)

// Default service addresses and URLS of Argo CD internal services
const (
// DefaultRepoServerAddr is the gRPC address of the Argo CD repo server
Expand Down Expand Up @@ -60,10 +65,6 @@ const (
AuthCookieName = "argocd.token"
// RevisionHistoryLimit is the max number of successful sync to keep in history
RevisionHistoryLimit = 10
// K8sClientConfigQPS controls the QPS to be used in K8s REST client configs
K8sClientConfigQPS = 25
// K8sClientConfigBurst controls the burst to be used in K8s REST client configs
K8sClientConfigBurst = 50
)

// Dex related constants
Expand Down Expand Up @@ -136,6 +137,10 @@ const (
EnvGitAttemptsCount = "ARGOCD_GIT_ATTEMPTS_COUNT"
// Overrides git submodule support, true by default
EnvGitSubmoduleEnabled = "ARGOCD_GIT_MODULES_ENABLED"
// EnvK8sClientQPS is the QPS value used for the kubernetes client (default: 50)
EnvK8sClientQPS = "ARGOCD_K8S_CLIENT_QPS"
// EnvK8sClientBurst is the burst value used for the kubernetes client (default: twice the client QPS)
EnvK8sClientBurst = "ARGOCD_K8S_CLIENT_BURST"
)

const (
Expand All @@ -147,3 +152,25 @@ const (
// Number should be bumped in case of backward incompatible change to make sure cache is invalidated after upgrade.
CacheVersion = "1.0.0"
)

// Rate-limit settings shared by every Kubernetes REST client config. The
// values below are the compiled-in starting points; this file's init function
// may replace them based on the EnvK8sClientQPS and EnvK8sClientBurst
// environment variables.
var (
	// K8sClientConfigQPS is the queries-per-second value used in K8s REST client configs.
	K8sClientConfigQPS = float32(50)
	// K8sClientConfigBurst is the burst value used in K8s REST client configs.
	K8sClientConfigBurst = 100
)

// init applies the optional environment overrides for the Kubernetes client
// rate limits.
//
//   - EnvK8sClientQPS, when set to a parseable float, replaces K8sClientConfigQPS.
//   - EnvK8sClientBurst, when set to a parseable integer, replaces
//     K8sClientConfigBurst. When it is unset or cannot be parsed, the burst
//     defaults to twice the effective QPS.
//
// Values that fail to parse are ignored so that a typo in the environment can
// never overwrite a setting with the zero value returned by a failed parse.
func init() {
	if envQPS := os.Getenv(EnvK8sClientQPS); envQPS != "" {
		// Accept the override only when parsing succeeded (err == nil).
		if qps, err := strconv.ParseFloat(envQPS, 32); err == nil {
			K8sClientConfigQPS = float32(qps)
		}
	}

	// Start from the documented default (twice the QPS, which may have just
	// been overridden) and let a valid explicit burst value take precedence.
	K8sClientConfigBurst = 2 * int(K8sClientConfigQPS)
	if envBurst := os.Getenv(EnvK8sClientBurst); envBurst != "" {
		if burst, err := strconv.Atoi(envBurst); err == nil {
			K8sClientConfigBurst = burst
		}
	}
}
3 changes: 2 additions & 1 deletion controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
gvk := un.GroupVersionKind()
c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
},
metricsServer: c.metricsServer,
}
c.lock.Lock()
c.clusters[cluster.Server] = info
Expand Down Expand Up @@ -195,7 +196,7 @@ func (c *liveStateCache) GetManagedLiveObjs(a *appv1.Application, targetObjs []*
if err != nil {
return nil, err
}
return clusterInfo.getManagedLiveObjs(a, targetObjs, c.metricsServer)
return clusterInfo.getManagedLiveObjs(a, targetObjs)
}

func (c *liveStateCache) GetVersionsInfo(serverURL string) (string, []metav1.APIGroup, error) {
Expand Down
16 changes: 13 additions & 3 deletions controller/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type clusterInfo struct {
cluster *appv1.Cluster
log *log.Entry
cacheSettingsSrc func() *cacheSettings
metricsServer *metrics.MetricsServer
}

func (c *clusterInfo) replaceResourceCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured, ns string) {
Expand Down Expand Up @@ -118,8 +119,9 @@ func isServiceAccountTokenSecret(un *unstructured.Unstructured) (bool, metav1.Ow

func (c *clusterInfo) createObjInfo(un *unstructured.Unstructured, appInstanceLabel string) *node {
ownerRefs := un.GetOwnerReferences()
gvk := un.GroupVersionKind()
// Special case for endpoint. Remove after https://github.com/kubernetes/kubernetes/issues/28483 is fixed
if un.GroupVersionKind().Group == "" && un.GetKind() == kube.EndpointsKind && len(un.GetOwnerReferences()) == 0 {
if gvk.Group == "" && gvk.Kind == kube.EndpointsKind && len(un.GetOwnerReferences()) == 0 {
ownerRefs = append(ownerRefs, metav1.OwnerReference{
Name: un.GetName(),
Kind: kube.ServiceKind,
Expand All @@ -143,7 +145,15 @@ func (c *clusterInfo) createObjInfo(un *unstructured.Unstructured, appInstanceLa
if len(ownerRefs) == 0 && appName != "" {
nodeInfo.appName = appName
nodeInfo.resource = un
} else {
// edge case. we do not label CRDs, so they miss the tracking label we inject. But we still
// want the full resource to be available in our cache (to diff), so we store all CRDs
switch gvk.Kind {
case kube.CustomResourceDefinitionKind:
nodeInfo.resource = un
}
}

nodeInfo.health, _ = health.GetResourceHealth(un, c.cacheSettingsSrc().ResourceOverrides)
return nodeInfo
}
Expand Down Expand Up @@ -464,7 +474,7 @@ func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool {
return true
}

func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured, metricsServer *metrics.MetricsServer) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
c.lock.RLock()
defer c.lock.RUnlock()

Expand All @@ -475,7 +485,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
managedObjs[key] = o.resource
}
}
config := metrics.AddMetricsTransportWrapper(metricsServer, a, c.cluster.RESTConfig())
config := metrics.AddMetricsTransportWrapper(c.metricsServer, a, c.cluster.RESTConfig())
// iterate target objects and identify ones that already exist in the cluster,
// but are simply missing our label
lock := &sync.Mutex{}
Expand Down
2 changes: 1 addition & 1 deletion controller/cache/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ metadata:
Namespace: "default",
},
},
}, []*unstructured.Unstructured{targetDeploy}, nil)
}, []*unstructured.Unstructured{targetDeploy})
assert.Nil(t, err)
assert.Equal(t, managedObjs, map[kube.ResourceKey]*unstructured.Unstructured{
kube.NewResourceKey("apps", "Deployment", "default", "helm-guestbook"): testDeploy,
Expand Down
16 changes: 12 additions & 4 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package metrics
import (
"context"
"net/http"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, health
Name: "argocd_app_k8s_request_total",
Help: "Number of kubernetes requests executed during application reconciliation.",
},
append(descAppDefaultLabels, "response_code"),
append(descAppDefaultLabels, "server", "response_code", "verb", "resource_kind", "resource_namespace"),
)
registry.MustRegister(k8sRequestCounter)

Expand Down Expand Up @@ -169,8 +168,17 @@ func (m *MetricsServer) IncClusterEventsCount(server, group, kind string) {
}

// IncKubernetesRequest increments the kubernetes requests counter for an application
func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, statusCode int) {
m.k8sRequestCounter.WithLabelValues(app.Namespace, app.Name, app.Spec.GetProject(), strconv.Itoa(statusCode)).Inc()
func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server, statusCode, verb, resourceKind, resourceNamespace string) {
	// The application-scoped labels stay empty when no application is supplied.
	appNamespace, appName, appProject := "", "", ""
	if app != nil {
		appNamespace, appName, appProject = app.Namespace, app.Name, app.Spec.GetProject()
	}

	// Values are listed in the order they are passed to the counter:
	// the three application labels first, then the request-specific ones.
	labelValues := []string{
		appNamespace, appName, appProject,
		server, statusCode, verb, resourceKind, resourceNamespace,
	}
	m.k8sRequestCounter.WithLabelValues(labelValues...).Inc()
}

// IncReconcile increments the reconcile counter for an application
Expand Down
35 changes: 11 additions & 24 deletions controller/metrics/transportwrapper.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
package metrics

import (
"net/http"
"strconv"

"github.com/argoproj/pkg/kubeclientmetrics"
"k8s.io/client-go/rest"

"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
)

// metricsRoundTripper wraps another http.RoundTripper and reports every
// request it forwards to the metrics server on behalf of a single application.
type metricsRoundTripper struct {
// roundTripper is the wrapped transport that performs the actual request.
roundTripper http.RoundTripper
// app is the application passed to IncKubernetesRequest for each request.
app *v1alpha1.Application
// metricsServer is the server whose Kubernetes request counter is incremented.
metricsServer *MetricsServer
}

// RoundTrip forwards the request to the wrapped transport and then records the
// outcome on the metrics server. When the wrapped transport returns no
// response, the request is recorded with status code 0. The response and error
// from the wrapped transport are returned unchanged.
func (mrt *metricsRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	response, rtErr := mrt.roundTripper.RoundTrip(r)

	var code int
	if response != nil {
		code = response.StatusCode
	}

	mrt.metricsServer.IncKubernetesRequest(mrt.app, code)
	return response, rtErr
}

// AddMetricsTransportWrapper adds a transport wrapper which increments 'argocd_app_k8s_request_total' counter on each kubernetes request
func AddMetricsTransportWrapper(server *MetricsServer, app *v1alpha1.Application, config *rest.Config) *rest.Config {
wrap := config.WrapTransport
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wrap != nil {
rt = wrap(rt)
}
return &metricsRoundTripper{roundTripper: rt, metricsServer: server, app: app}
inc := func(resourceInfo kubeclientmetrics.ResourceInfo) error {
namespace := resourceInfo.Namespace
kind := resourceInfo.Kind
statusCode := strconv.Itoa(resourceInfo.StatusCode)
server.IncKubernetesRequest(app, resourceInfo.Server, statusCode, string(resourceInfo.Verb), kind, namespace)
return nil
}
return config

newConfig := kubeclientmetrics.AddMetricsTransportWrapper(config, inc)
return newConfig
}

0 comments on commit 4bbce1c

Please sign in to comment.