Skip to content

Commit

Permalink
Allow two agents to run on the same cluster
Browse files Browse the repository at this point in the history
This change opens up the ability to run two or more agents, each pointed at
a different fleet-controller. The specific use case where this is useful
is Fleet managing a cluster that is itself running Fleet to manage other
clusters. More concretely, we have this with Rancher managing Rancher.
In this situation you want two fleet agents: one that runs for
fleet-local and one that runs for the upstream fleet-controller.
  • Loading branch information
ibuildthecloud committed Sep 18, 2021
1 parent 325f4c2 commit 9957126
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 57 deletions.
10 changes: 10 additions & 0 deletions charts/fleet-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,9 @@ spec:
type: object
nullable: true
type: array
agentNamespace:
nullable: true
type: string
clientID:
nullable: true
type: string
Expand Down Expand Up @@ -1740,6 +1743,8 @@ spec:
type: string
agentMigrated:
type: boolean
agentNamespaceMigrated:
type: boolean
cattleNamespaceMigrated:
type: boolean
conditions:
Expand Down Expand Up @@ -4308,6 +4313,9 @@ spec:
type: object
nullable: true
type: array
agentNamespace:
nullable: true
type: string
clientID:
nullable: true
type: string
Expand Down Expand Up @@ -4354,6 +4362,8 @@ spec:
type: string
agentMigrated:
type: boolean
agentNamespaceMigrated:
type: boolean
cattleNamespaceMigrated:
type: boolean
conditions:
Expand Down
3 changes: 2 additions & 1 deletion cmd/fleetagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
// FleetAgent holds the options of the fleet agent command. Each field's
// `usage` tag describes the option; the `env` tag, where present, names an
// environment variable associated with it.
// NOTE(review): the tags are presumably consumed by a reflection-based CLI
// builder, so field order and tag text may be user-visible — confirm before
// reordering or rewording.
type FleetAgent struct {
Kubeconfig string `usage:"kubeconfig file"`
Namespace string `usage:"namespace to watch" env:"NAMESPACE"`
// AgentScope is handed to agent.Start by Run, right after the namespace.
AgentScope string `usage:"An identifier used to scope the agent bundleID names, typically the same as namespace" env:"AGENT_SCOPE"`
// When Simulators is greater than zero, Run starts the simulator instead of a
// regular agent.
Simulators int `usage:"Numbers of simulators to run"`
CheckinInterval string `usage:"How often to post cluster status" env:"CHECKIN_INTERVAL"`
}
Expand All @@ -47,7 +48,7 @@ func (a *FleetAgent) Run(cmd *cobra.Command, args []string) error {
if a.Simulators > 0 {
return simulator.Simulate(cmd.Context(), a.Simulators, a.Kubeconfig, a.Namespace, "default", opts)
}
if err := agent.Start(cmd.Context(), a.Kubeconfig, a.Namespace, &opts); err != nil {
if err := agent.Start(cmd.Context(), a.Kubeconfig, a.Namespace, a.AgentScope, &opts); err != nil {
return err
}
<-cmd.Context().Done()
Expand Down
3 changes: 2 additions & 1 deletion modules/agent/pkg/agent/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Register(ctx context.Context, kubeConfig, namespace, clusterID string) erro
return err
}

func Start(ctx context.Context, kubeConfig, namespace string, opts *Options) error {
func Start(ctx context.Context, kubeConfig, namespace, agentScope string, opts *Options) error {
if opts == nil {
opts = &Options{}
}
Expand Down Expand Up @@ -78,6 +78,7 @@ func Start(ctx context.Context, kubeConfig, namespace string, opts *Options) err
fleetNamespace,
namespace,
opts.DefaultNamespace,
agentScope,
agentInfo.ClusterNamespace,
agentInfo.ClusterName,
opts.CheckinInterval,
Expand Down
43 changes: 42 additions & 1 deletion modules/agent/pkg/controllers/bundledeployment/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import (
fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
fleetcontrollers "github.com/rancher/fleet/pkg/generated/controllers/fleet.cattle.io/v1alpha1"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/merr"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
)

type handler struct {
Expand All @@ -25,10 +29,14 @@ type handler struct {
trigger *trigger.Trigger
deployManager *deployer.Manager
bdController fleetcontrollers.BundleDeploymentController
restMapper meta.RESTMapper
dynamic dynamic.Interface
}

func Register(ctx context.Context,
trigger *trigger.Trigger,
restMapper meta.RESTMapper,
dynamic dynamic.Interface,
deployManager *deployer.Manager,
bdController fleetcontrollers.BundleDeploymentController) {

Expand All @@ -37,6 +45,8 @@ func Register(ctx context.Context,
trigger: trigger,
deployManager: deployManager,
bdController: bdController,
restMapper: restMapper,
dynamic: dynamic,
}

fleetcontrollers.RegisterBundleDeploymentStatusHandler(ctx,
Expand Down Expand Up @@ -146,8 +156,12 @@ func (h *handler) Trigger(key string, bd *fleet.BundleDeployment) (*fleet.Bundle
return bd, nil
}

// isAgent reports whether bd is a fleet-agent bundle deployment, which is
// recognized purely by the "fleet-agent" prefix of its name.
func isAgent(bd *fleet.BundleDeployment) bool {
	const agentNamePrefix = "fleet-agent"
	return strings.HasPrefix(bd.Name, agentNamePrefix)
}

func shouldRedeploy(bd *fleet.BundleDeployment) bool {
if strings.HasPrefix(bd.Name, "fleet-agent") {
if isAgent(bd) {
return true
}
if bd.Spec.Options.ForceSyncGeneration <= 0 {
Expand All @@ -159,6 +173,28 @@ func shouldRedeploy(bd *fleet.BundleDeployment) bool {
return *bd.Status.SyncGeneration != bd.Spec.Options.ForceSyncGeneration
}

// cleanupOldAgent deletes every resource in modifiedStatuses that is flagged
// with Delete. Entries without the flag are ignored.
//
// Each flagged entry is resolved to a REST resource through the handler's
// RESTMapper and then removed with the dynamic client in the entry's
// namespace. A failure on one entry does not stop the loop: the error is
// recorded and the remaining entries are still processed. All recorded errors
// are handed to merr.NewErrors to build the return value.
func (h *handler) cleanupOldAgent(modifiedStatuses []fleet.ModifiedStatus) error {
	var errs []error
	for _, status := range modifiedStatuses {
		if !status.Delete {
			continue
		}

		gvk := schema.FromAPIVersionAndKind(status.APIVersion, status.Kind)
		mapping, err := h.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
		if err != nil {
			errs = append(errs, fmt.Errorf("mapping resource for %s for agent cleanup: %w", gvk, err))
			continue
		}

		logrus.Infof("Removing old agent resource %s/%s, %s", status.Namespace, status.Name, gvk)
		resources := h.dynamic.Resource(mapping.Resource).Namespace(status.Namespace)
		if err := resources.Delete(h.ctx, status.Name, metav1.DeleteOptions{}); err != nil {
			errs = append(errs, fmt.Errorf("deleting %s/%s for %s for agent cleanup: %w", status.Namespace, status.Name, gvk, err))
		}
	}
	return merr.NewErrors(errs...)
}

func (h *handler) MonitorBundle(bd *fleet.BundleDeployment, status fleet.BundleDeploymentStatus) (fleet.BundleDeploymentStatus, error) {
if bd.Spec.DeploymentID != status.AppliedDeploymentID {
return status, nil
Expand All @@ -181,6 +217,11 @@ func (h *handler) MonitorBundle(bd *fleet.BundleDeployment, status fleet.BundleD
if shouldRedeploy(bd) {
logrus.Infof("Redeploying %s", bd.Name)
status.AppliedDeploymentID = ""
if isAgent(bd) {
if err := h.cleanupOldAgent(status.ModifiedStatus); err != nil {
return status, fmt.Errorf("failed to clean up agent: %w", err)
}
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions modules/agent/pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (a *appContext) start(ctx context.Context) error {
}

func Register(ctx context.Context, leaderElect bool,
fleetNamespace, agentNamespace, defaultNamespace, clusterNamespace, clusterName string,
fleetNamespace, agentNamespace, defaultNamespace, agentScope, clusterNamespace, clusterName string,
checkinInterval time.Duration,
fleetConfig *rest.Config, clientConfig clientcmd.ClientConfig,
fleetMapper, mapper meta.RESTMapper,
Expand All @@ -93,18 +93,21 @@ func Register(ctx context.Context, leaderElect bool,
labelPrefix = defaultNamespace
}

helmDeployer, err := helmdeployer.NewHelm(agentNamespace, defaultNamespace, labelPrefix, appCtx,
helmDeployer, err := helmdeployer.NewHelm(agentNamespace, defaultNamespace, labelPrefix, agentScope, appCtx,
appCtx.Core.ServiceAccount().Cache(), appCtx.Core.ConfigMap().Cache(), appCtx.Core.Secret().Cache())
if err != nil {
return err
}

bundledeployment.Register(ctx,
trigger.New(ctx, appCtx.restMapper, appCtx.Dynamic),
appCtx.restMapper,
appCtx.Dynamic,
deployer.NewManager(
fleetNamespace,
defaultNamespace,
labelPrefix,
agentScope,
appCtx.Fleet.BundleDeployment().Cache(),
manifest.NewLookup(appCtx.Fleet.Content()),
helmDeployer,
Expand Down
4 changes: 3 additions & 1 deletion modules/agent/pkg/deployer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ type Manager struct {
deployer Deployer
apply apply.Apply
labelPrefix string
labelSuffix string
}

func NewManager(fleetNamespace string,
defaultNamespace string,
labelPrefix string,
labelPrefix, labelSuffix string,
bundleDeploymentCache fleetcontrollers.BundleDeploymentCache,
lookup manifest.Lookup,
deployer Deployer,
Expand All @@ -32,6 +33,7 @@ func NewManager(fleetNamespace string,
fleetNamespace: fleetNamespace,
defaultNamespace: defaultNamespace,
labelPrefix: labelPrefix,
labelSuffix: labelSuffix,
bundleDeploymentCache: bundleDeploymentCache,
lookup: lookup,
deployer: deployer,
Expand Down
12 changes: 9 additions & 3 deletions modules/agent/pkg/deployer/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (m *Manager) getApply(bd *fleet.BundleDeployment, ns string) (apply.Apply,
apply := m.apply
return apply.
WithIgnorePreviousApplied().
WithSetID(GetSetID(bd.Name, m.labelPrefix)).
WithSetID(GetSetID(bd.Name, m.labelPrefix, m.labelSuffix)).
WithDefaultNamespace(ns), nil
}

Expand Down Expand Up @@ -177,10 +177,16 @@ func (m *Manager) MonitorBundle(bd *fleet.BundleDeployment) (DeploymentStatus, e
return status, nil
}

func GetSetID(bundleID, labelPrefix string) string {
func GetSetID(bundleID, labelPrefix, labelSuffix string) string {
// bundle is fleet-agent bundle, we need to use setID fleet-agent-bootstrap since it was applied with import controller
if strings.HasPrefix(bundleID, "fleet-agent") {
return "fleet-agent-bootstrap"
if labelSuffix == "" {
return "fleet-agent-bootstrap"
}
return name.SafeConcatName("fleet-agent-bootstrap", labelSuffix)
}
if labelSuffix != "" {
return name.SafeConcatName(labelPrefix, bundleID, labelSuffix)
}
return name.SafeConcatName(labelPrefix, bundleID)
}
Expand Down
6 changes: 0 additions & 6 deletions modules/agent/pkg/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
Values = "values"
APIServerURL = "apiServerURL"
APIServerCA = "apiServerCA"
SystemNamespace = "systemNamespace"
DeploymentNamespace = "deploymentNamespace"
ClusterNamespace = "clusterNamespace"
ClusterName = "clusterName"
Expand Down Expand Up @@ -187,13 +186,8 @@ func createClusterSecret(ctx context.Context, clusterID string, k8s corecontroll
newToken := newSecret.Data[Token]
clusterNamespace := newSecret.Data[ClusterNamespace]
clusterName := newSecret.Data[ClusterName]
systemNamespace := string(newSecret.Data[SystemNamespace])
deploymentNamespace := newSecret.Data[DeploymentNamespace]

if !cfg.IgnoreAgentNamespaceCheck && systemNamespace != secret.Namespace {
return nil, fmt.Errorf("fleet-agent must be installed in the namespace %s", systemNamespace)
}

newKubeconfig, err := updateClientConfig(clientConfig, string(newToken), string(deploymentNamespace))
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions modules/agent/pkg/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func simulateAgent(ctx context.Context, i int, kubeConfig, namespace, defaultNam
opts.DefaultNamespace = simDefaultNamespace
opts.ClusterID = clusterID
opts.NoLeaderElect = true
return agent.Start(ctx, kubeConfig, simNamespace, &opts)
return agent.Start(ctx, kubeConfig, simNamespace, simNamespace, &opts)
}

func setupNamespace(ctx context.Context, kubeConfig, namespace, simNamespace string) (string, error) {
Expand Down Expand Up @@ -159,7 +159,6 @@ func injectConfig(cm *corev1.ConfigMap, simNamespace string) (*corev1.ConfigMap,
if err != nil {
return nil, err
}
cfg.IgnoreAgentNamespaceCheck = true
if cfg.Labels == nil {
cfg.Labels = map[string]string{}
}
Expand Down
4 changes: 2 additions & 2 deletions modules/cli/agentconfig/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Options struct {
ClientID string
}

func AgentConfig(ctx context.Context, controllerNamespace string, cg *client.Getter, opts *Options) ([]runtime.Object, error) {
func AgentConfig(ctx context.Context, agentNamespace, controllerNamespace string, cg *client.Getter, opts *Options) ([]runtime.Object, error) {
if opts == nil {
opts = &Options{}
}
Expand All @@ -30,7 +30,7 @@ func AgentConfig(ctx context.Context, controllerNamespace string, cg *client.Get
return nil, err
}

return Objects(controllerNamespace, opts.Labels, opts.ClientID)
return Objects(agentNamespace, opts.Labels, opts.ClientID)
}

func Objects(controllerNamespace string, clusterLabels map[string]string, clientID string) ([]runtime.Object, error) {
Expand Down
20 changes: 10 additions & 10 deletions modules/cli/agentmanifest/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type Options struct {
AgentEnvVars []v1.EnvVar
}

func AgentToken(ctx context.Context, systemNamespace, controllerNamespace string, client *client.Client, tokenName string, opts *Options) ([]runtime.Object, error) {
token, err := getToken(ctx, systemNamespace, tokenName, client)
func AgentToken(ctx context.Context, agentNamespace, controllerNamespace string, client *client.Client, tokenName string, opts *Options) ([]runtime.Object, error) {
token, err := getToken(ctx, controllerNamespace, tokenName, client)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +58,7 @@ func AgentToken(ctx context.Context, systemNamespace, controllerNamespace string
token["apiServerCA"] = opts.CA
}

return objects(controllerNamespace, token), nil
return objects(agentNamespace, token), nil
}

func insecurePing(host string) {
Expand Down Expand Up @@ -96,7 +96,7 @@ func testKubeConfig(kubeConfig, host string) error {
return nil
}

func AgentManifest(ctx context.Context, systemNamespace, controllerNamespace string, cg *client.Getter, output io.Writer, tokenName string, opts *Options) error {
func AgentManifest(ctx context.Context, agentNamespace, controllerNamespace, agentScope string, cg *client.Getter, output io.Writer, tokenName string, opts *Options) error {
if opts == nil {
opts = &Options{}
}
Expand All @@ -106,12 +106,12 @@ func AgentManifest(ctx context.Context, systemNamespace, controllerNamespace str
return err
}

objs, err := AgentToken(ctx, systemNamespace, controllerNamespace, client, tokenName, opts)
objs, err := AgentToken(ctx, agentNamespace, controllerNamespace, client, tokenName, opts)
if err != nil {
return err
}

agentConfig, err := agentconfig.AgentConfig(ctx, controllerNamespace, cg, &agentconfig.Options{
agentConfig, err := agentconfig.AgentConfig(ctx, agentNamespace, controllerNamespace, cg, &agentconfig.Options{
Labels: opts.Labels,
ClientID: opts.ClientID,
})
Expand All @@ -121,12 +121,12 @@ func AgentManifest(ctx context.Context, systemNamespace, controllerNamespace str

objs = append(objs, agentConfig...)

cfg, err := config.Lookup(ctx, systemNamespace, config.ManagerConfigName, client.Core.ConfigMap())
cfg, err := config.Lookup(ctx, controllerNamespace, config.ManagerConfigName, client.Core.ConfigMap())
if err != nil {
return err
}

objs = append(objs, agent.Manifest(controllerNamespace, cfg.AgentImage, cfg.AgentImagePullPolicy, opts.Generation, opts.CheckinInterval, opts.AgentEnvVars)...)
objs = append(objs, agent.Manifest(agentNamespace, agentScope, cfg.AgentImage, cfg.AgentImagePullPolicy, opts.Generation, opts.CheckinInterval, opts.AgentEnvVars)...)

data, err := yaml.Export(objs...)
if err != nil {
Expand Down Expand Up @@ -224,7 +224,7 @@ func getCA(ca []byte, cfg clientcmdapi.Config) ([]byte, error) {
return GetCAFromConfig(cfg)
}

func getToken(ctx context.Context, systemNamespace, tokenName string, client *client.Client) (map[string][]byte, error) {
func getToken(ctx context.Context, controllerNamespace, tokenName string, client *client.Client) (map[string][]byte, error) {
secretName, err := waitForSecretName(ctx, tokenName, client)
if err != nil {
return nil, err
Expand All @@ -249,7 +249,7 @@ func getToken(ctx context.Context, systemNamespace, tokenName string, client *cl
return nil, fmt.Errorf("failed to find token in values")
}

expectedNamespace := fleetns.RegistrationNamespace(systemNamespace)
expectedNamespace := fleetns.RegistrationNamespace(controllerNamespace)
actualNamespace := data["systemRegistrationNamespace"]
if actualNamespace != expectedNamespace {
return nil, fmt.Errorf("registration namespace (%s) from secret (%s/%s) does not match expected: %s", actualNamespace, secret.Namespace, secret.Name, expectedNamespace)
Expand Down
Loading

0 comments on commit 9957126

Please sign in to comment.