Skip to content

Commit

Permalink
Add vmpool indexer by uid to controller revision informer
Browse files Browse the repository at this point in the history
Signed-off-by: David Vossel <[email protected]>
  • Loading branch information
davidvossel committed Feb 21, 2022
1 parent 1a55864 commit 90538a3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 17 deletions.
14 changes: 14 additions & 0 deletions pkg/controller/virtinformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,20 @@ func GetControllerRevisionInformerIndexers() cache.Indexers {
}
}

return nil, nil
},
"vmpool": func(obj interface{}) ([]string, error) {
cr, ok := obj.(*appsv1.ControllerRevision)
if !ok {
return nil, unexpectedObjectError
}

for _, ref := range cr.OwnerReferences {
if ref.Kind == "VirtualMachinePool" {
return []string{string(ref.UID)}, nil
}
}

return nil, nil
},
}
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-controller/watch/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ func (vca *VirtControllerApp) initPool() {
vca.vmiInformer,
vca.vmInformer,
vca.poolInformer,
vca.controllerRevisionInformer,
recorder,
controller.BurstReplicas)
}
Expand Down
67 changes: 50 additions & 17 deletions pkg/virt-controller/watch/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/utils/trace"

"github.com/davecgh/go-spew/spew"
appsv1 "k8s.io/api/apps/v1"
k8score "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -36,14 +37,15 @@ import (

// PoolController is the main PoolController struct.
type PoolController struct {
clientset kubecli.KubevirtClient
queue workqueue.RateLimitingInterface
vmInformer cache.SharedIndexInformer
vmiInformer cache.SharedIndexInformer
poolInformer cache.SharedIndexInformer
recorder record.EventRecorder
expectations *controller.UIDTrackingControllerExpectations
burstReplicas uint
clientset kubecli.KubevirtClient
queue workqueue.RateLimitingInterface
vmInformer cache.SharedIndexInformer
vmiInformer cache.SharedIndexInformer
poolInformer cache.SharedIndexInformer
revisionInformer cache.SharedIndexInformer
recorder record.EventRecorder
expectations *controller.UIDTrackingControllerExpectations
burstReplicas uint
}

const (
Expand All @@ -69,17 +71,19 @@ func NewPoolController(clientset kubecli.KubevirtClient,
vmiInformer cache.SharedIndexInformer,
vmInformer cache.SharedIndexInformer,
poolInformer cache.SharedIndexInformer,
revisionInformer cache.SharedIndexInformer,
recorder record.EventRecorder,
burstReplicas uint) *PoolController {
c := &PoolController{
clientset: clientset,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virt-controller-pool"),
poolInformer: poolInformer,
vmiInformer: vmiInformer,
vmInformer: vmInformer,
recorder: recorder,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
burstReplicas: burstReplicas,
clientset: clientset,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virt-controller-pool"),
poolInformer: poolInformer,
vmiInformer: vmiInformer,
vmInformer: vmInformer,
revisionInformer: revisionInformer,
recorder: recorder,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
burstReplicas: burstReplicas,
}

c.poolInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -94,6 +98,11 @@ func NewPoolController(clientset kubecli.KubevirtClient,
UpdateFunc: c.updateVMHandler,
})

c.revisionInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addRevisionHandler,
UpdateFunc: c.updateRevisionHandler,
})

c.vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addVMIHandler,
UpdateFunc: c.updateVMIHandler,
Expand Down Expand Up @@ -172,6 +181,30 @@ func (c *PoolController) updateVMIHandler(old, cur interface{}) {
c.addVMIHandler(cur)
}

// When a revision is created, enqueue the pool that manages it and update its expectations.
func (c *PoolController) addRevisionHandler(obj interface{}) {
cr := obj.(*appsv1.ControllerRevision)

// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(cr); controllerRef != nil {
pool := c.resolveControllerRef(cr.Namespace, controllerRef)
if pool == nil {
return
}
poolKey, err := controller.KeyFunc(pool)
if err != nil {
return
}
c.expectations.CreationObserved(poolKey)
c.enqueuePool(pool)
return
}
}

func (c *PoolController) updateRevisionHandler(old, cur interface{}) {
c.addRevisionHandler(cur)
}

// When a vm is created, enqueue the pool that manages it and update its expectations.
func (c *PoolController) addVMHandler(obj interface{}) {
vm := obj.(*virtv1.VirtualMachine)
Expand Down Expand Up @@ -408,7 +441,7 @@ func (c *PoolController) Run(threadiness int, stopCh <-chan struct{}) {
log.Log.Info("Starting pool controller.")

// Wait for cache sync before we start the pool controller
cache.WaitForCacheSync(stopCh, c.poolInformer.HasSynced, c.vmInformer.HasSynced)
cache.WaitForCacheSync(stopCh, c.poolInformer.HasSynced, c.vmInformer.HasSynced, c.vmiInformer.HasSynced, c.revisionInformer.HasSynced)

// Start the actual work
for i := 0; i < threadiness; i++ {
Expand Down
17 changes: 17 additions & 0 deletions pkg/virt-controller/watch/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
k8sv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -84,6 +85,8 @@ var _ = Describe("Pool", func() {

var ctrl *gomock.Controller

var crInformer cache.SharedIndexInformer

var vmInterface *kubecli.MockVirtualMachineInterface
var vmiInterface *kubecli.MockVirtualMachineInstanceInterface

Expand All @@ -103,6 +106,7 @@ var _ = Describe("Pool", func() {
go vmiInformer.Run(stop)
go vmInformer.Run(stop)
go poolInformer.Run(stop)
go crInformer.Run(stop)
Expect(cache.WaitForCacheSync(stop, vmiInformer.HasSynced, vmInformer.HasSynced, poolInformer.HasSynced)).To(BeTrue())
}

Expand Down Expand Up @@ -137,10 +141,23 @@ var _ = Describe("Pool", func() {
recorder = record.NewFakeRecorder(100)
recorder.IncludeObject = true

crInformer, _ = testutils.NewFakeInformerWithIndexersFor(&appsv1.ControllerRevision{}, cache.Indexers{
"vmpool": func(obj interface{}) ([]string, error) {
cr := obj.(*appsv1.ControllerRevision)
for _, ref := range cr.OwnerReferences {
if ref.Kind == "VirtualMachinePool" {
return []string{string(ref.UID)}, nil
}
}
return nil, nil
},
})

controller = NewPoolController(virtClient,
vmiInformer,
vmInformer,
poolInformer,
crInformer,
recorder,
uint(10))
// Wrap our workqueue to have a way to detect when we are done processing updates
Expand Down

0 comments on commit 90538a3

Please sign in to comment.