Skip to content

Commit

Permalink
Merge pull request kubevirt#5060 from davidvossel/virt-handler-improv…
Browse files Browse the repository at this point in the history
…ements

Virt-handler should only be informed of VMIs scheduled on virt-handler nodes
  • Loading branch information
kubevirt-bot authored Feb 22, 2021
2 parents cfc5d03 + 8f50760 commit e2eeff1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 44 deletions.
2 changes: 0 additions & 2 deletions cmd/virt-handler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ go_library(
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand Down
55 changes: 17 additions & 38 deletions cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import (
"github.com/golang/glog"
flag "github.com/spf13/pflag"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
k8coresv1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -194,6 +192,11 @@ func (app *virtHandlerApp) Run() {

app.markNodeAsUnschedulable(logger)

app.namespace, err = clientutil.GetNamespace()
if err != nil {
glog.Fatalf("Error searching for namespace: %v", err)
}

go func() {
sigint := make(chan os.Signal, 1)

Expand All @@ -210,33 +213,14 @@ func (app *virtHandlerApp) Run() {
// Scheme is used to create an ObjectReference from an Object (e.g. VirtualMachineInstance) during Event creation
recorder := broadcaster.NewRecorder(scheme.Scheme, k8sv1.EventSource{Component: "virt-handler", Host: app.HostOverride})

vmiSourceLabel, err := labels.Parse(fmt.Sprintf(v1.NodeNameLabel+" in (%s)", app.HostOverride))
if err != nil {
panic(err)
}
vmiTargetLabel, err := labels.Parse(fmt.Sprintf(v1.MigrationTargetNodeNameLabel+" in (%s)", app.HostOverride))
if err != nil {
panic(err)
}

// Wire VirtualMachineInstance controller
factory := controller.NewKubeInformerFactory(app.virtCli.RestClient(), app.virtCli, nil, app.namespace)

vmSourceSharedInformer := cache.NewSharedIndexInformer(
controller.NewListWatchFromClient(app.virtCli.RestClient(), "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything(), vmiSourceLabel),
&v1.VirtualMachineInstance{},
0,
cache.Indexers{},
)

vmTargetSharedInformer := cache.NewSharedIndexInformer(
controller.NewListWatchFromClient(app.virtCli.RestClient(), "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything(), vmiTargetLabel),
&v1.VirtualMachineInstance{},
0,
cache.Indexers{},
)
vmiSourceInformer := factory.VMISourceHost(app.HostOverride)
vmiTargetInformer := factory.VMITargetHost(app.HostOverride)

// Wire Domain controller
domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmSourceSharedInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmiSourceInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -274,17 +258,10 @@ func (app *virtHandlerApp) Run() {
panic(err)
}

app.namespace, err = clientutil.GetNamespace()
if err != nil {
glog.Fatalf("Error searching for namespace: %v", err)
}

if err := app.prepareCertManager(); err != nil {
glog.Fatalf("Error preparing the certificate manager: %v", err)
}

factory := controller.NewKubeInformerFactory(app.virtCli.RestClient(), app.virtCli, nil, app.namespace)

if err := app.setupTLS(factory); err != nil {
glog.Fatalf("Error constructing migration tls config: %v", err)
}
Expand All @@ -298,7 +275,6 @@ func (app *virtHandlerApp) Run() {
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

podIsolationDetector := isolation.NewSocketBasedIsolationDetector(app.VirtShareDir)
vmiInformer := factory.VMI()
app.clusterConfig = virtconfig.NewClusterConfig(factory.ConfigMap(), factory.CRD(), factory.KubeVirt(), app.namespace)
// set log verbosity
app.clusterConfig.SetConfigModifiedCallback(app.shouldChangeLogVerbosity)
Expand All @@ -310,8 +286,8 @@ func (app *virtHandlerApp) Run() {
app.PodIpAddress,
app.VirtShareDir,
app.VirtPrivateDir,
vmSourceSharedInformer,
vmTargetSharedInformer,
vmiSourceInformer,
vmiTargetInformer,
domainSharedInformer,
gracefulShutdownInformer,
int(app.WatchdogTimeoutDuration.Seconds()),
Expand All @@ -327,11 +303,11 @@ func (app *virtHandlerApp) Run() {

consoleHandler := rest.NewConsoleHandler(
podIsolationDetector,
vmiInformer,
vmiSourceInformer,
)

lifecycleHandler := rest.NewLifecycleHandler(
vmiInformer,
vmiSourceInformer,
app.VirtShareDir,
)

Expand All @@ -343,7 +319,10 @@ func (app *virtHandlerApp) Run() {
// Bootstrapping. From here on the startup order matters
stop := make(chan struct{})
defer close(stop)

factory.Start(stop)
go gracefulShutdownInformer.Run(stop)
go domainSharedInformer.Run(stop)

se, exists, err := selinux.NewSELinux()
if err == nil && exists {
Expand All @@ -365,7 +344,7 @@ func (app *virtHandlerApp) Run() {
panic(fmt.Errorf("failed to detect the presence of selinux: %v", err))
}

cache.WaitForCacheSync(stop, factory.ConfigMap().HasSynced, vmiInformer.HasSynced, factory.CRD().HasSynced)
cache.WaitForCacheSync(stop, factory.ConfigMap().HasSynced, vmiSourceInformer.HasSynced, factory.CRD().HasSynced)

go vmController.Run(10, stop)

Expand Down
41 changes: 41 additions & 0 deletions pkg/controller/virtinformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type KubeInformerFactory interface {
// Watches for vmi objects
VMI() cache.SharedIndexInformer

// Watches for vmi objects assigned to a specific host
VMISourceHost(hostName string) cache.SharedIndexInformer

// Watches for vmi objects assigned to a specific host
// as a migration target
VMITargetHost(hostName string) cache.SharedIndexInformer

// Watches for VirtualMachineInstanceReplicaSet objects
VMIReplicaSet() cache.SharedIndexInformer

Expand Down Expand Up @@ -303,6 +310,40 @@ func (f *kubeInformerFactory) VMI() cache.SharedIndexInformer {
})
}

func (f *kubeInformerFactory) VMISourceHost(hostName string) cache.SharedIndexInformer {
labelSelector, err := labels.Parse(fmt.Sprintf(kubev1.NodeNameLabel+" in (%s)", hostName))
if err != nil {
panic(err)
}

return f.getInformer("vmiInformer-sources", func() cache.SharedIndexInformer {
lw := NewListWatchFromClient(f.restClient, "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything(), labelSelector)
return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachineInstance{}, f.defaultResync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
"node": func(obj interface{}) (strings []string, e error) {
return []string{obj.(*kubev1.VirtualMachineInstance).Status.NodeName}, nil
},
})
})
}

func (f *kubeInformerFactory) VMITargetHost(hostName string) cache.SharedIndexInformer {
labelSelector, err := labels.Parse(fmt.Sprintf(kubev1.MigrationTargetNodeNameLabel+" in (%s)", hostName))
if err != nil {
panic(err)
}

return f.getInformer("vmiInformer-targets", func() cache.SharedIndexInformer {
lw := NewListWatchFromClient(f.restClient, "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything(), labelSelector)
return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachineInstance{}, f.defaultResync, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
"node": func(obj interface{}) (strings []string, e error) {
return []string{obj.(*kubev1.VirtualMachineInstance).Status.NodeName}, nil
},
})
})
}

func (f *kubeInformerFactory) VMIReplicaSet() cache.SharedIndexInformer {
return f.getInformer("vmirsInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.restClient, "virtualmachineinstancereplicasets", k8sv1.NamespaceAll, fields.Everything())
Expand Down
4 changes: 0 additions & 4 deletions pkg/virt-handler/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,6 @@ func (c *VirtualMachineController) Run(threadiness int, stopCh chan struct{}) {
log.Log.Info("Starting virt-handler controller.")

// Wait for the domain cache to be synced
go c.domainInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, c.domainInformer.HasSynced)

go c.deviceManagerController.Run(stopCh)
Expand All @@ -1016,9 +1015,6 @@ func (c *VirtualMachineController) Run(threadiness int, stopCh chan struct{}) {
)
}

go c.vmiSourceInformer.Run(stopCh)
go c.vmiTargetInformer.Run(stopCh)
go c.gracefulShutdownInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, c.domainInformer.HasSynced, c.vmiSourceInformer.HasSynced, c.vmiTargetInformer.HasSynced, c.gracefulShutdownInformer.HasSynced)

go c.heartBeat(c.heartBeatInterval, stopCh)
Expand Down

0 comments on commit e2eeff1

Please sign in to comment.