Skip to content

Commit

Permalink
Merge pull request kcp-dev#1372 from ncdc/ddsif
Browse files Browse the repository at this point in the history
Various dynamic discovery shared informer factory updates
  • Loading branch information
openshift-ci[bot] authored Jun 30, 2022
2 parents 8074126 + 2867356 commit 355f371
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 75 deletions.
67 changes: 67 additions & 0 deletions pkg/indexers/indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package indexers

import (
"github.com/kcp-dev/logicalcluster"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clusters"
)

const (
	// ByLogicalCluster is the name for the index that indexes by an object's logical cluster.
	// Index values are produced by IndexByLogicalCluster.
	ByLogicalCluster = "kcp-global-byLogicalCluster"
	// ByLogicalClusterAndNamespace is the name for the index that indexes by an object's logical cluster and namespace.
	// Index values are produced by IndexByLogicalClusterAndNamespace.
	ByLogicalClusterAndNamespace = "kcp-global-byLogicalClusterAndNamespace"
)

// ClusterScoped returns the cache.Indexers to install on informers for
// cluster-scoped resources: only the logical-cluster index applies.
func ClusterScoped() cache.Indexers {
	indexers := make(cache.Indexers, 1)
	indexers[ByLogicalCluster] = IndexByLogicalCluster
	return indexers
}

// NamespaceScoped returns the cache.Indexers to install on informers for
// namespace-scoped resources: both the logical-cluster index and the combined
// logical-cluster-plus-namespace index apply.
func NamespaceScoped() cache.Indexers {
	indexers := make(cache.Indexers, 2)
	indexers[ByLogicalCluster] = IndexByLogicalCluster
	indexers[ByLogicalClusterAndNamespace] = IndexByLogicalClusterAndNamespace
	return indexers
}

// IndexByLogicalCluster is a cache.IndexFunc that keys an object by the
// logical cluster it belongs to.
func IndexByLogicalCluster(obj interface{}) ([]string, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return nil, err
	}

	clusterName := logicalcluster.From(accessor).String()
	return []string{clusterName}, nil
}

// IndexByLogicalClusterAndNamespace is a cache.IndexFunc that keys an object
// by the combination of its logical cluster and its namespace, encoded as a
// cluster-aware key.
func IndexByLogicalClusterAndNamespace(obj interface{}) ([]string, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return nil, err
	}

	key := clusters.ToClusterAwareKey(logicalcluster.From(accessor), accessor.GetNamespace())
	return []string{key}, nil
}
172 changes: 112 additions & 60 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/kcp-dev/logicalcluster"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -37,6 +39,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancylisters "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1"
)

Expand All @@ -54,32 +57,31 @@ type DynamicDiscoverySharedInformerFactory struct {
workspaceLister tenancylisters.ClusterWorkspaceLister
disco clusterDiscovery
dynamicClient dynamic.Interface
handlers []GVREventHandler
filterFunc func(interface{}) bool
pollInterval time.Duration
indexers cache.Indexers

mu sync.RWMutex // guards gvrs
gvrs map[schema.GroupVersionResource]struct{}
informers map[schema.GroupVersionResource]informers.GenericInformer
informerStops map[schema.GroupVersionResource]chan struct{}
terminating bool
}
// handlersLock protects multiple writers racing to update handlers.
handlersLock sync.Mutex
handlers atomic.Value

// IndexerFor returns the indexer for the given type GVR.
func (d *DynamicDiscoverySharedInformerFactory) IndexerFor(gvr schema.GroupVersionResource) cache.Indexer {
return d.InformerForResource(gvr).Informer().GetIndexer()
mu sync.RWMutex
informers map[schema.GroupVersionResource]informers.GenericInformer
startedInformers map[schema.GroupVersionResource]bool
informerStops map[schema.GroupVersionResource]chan struct{}
terminating bool
}

// InformerForResource returns the GenericInformer for gvr, creating it if needed.
func (d *DynamicDiscoverySharedInformerFactory) InformerForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
// InformerForResource returns the GenericInformer for gvr, creating it if needed. The GenericInformer must be started
// by calling Start on the DynamicDiscoverySharedInformerFactory before the GenericInformer can be used.
func (d *DynamicDiscoverySharedInformerFactory) InformerForResource(gvr schema.GroupVersionResource) (informers.GenericInformer, error) {
// See if we already have it
d.mu.RLock()
inf := d.informers[gvr]
d.mu.RUnlock()

if inf != nil {
return inf
return inf, nil
}

// Grab the write lock, then find-or-create
Expand All @@ -91,14 +93,16 @@ func (d *DynamicDiscoverySharedInformerFactory) InformerForResource(gvr schema.G

// informerForResourceLockHeld returns the GenericInformer for gvr, creating it if needed. The caller must have the write
// lock before calling this method.
func (d *DynamicDiscoverySharedInformerFactory) informerForResourceLockHeld(gvr schema.GroupVersionResource) informers.GenericInformer {
func (d *DynamicDiscoverySharedInformerFactory) informerForResourceLockHeld(gvr schema.GroupVersionResource) (informers.GenericInformer, error) {
// In case it was created in between the initial check while the rlock was held and when the write lock was
// acquired, return it instead of creating a 2nd copy and overwriting.
inf := d.informers[gvr]
if inf != nil {
return inf
return inf, nil
}

klog.Infof("Adding dynamic informer for %q", gvr)

// Definitely need to create it
inf = dynamicinformer.NewFilteredDynamicInformer(
d.dynamicClient,
Expand All @@ -109,10 +113,35 @@ func (d *DynamicDiscoverySharedInformerFactory) informerForResourceLockHeld(gvr
nil,
)

inf.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: d.filterFunc,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
for _, h := range d.handlers.Load().([]GVREventHandler) {
h.OnAdd(gvr, obj)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
for _, h := range d.handlers.Load().([]GVREventHandler) {
h.OnUpdate(gvr, oldObj, newObj)
}
},
DeleteFunc: func(obj interface{}) {
for _, h := range d.handlers.Load().([]GVREventHandler) {
h.OnDelete(gvr, obj)
}
},
},
})

if err := inf.Informer().AddIndexers(d.indexers); err != nil {
return nil, err
}

// Store in cache
d.informers[gvr] = inf

return inf
return inf, nil
}

// Listers returns a map of per-resource-type listers for all types that are
Expand All @@ -130,10 +159,9 @@ func (d *DynamicDiscoverySharedInformerFactory) Listers() (listers map[schema.Gr
return
}

for gvr := range d.gvrs {
for gvr, informer := range d.informers {
// We hold the read lock, so d.informers is stable while we iterate. We use d.informers
// directly instead of calling either InformerForResource or informerForResourceLockHeld.
informer := d.informers[gvr]
if !informer.Informer().HasSynced() {
notSynced = append(notSynced, gvr)
continue
Expand All @@ -155,16 +183,20 @@ func NewDynamicDiscoverySharedInformerFactory(
filterFunc func(obj interface{}) bool,
pollInterval time.Duration,
) *DynamicDiscoverySharedInformerFactory {
return &DynamicDiscoverySharedInformerFactory{
workspaceLister: workspaceLister,
disco: disco,
dynamicClient: dynClient,
filterFunc: filterFunc,
gvrs: make(map[schema.GroupVersionResource]struct{}),
pollInterval: pollInterval,
informers: make(map[schema.GroupVersionResource]informers.GenericInformer),
informerStops: make(map[schema.GroupVersionResource]chan struct{}),
f := &DynamicDiscoverySharedInformerFactory{
workspaceLister: workspaceLister,
disco: disco,
dynamicClient: dynClient,
filterFunc: filterFunc,
pollInterval: pollInterval,
informers: make(map[schema.GroupVersionResource]informers.GenericInformer),
informerStops: make(map[schema.GroupVersionResource]chan struct{}),
startedInformers: make(map[schema.GroupVersionResource]bool),
}

f.handlers.Store([]GVREventHandler{})

return f
}

// GVREventHandler is an event handler that includes the GroupVersionResource
Expand Down Expand Up @@ -198,7 +230,18 @@ func (g GVREventHandlerFuncs) OnDelete(gvr schema.GroupVersionResource, obj inte
}

// AddEventHandler registers handler to be invoked for events on every dynamic
// informer this factory manages. Registration is copy-on-write: a fresh slice
// is built and atomically stored in d.handlers so concurrent readers in the
// informer event callbacks never see a partially-updated slice.
func (d *DynamicDiscoverySharedInformerFactory) AddEventHandler(handler GVREventHandler) {
	// handlersLock serializes writers; readers go through d.handlers.Load()
	// without locking.
	d.handlersLock.Lock()
	defer d.handlersLock.Unlock()

	handlers := d.handlers.Load().([]GVREventHandler)

	// BUG FIX: copy into a nil slice copies nothing (copy is bounded by
	// len(dst)==0), which silently dropped all previously registered
	// handlers. Allocate the destination with the correct length first.
	newHandlers := make([]GVREventHandler, len(handlers), len(handlers)+1)
	copy(newHandlers, handlers)

	newHandlers = append(newHandlers, handler)

	d.handlers.Store(newHandlers)
}

func (d *DynamicDiscoverySharedInformerFactory) AddIndexers(indexers cache.Indexers) error {
Expand All @@ -215,7 +258,9 @@ func (d *DynamicDiscoverySharedInformerFactory) AddIndexers(indexers cache.Index
return nil
}

func (d *DynamicDiscoverySharedInformerFactory) Start(ctx context.Context) {
// StartPolling starts the polling process that periodically discovers new resources and starts informers for them.
// This call is non-blocking.
func (d *DynamicDiscoverySharedInformerFactory) StartPolling(ctx context.Context) {
// Immediately discover types and start informing.
if err := wait.PollImmediateInfiniteWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if err := d.discoverTypes(ctx); err != nil {
Expand Down Expand Up @@ -304,7 +349,7 @@ func (d *DynamicDiscoverySharedInformerFactory) discoverTypes(ctx context.Contex
}
}

// Grab a read lock to compare against d.gvrs to see if we need to start or stop any informers
// Grab a read lock to compare against d.informers to see if we need to start or stop any informers
d.mu.RLock()
informersToAdd, informersToRemove := d.calculateInformersLockHeld(latest)
d.mu.RUnlock()
Expand All @@ -328,41 +373,19 @@ func (d *DynamicDiscoverySharedInformerFactory) discoverTypes(ctx context.Contex
for i := range informersToAdd {
gvr := informersToAdd[i]

klog.Infof("Adding dynamic informer for %q", gvr)

// We have the write lock, so call the LH variant
inf := d.informerForResourceLockHeld(gvr).Informer()
inf.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: d.filterFunc,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
for _, h := range d.handlers {
h.OnAdd(gvr, obj)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
for _, h := range d.handlers {
h.OnUpdate(gvr, oldObj, newObj)
}
},
DeleteFunc: func(obj interface{}) {
for _, h := range d.handlers {
h.OnDelete(gvr, obj)
}
},
},
})

if err := inf.GetIndexer().AddIndexers(d.indexers); err != nil {
inf, err := d.informerForResourceLockHeld(gvr)
if err != nil {
return err
}

// Set up a stop channel for this specific informer
stop := make(chan struct{})
go inf.Run(stop)
go inf.Informer().Run(stop)

// And store it
d.informerStops[gvr] = stop
d.startedInformers[gvr] = true
}

for i := range informersToRemove {
Expand All @@ -379,21 +402,50 @@ func (d *DynamicDiscoverySharedInformerFactory) discoverTypes(ctx context.Contex
klog.V(4).Infof("Removing dynamic informer from maps for %q", gvr)
delete(d.informers, gvr)
delete(d.informerStops, gvr)
delete(d.startedInformers, gvr)
}

d.gvrs = latest

return nil
}

// Start starts any informers that have been created but not yet started. The passed in stop channel is ignored;
// instead, a new stop channel is created, so the factory can properly stop the informer if/when the API is removed.
// Like other shared informer factories, this call is non-blocking.
func (d *DynamicDiscoverySharedInformerFactory) Start(_ <-chan struct{}) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for gvr, inf := range d.informers {
		if d.startedInformers[gvr] {
			// Already running; nothing to do for this GVR.
			continue
		}

		// Give each informer its own stop channel so it can be shut down
		// individually if its API is removed later.
		stopCh := make(chan struct{})
		go inf.Informer().Run(stopCh)

		d.informerStops[gvr] = stopCh
		d.startedInformers[gvr] = true
	}
}

var (
crdGVR = apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
apibindingsGVR = apisv1alpha1.SchemeGroupVersion.WithResource("apibindings")
)

func (d *DynamicDiscoverySharedInformerFactory) calculateInformersLockHeld(latest map[schema.GroupVersionResource]struct{}) (toAdd, toRemove []schema.GroupVersionResource) {
for gvr := range latest {
if _, found := d.gvrs[gvr]; !found {
if _, found := d.informers[gvr]; !found {
toAdd = append(toAdd, gvr)
}
}

for gvr := range d.gvrs {
for gvr := range d.informers {
// HACK(ncdc): these are needed by our kubeQuota controller - don't delete them
if gvr == crdGVR || gvr == apibindingsGVR {
continue
}

if _, found := latest[gvr]; !found {
toRemove = append(toRemove, gvr)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/workload/resource/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ func (c *Controller) processResource(ctx context.Context, key string) error {
}
key = parts[1]

obj, exists, err := c.ddsif.IndexerFor(*gvr).GetByKey(key)
inf, err := c.ddsif.InformerForResource(*gvr)
if err != nil {
return err
}
obj, exists, err := inf.Informer().GetIndexer().GetByKey(key)
if err != nil {
klog.Errorf("Error getting %q GVR %q from indexer: %v", key, gvrstr, err)
return err
Expand Down
Loading

0 comments on commit 355f371

Please sign in to comment.