Skip to content

Commit

Permalink
Create a wrapper for dynamic cache (vmware-archive#568)
Browse files Browse the repository at this point in the history
The cache for dynamic cache will keep an up to date record of all the
objects in a cluster that have been queried. This will speed up the
time it takes to generate object graphs 8x or more.
  • Loading branch information
bryanl authored Mar 11, 2019
1 parent e72e6b8 commit 21a80ad
Show file tree
Hide file tree
Showing 39 changed files with 806 additions and 234 deletions.
1 change: 1 addition & 0 deletions internal/api/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (h *contentHandler) handlePoll(ctx context.Context, poll, namespace string,
w: w,
logger: h.logger,
streamFn: stream,
contentPath: contentPath,
}

if err = cs.content(ctx); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type contentStreamer struct {
w http.ResponseWriter
streamFn streamFn
logger log.Logger
contentPath string
}

func (cs *contentStreamer) content(ctx context.Context) error {
Expand Down Expand Up @@ -209,6 +210,7 @@ func (cs *contentStreamer) content(ctx context.Context) error {
cs.logger.With(
"elapsed", time.Since(now),
"generator", eg.Name(),
"contentPath", cs.contentPath,
).Debugf("generate complete")
ch <- e

Expand Down
44 changes: 5 additions & 39 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package cache

import (
"context"
"fmt"
"strings"

cacheutil "github.com/heptio/developer-dash/internal/cache/util"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kLabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
kcache "k8s.io/client-go/tools/cache"
)
Expand All @@ -18,45 +16,13 @@ import (

// Cache stores Kubernetes objects.
type Cache interface {
List(ctx context.Context, key Key) ([]*unstructured.Unstructured, error)
Get(ctx context.Context, key Key) (*unstructured.Unstructured, error)
Watch(key Key, handler kcache.ResourceEventHandler) error
}

// Key is a key for the cache.
type Key struct {
Namespace string
APIVersion string
Kind string
Name string
Selector kLabels.Selector
}

func (k Key) String() string {
var sb strings.Builder

sb.WriteString("CacheKey[")
if k.Namespace != "" {
fmt.Fprintf(&sb, "Namespace=%q, ", k.Namespace)
}
fmt.Fprintf(&sb, "APIVersion=%q, ", k.APIVersion)
fmt.Fprintf(&sb, "Kind=%q", k.Kind)

if k.Name != "" {
fmt.Fprintf(&sb, ", Name=%q", k.Name)
}

if k.Selector != nil {
fmt.Fprintf(&sb, ", Selector=%q", k.Selector.String())
}

sb.WriteString("]")

return sb.String()
List(ctx context.Context, key cacheutil.Key) ([]*unstructured.Unstructured, error)
Get(ctx context.Context, key cacheutil.Key) (*unstructured.Unstructured, error)
Watch(key cacheutil.Key, handler kcache.ResourceEventHandler) error
}

// GetAs gets an object from the cache by key.
func GetAs(ctx context.Context, c Cache, key Key, as interface{}) error {
func GetAs(ctx context.Context, c Cache, key cacheutil.Key, as interface{}) error {
u, err := c.Get(ctx, key)
if err != nil {
return errors.Wrap(err, "get object from cache")
Expand Down
42 changes: 33 additions & 9 deletions internal/cache/dynamic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

cacheutil "github.com/heptio/developer-dash/internal/cache/util"
"github.com/heptio/developer-dash/internal/cluster"
"github.com/heptio/developer-dash/internal/util/retry"
"github.com/pkg/errors"
Expand Down Expand Up @@ -35,6 +36,32 @@ func initDynamicSharedInformerFactory(client cluster.ClientInterface) (dynamicin
return factory, nil
}

func currentInformer(
key cacheutil.Key,
client cluster.ClientInterface,
factory dynamicinformer.DynamicSharedInformerFactory,
stopCh <-chan struct{}) (informers.GenericInformer, error) {
if factory == nil {
return nil, errors.New("dynamic shared informer factory is nil")
}

if client == nil {
return nil, errors.New("cluster client is nil")
}

gvk := key.GroupVersionKind()

gvr, err := client.Resource(gvk.GroupKind())
if err != nil {
return nil, err
}

informer := factory.ForResource(gvr)
factory.Start(stopCh)

return informer, nil
}

// DynamicCacheOpt is an option for configuration DynamicCache.
type DynamicCacheOpt func(*DynamicCache)

Expand Down Expand Up @@ -77,17 +104,14 @@ type lister interface {
List(selector kLabels.Selector) ([]kruntime.Object, error)
}

func (dc *DynamicCache) currentInformer(key Key) (informers.GenericInformer, error) {
gvk := schema.FromAPIVersionAndKind(key.APIVersion, key.Kind)
func (dc *DynamicCache) currentInformer(key cacheutil.Key) (informers.GenericInformer, error) {
gvk := key.GroupVersionKind()

gvr, err := dc.client.Resource(gvk.GroupKind())
informer, err := currentInformer(key, dc.client, dc.factory, dc.stopCh)
if err != nil {
return nil, err
}

informer := dc.factory.ForResource(gvr)
dc.factory.Start(dc.stopCh)

dc.mu.Lock()
defer dc.mu.Unlock()

Expand All @@ -106,7 +130,7 @@ func (dc *DynamicCache) currentInformer(key Key) (informers.GenericInformer, err
}

// List lists objects.
func (dc *DynamicCache) List(ctx context.Context, key Key) ([]*unstructured.Unstructured, error) {
func (dc *DynamicCache) List(ctx context.Context, key cacheutil.Key) ([]*unstructured.Unstructured, error) {
ctx, span := trace.StartSpan(ctx, "dynamicCacheList")
defer span.End()

Expand Down Expand Up @@ -155,7 +179,7 @@ type getter interface {
}

// Get retrieves a single object.
func (dc *DynamicCache) Get(ctx context.Context, key Key) (*unstructured.Unstructured, error) {
func (dc *DynamicCache) Get(ctx context.Context, key cacheutil.Key) (*unstructured.Unstructured, error) {
ctx, span := trace.StartSpan(ctx, "dynamicCacheList")
defer span.End()

Expand Down Expand Up @@ -226,7 +250,7 @@ func (dc *DynamicCache) Get(ctx context.Context, key Key) (*unstructured.Unstruc

// Watch watches the cluster for an event and performs actions with the
// supplied handler.
func (dc *DynamicCache) Watch(key Key, handler kcache.ResourceEventHandler) error {
func (dc *DynamicCache) Watch(key cacheutil.Key, handler kcache.ResourceEventHandler) error {
informer, err := dc.currentInformer(key)
if err != nil {
return errors.Wrapf(err, "retrieving informer for %s", key)
Expand Down
5 changes: 3 additions & 2 deletions internal/cache/dynamic_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/golang/mock/gomock"
cacheutil "github.com/heptio/developer-dash/internal/cache/util"
"github.com/heptio/developer-dash/internal/cluster"
clusterfake "github.com/heptio/developer-dash/internal/cluster/fake"
"github.com/heptio/developer-dash/internal/testutil"
Expand Down Expand Up @@ -83,7 +84,7 @@ func Test_DynamicCache_List(t *testing.T) {
c, err := NewDynamicCache(client, ctx.Done(), factoryFunc)
require.NoError(t, err)

key := Key{
key := cacheutil.Key{
Namespace: "default",
APIVersion: "v1",
Kind: "Pod",
Expand Down Expand Up @@ -140,7 +141,7 @@ func Test_DynamicCache_Get(t *testing.T) {
c, err := NewDynamicCache(client, ctx.Done(), factoryFunc)
require.NoError(t, err)

key := Key{
key := cacheutil.Key{
Namespace: "default",
APIVersion: "v1",
Kind: "Pod",
Expand Down
46 changes: 46 additions & 0 deletions internal/cache/util/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package util

import (
"fmt"
"strings"

kLabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// Key is a key for the cache.
type Key struct {
Namespace string
APIVersion string
Kind string
Name string
Selector kLabels.Selector
}

func (k Key) String() string {
var sb strings.Builder

sb.WriteString("CacheKey[")
if k.Namespace != "" {
fmt.Fprintf(&sb, "Namespace=%q, ", k.Namespace)
}
fmt.Fprintf(&sb, "APIVersion=%q, ", k.APIVersion)
fmt.Fprintf(&sb, "Kind=%q", k.Kind)

if k.Name != "" {
fmt.Fprintf(&sb, ", Name=%q", k.Name)
}

if k.Selector != nil {
fmt.Fprintf(&sb, ", Selector=%q", k.Selector.String())
}

sb.WriteString("]")

return sb.String()
}

// GroupVersionKind converts the Key to a GroupVersionKind.
func (k Key) GroupVersionKind() schema.GroupVersionKind {
return schema.FromAPIVersionAndKind(k.APIVersion, k.Kind)
}
Loading

0 comments on commit 21a80ad

Please sign in to comment.