Move virt-controller VM control loop to workqueue
Instead of directly using a Delta FIFO, use the workqueue to hand over
the VM to the scheduler if necessary.
rmohr committed Jan 23, 2017
1 parent ec9f22d commit 1047bf1
Showing 8 changed files with 165 additions and 180 deletions.
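
For readers unfamiliar with the pattern this commit adopts: instead of reacting to Delta FIFO pops directly, the informer event handlers only enqueue object keys into a rate-limited workqueue, and a worker goroutine pops keys and reconciles them, retrying failed keys with backoff. The minimal sketch below shows that loop in isolation using client-go's `workqueue` package; the `process` function and the example key are illustrative placeholders, not code from this repository.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// process stands in for the reconcile step (e.g. handing the VM to the scheduler).
func process(key string) error {
	fmt.Println("reconciling", key)
	return nil
}

func main() {
	// Rate-limited queue: failed keys are retried with backoff instead of being lost.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Informer event handlers would only enqueue object keys here.
	queue.Add("default/testvm")
	queue.ShutDown() // lets the worker loop below drain the queue and exit

	// Worker loop: pop one key at a time, retry on error, acknowledge with Done.
	for {
		key, quit := queue.Get()
		if quit {
			break
		}
		if err := process(key.(string)); err != nil {
			queue.AddRateLimited(key) // back off and try this key again later
		} else {
			queue.Forget(key) // clear the per-key rate-limit history
		}
		queue.Done(key)
	}
}
```
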
34 changes: 12 additions & 22 deletions cmd/virt-controller/virt-controller.go
@@ -9,10 +9,8 @@ import (

"github.com/emicklei/go-restful"
"github.com/facebookgo/inject"
"k8s.io/client-go/tools/cache"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/util"
"kubevirt.io/kubevirt/pkg/virt-controller/rest"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
"kubevirt.io/kubevirt/pkg/virt-controller/watch"
@@ -35,30 +33,32 @@ func main() {
if err != nil {
golog.Fatal(err)
}
vmHandler, err := watch.NewVMResourceEventHandler()
if err != nil {
golog.Fatal(err)
}
podHandler, err := watch.NewPodResourceEventHandler()
if err != nil {
golog.Fatal(err)
}
vmCache, err := util.NewVMCache()

clientSet, err := kubecli.Get()

if err != nil {
golog.Fatal(err)
}

clientSet, err := kubecli.Get()
// Bootstrapping. From here on the initialization order is important
stop := make(chan struct{})
defer close(stop)

// Start watching vms
restClient, err := kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}
vmCache, vmController := watch.NewVMController(vmService, nil, restClient)

g.Provide(
&inject.Object{Value: clientSet},
&inject.Object{Value: templateService},
&inject.Object{Value: vmService},
&inject.Object{Value: vmHandler},
&inject.Object{Value: podHandler},
&inject.Object{Value: vmCache},
)
@@ -69,20 +69,10 @@ func main() {
}
restful.Add(rest.WebService)

// Bootstrapping. From here on the initialization order is important
stop := make(chan struct{})
defer close(stop)

// Warm up the vmCache before the pod watcher is started
go vmCache.Run(stop)
cache.WaitForCacheSync(stop, vmCache.HasSynced)
go vmController.Run(1, stop)

// Start watching vms
vmController, err := watch.NewVMInformer(vmHandler)
if err != nil {
golog.Fatal(err)
}
go vmController.Run(stop)
// Wait until VM cache has warmed up before we start watching pods
vmController.WaitForSync(stop)

// Start watching pods
podController, err := watch.NewPodInformer(podHandler)
3 changes: 2 additions & 1 deletion cmd/virt-handler/virt-handler.go
@@ -14,6 +14,7 @@ import (
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/util"
"kubevirt.io/kubevirt/pkg/virt-handler"
"kubevirt.io/kubevirt/pkg/virt-handler/libvirt"
virtcache "kubevirt.io/kubevirt/pkg/virt-handler/libvirt/cache"
@@ -105,7 +106,7 @@ func main() {
// Populate the VM store with known Domains on the host, to get deletes since the last run
for _, domain := range domainCache.GetStore().List() {
d := domain.(*libvirt.Domain)
vmStore.Add(libvirt.NewVMReferenceFromName(d.ObjectMeta.Name))
vmStore.Add(util.NewVMReferenceFromName(d.ObjectMeta.Name))
}

// Watch for domain changes
8 changes: 6 additions & 2 deletions pkg/kubecli/kubecli.go
@@ -167,7 +167,7 @@ type ResourceEventHandler interface {
OnDelete(obj interface{}) error
}

func CatchPanic() {
func HandlePanic() {
if r := recover(); r != nil {
logging.DefaultLogger().Critical().Log("stacktrace", debug.Stack(), "msg", r)
}
@@ -235,7 +235,7 @@ func NewController(lw cache.ListerWatcher, queue workqueue.RateLimitingInterface
type ControllerFunc func(cache.Indexer, workqueue.RateLimitingInterface) bool

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer CatchPanic()
defer HandlePanic()
defer c.queue.ShutDown()
logging.DefaultLogger().Info().Msg("Starting VM controller.")

@@ -249,6 +249,10 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
logging.DefaultLogger().Info().Msg("Stopping VM controller.")
}

func (c *Controller) WaitForSync(stopCh chan struct{}) {
cache.WaitForCacheSync(stopCh, c.informer.HasSynced)
}

func (c *Controller) runWorker() {
for c.f(c.indexer, c.queue) {
}
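
The new `Run(threadiness, stop)` / `WaitForSync(stop)` pair is what `cmd/virt-controller/virt-controller.go` above drives during bootstrap. A condensed sketch of that start-up order, based on the hunks above (error handling omitted; how the pod informer is started is assumed, since its `Run` call is not shown in this diff):

```go
stop := make(chan struct{})
defer close(stop)

go vmController.Run(1, stop)   // start one worker draining the VM workqueue
vmController.WaitForSync(stop) // block until the VM informer cache has synced

// Only now is it safe to start watching pods.
go podController.Run(stop)
```
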
15 changes: 15 additions & 0 deletions pkg/util/cache.go
@@ -1,8 +1,10 @@
package util

import (
"fmt"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
@@ -17,3 +19,16 @@ func NewVMCache() (cache.SharedInformer, error) {
informer := cache.NewSharedInformer(vmCacheSource, &v1.VM{}, 0)
return informer, nil
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *v1.VM {
vm := &v1.VM{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: api.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", v1.GroupVersion.String(), api.NamespaceDefault, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: v1.GroupVersion.Group, Kind: "VM", Version: v1.GroupVersion.Version})
return vm
}
2 changes: 1 addition & 1 deletion pkg/virt-controller/watch/pod.go
@@ -52,7 +52,7 @@ func NewPodCache() (cache.SharedInformer, error) {
}

func processPod(p *podResourceEventHandler, pod *v1.Pod) error {
defer kubecli.CatchPanic()
defer kubecli.HandlePanic()
vmObj, exists, err := p.VMCache.GetStore().GetByKey(kubeapi.NamespaceDefault + "/" + pod.GetLabels()[corev1.DomainLabel])
if err != nil {
// TODO handle this smarter, for now just try again