Use workqueue for virt-handler
rmohr committed Jan 11, 2017
1 parent 05e4e2d commit 589fec6
Showing 4 changed files with 139 additions and 89 deletions.
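For context, the change replaces work done directly in the informer callbacks with a rate-limited workqueue that is drained by worker goroutines. Below is a minimal, self-contained sketch of that pattern, using only the workqueue calls that appear in the new vm.go further down (the import path matches the client-go vendored in this tree; newer client-go releases move it to k8s.io/client-go/util/workqueue). It is an illustration of the pattern, not code from this commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func process(key string) error {
	fmt.Println("processing", key)
	return nil
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Event handlers only enqueue keys; the object itself is looked up from a cache later.
	queue.Add("default/testvm")

	// A worker drains the queue; failed items are retried with a backoff delay.
	processNextWorkItem := func() bool {
		key, quit := queue.Get()
		if quit {
			return false
		}
		defer queue.Done(key)

		if err := process(key.(string)); err != nil {
			queue.AddRateLimited(key) // retry later with backoff
			return true
		}
		queue.Forget(key) // reset the backoff history for this key on success
		return true
	}

	go func() {
		for processNextWorkItem() {
		}
	}()

	time.Sleep(time.Second)
	queue.ShutDown()
}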
6 changes: 2 additions & 4 deletions cmd/virt-handler/virt-handler.go
@@ -105,15 +105,13 @@ func main() {
// Populate the VM store with known Domains on the host, to get deletes since the last run
for _, domain := range domainCache.GetStore().List() {
d := domain.(*libvirt.Domain)
vmStore.Add(&v1.VM{
ObjectMeta: api.ObjectMeta{Name: d.ObjectMeta.Name, Namespace: api.NamespaceDefault},
})
vmStore.Add(libvirt.NewVMReferenceFromName(d.ObjectMeta.Name))
}

// Watch for domain changes
go domainController.Run(stop)
// Watch for VM changes
go vmController.Run(stop)
go vmController.Run(1, stop)

// Sleep forever
// TODO add a http handler which provides health check
27 changes: 1 addition & 26 deletions pkg/kubecli/kubecli.go
@@ -2,7 +2,6 @@ package kubecli

import (
"flag"
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
kubev1 "k8s.io/client-go/pkg/api/v1"
@@ -85,37 +84,14 @@ func NewListWatchFromClient(c cache.Getter, resource string, namespace string, f
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

type Compressor struct{}

func (c *Compressor) Compress(d cache.Deltas) cache.Deltas {

// Start processing from the last entry or from the last Delete
deleteIndex := -1
for i, d := range d {
if d.Type == cache.Deleted {
deleteIndex = i
}
}

var newDeltas cache.Deltas
if deleteIndex > -1 && deleteIndex < len(d)-1 {
newDeltas = cache.Deltas{d[deleteIndex], d[len(d)-1]}
}
newDeltas = cache.Deltas{d[len(d)-1]}
fmt.Println("DELTAS:")
fmt.Println(d)
fmt.Println(newDeltas)
return newDeltas
}

func NewInformer(
lw cache.ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (cache.Indexer, *cache.Controller) {
clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, &Compressor{}, clientState)
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)

cfg := &cache.Config{
Queue: fifo,
@@ -126,7 +102,6 @@ func NewInformer(

Process: func(obj interface{}) error {
// from oldest to newest
fmt.Println(obj.(cache.Deltas))

for _, d := range obj.(cache.Deltas) {
switch d.Type {
15 changes: 15 additions & 0 deletions pkg/virt-handler/libvirt/manager.go
@@ -8,9 +8,12 @@ package libvirt

import (
"encoding/xml"
"fmt"
"github.com/jeevatkm/go-model"
"github.com/rgbkrk/libvirt-go"
"k8s.io/client-go/pkg/api"
kubev1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/logging"
@@ -187,3 +190,15 @@ func (l *LibvirtDomainManager) KillVM(vm *v1.VM) error {
l.recorder.Event(vm, kubev1.EventTypeNormal, v1.Deleted.String(), "VM undefined")
return nil
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *v1.VM {
vm := &v1.VM{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: api.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", v1.GroupVersion.String(), api.NamespaceDefault, name)},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: v1.GroupVersion.Group, Kind: "VM", Version: v1.GroupVersion.Version})
return vm
}
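
For illustration, a minimal usage sketch of the new helper (the import path matches this tree; "testvm" is a hypothetical name, not taken from the diff):

package main

import (
	"fmt"

	"kubevirt.io/kubevirt/pkg/virt-handler/libvirt"
)

func main() {
	// "testvm" is a hypothetical name; the reference is placed in the default namespace
	// and carries a SelfLink and the VM GroupVersionKind.
	vm := libvirt.NewVMReferenceFromName("testvm")
	fmt.Println(vm.ObjectMeta.Namespace, vm.ObjectMeta.SelfLink)
}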
180 changes: 121 additions & 59 deletions pkg/virt-handler/vm.go
@@ -3,79 +3,141 @@ package virthandler
import (
kubeapi "k8s.io/client-go/pkg/api"
kubev1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/util/wait"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-handler/libvirt"
"time"
)

func NewVMController(listWatcher cache.ListerWatcher, domainManager libvirt.DomainManager, recorder record.EventRecorder, restClient rest.RESTClient) (cache.Indexer, *cache.Controller) {
logger := logging.DefaultLogger()

return kubecli.NewInformer(listWatcher, &v1.VM{}, 0, kubecli.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) error {
logger.Info().Msg("VM ADD")
vm := obj.(*v1.VM)
err := domainManager.SyncVM(vm)
if err != nil {
goto is_error
}
type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
controller *cache.Controller
domainManager libvirt.DomainManager
recorder record.EventRecorder
restclient rest.RESTClient
}

if vm.Status.Phase != v1.Running {
obj, err = kubeapi.Scheme.Copy(vm)
if err != nil {
goto is_error
}
vm = obj.(*v1.VM)
vm.Status.Phase = v1.Running
err = restClient.Put().Resource("vms").Body(vm).
Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error()
if err != nil {
goto is_error
}
}
is_error:
if err != nil {
logger.Error().Msg(err)
recorder.Event(vm, kubev1.EventTypeWarning, v1.SyncFailed.String(), err.Error())
return cache.ErrRequeue{Err: err}
func NewVMController(listWatcher cache.ListerWatcher, domainManager libvirt.DomainManager, recorder record.EventRecorder, restClient rest.RESTClient) (cache.Indexer, *Controller) {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
indexer, controller := cache.NewIndexerInformer(listWatcher, &v1.VM{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
return nil
},
DeleteFunc: func(obj interface{}) error {
// stop and undefine
// Let's reenqueue the delete request until we reach the end of the method or until
// we detect that the VM does not exist anymore
logger.Info().Msg("VM DELETE")
vm, ok := obj.(*v1.VM)
if !ok {
vm = obj.(cache.DeletedFinalStateUnknown).Obj.(*v1.VM)
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
err := domainManager.KillVM(vm)
if err != nil {
logger.Error().Msg(err)
recorder.Event(vm, kubev1.EventTypeWarning, v1.SyncFailed.String(), err.Error())
return cache.ErrRequeue{Err: err}
}
return nil
},
UpdateFunc: func(old interface{}, new interface{}) error {

logger.Info().Msg("VM UPDATE")
// TODO: at the moment kubecli.NewInformer guarantees that update events where old is already
// equal to new are not delivered, so we don't need to sync in that case (but this might change)
// TODO: Implement the spec update flow in LibvirtDomainManager.SyncVM
vm := new.(*v1.VM)
err := domainManager.SyncVM(vm)
if err != nil {
logger.Error().Msg(err)
recorder.Event(vm, kubev1.EventTypeWarning, v1.SyncFailed.String(), err.Error())
return cache.ErrRequeue{Err: err}
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
return nil
},
})
}, cache.Indexers{})

return indexer, &Controller{
controller: controller,
indexer: indexer,
queue: queue,
domainManager: domainManager,
recorder: recorder,
restclient: restClient,
}
}

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer kubecli.NewPanicCatcher()
defer c.queue.ShutDown()
logging.DefaultLogger().Info().Msg("Starting VM controller")

go c.controller.Run(stopCh)

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-stopCh
logging.DefaultLogger().Info().Msg("Stopping VM controller")
}

func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *Controller) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
logging.DefaultLogger().V(3).Info().Msg("Exiting")
return false
}
defer c.queue.Done(key)
// Fetch the latest VM state from the cache
obj, exists, err := c.indexer.GetByKey(key.(string))
logging.DefaultLogger().V(3).Info().Msgf("Object %s", obj)

if err != nil {
// TODO do something more smart here
c.queue.Forget(key)
return true
}

// Retrieve the VM
var vm *v1.VM
if !exists {
_, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
// TODO do something more smart here
c.queue.Forget(key)
return true
}
vm = libvirt.NewVMReferenceFromName(name)
} else {
vm = obj.(*v1.VM)
}
logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update")

// Process the VM
if !exists {
// Since the VM was not in the cache, we delete it
err = c.domainManager.KillVM(vm)
} else {
// Synchronize the VM state
err = c.domainManager.SyncVM(vm)

// Update VM status to running
if err == nil && vm.Status.Phase != v1.Running {
obj, err = kubeapi.Scheme.Copy(vm)
if err == nil {
vm = obj.(*v1.VM)
vm.Status.Phase = v1.Running
err = c.restclient.Put().Resource("vms").Body(vm).
Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error()
}
}
}

if err != nil {
// Something went wrong, reenqueue the item with a delay
logging.DefaultLogger().V(3).Info().Object(vm).Msgf("Synchronizing the VM failed with: %s", err)
c.recorder.Event(vm, kubev1.EventTypeWarning, v1.SyncFailed.String(), err.Error())
c.queue.AddRateLimited(key)
return true
}

logging.DefaultLogger().V(3).Info().Object(vm).Msg("Synchronizing the VM succeeded")
c.queue.Forget(key)
return true
}
