Skip to content

Commit

Permalink
fanout grace period + inline relativeexpiry
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Apr 8, 2022
1 parent aad1a4e commit d7a598d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ update-vendor:
# delete the vendored api package. In its place
# create a symlink to the the original api package
rm -r vendor/github.com/gravitational/teleport/api
ln -s -r $(shell readlink -f api) vendor/github.com/gravitational/teleport
cd vendor/github.com/gravitational/teleport && ln -s ../../../../api api

# update-webassets updates the minified code in the webassets repo using the latest webapps
# repo and creates a PR in the teleport repo to update webassets submodule.
Expand Down
40 changes: 21 additions & 19 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func tombstoneKey() []byte {
return backend.Key("cache", teleport.Version, "tombstone", "ok")
}

const cacheTargetAuth string = "auth"
const (
relativeExpiryCap int = 1000
)

// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = cacheTargetAuth
cfg.target = "auth"
cfg.Watches = []types.WatchKind{
{Kind: types.KindCertAuthority, LoadSecrets: true},
{Kind: types.KindClusterName},
Expand Down Expand Up @@ -946,12 +948,6 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
// cannot run concurrently with event processing. this function injects additional events into
// the outbound event stream.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
if c.target != cacheTargetAuth {
// if we are not the auth cache, we are a downstream cache and can rely upon the
// upstream auth cache to perform relative expiry and propagate the changes.
return nil
}

// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

Expand Down Expand Up @@ -1001,20 +997,26 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
continue
}

// event stream processing is paused while this function runs. we perform the
// actual expiry by constructing a fake delete event for the resource which both
// updates this cache, and all downstream caches.
err = c.processEvent(ctx, types.Event{
Type: types.OpDelete,
Resource: &types.ResourceHeader{
Kind: types.KindNode,
Metadata: node.GetMetadata(),
},
})
// event stream processing is paused while this function runs. we perform the
// actual expiry by constructing a fake delete event for the resource which *only*
// updates this cache. this is performed on all caches so eventually all downstream
// caches should become consistent.
err := c.presenceCache.DeleteNode(ctx, apidefaults.Namespace, node.GetMetadata().Name)
if err != nil {
// resource could be missing in the cache
// expired or not created, if the first consumed
// event is delete
if !trace.IsNotFound(err) {
c.Warningf("Failed to delete resource %v.", err)
return trace.Wrap(err)
}
}
if err != nil {
return trace.Wrap(err)
}
removed++
if removed++; removed >= relativeExpiryCap {
break
}
}

if removed > 0 {
Expand Down
31 changes: 15 additions & 16 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -2261,13 +2262,13 @@ func (p *proxyEvents) NewWatcher(ctx context.Context, watch types.Watch) (types.

func TestRelativeExpiryOverflow(t *testing.T) {
ctx := context.Background()
nodeCount := 90000
nodeCount := 60000
expireBlock := 4500
clock := clockwork.NewFakeClock()
p, err := newPack(
t.TempDir(),
func(c Config) Config {
c.RelativeExpiryCheckInterval = time.Minute
c.RelativeExpiryCheckInterval = 20 * time.Minute
c.Clock = clock
return ForAuth(c)
},
Expand Down Expand Up @@ -2313,25 +2314,23 @@ func TestRelativeExpiryOverflow(t *testing.T) {
require.NoError(t, err)
require.Len(t, nodes, nodeCount)

clock.Advance(20 * time.Minute)
require.NoError(t, p.cache.performRelativeNodeExpiry(ctx))
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

expired := nodeCount - expireBlock
var processed int
for {
clock.Advance(20 * time.Minute)
select {
case <-watcher.Done():
require.NoError(t, watcher.Error())
case evt := <-watcher.Events():
require.NotNil(t, evt)

if processed++; processed > expireBlock/3 {
clock.Advance(20 * time.Minute)
require.NoError(t, p.cache.performRelativeNodeExpiry(ctx))
processed = 0
err := watcher.Error()
if err == nil || err.Error() == "watcher closed" || errors.Is(err, context.Canceled) {
return
}

if expired--; expired < 0 {
require.NoError(t, err)
case <-ticker.C:
nodes, err := p.cache.GetNodes(ctx, apidefaults.Namespace)
require.NoError(t, err)
require.Less(t, len(nodes), nodeCount)
if len(nodes) <= expireBlock {
return
}
}
Expand Down
53 changes: 26 additions & 27 deletions lib/services/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,41 @@ type fanoutEntry struct {
watcher *fanoutWatcher
}

type bufferConfig struct {
type fanoutConfig struct {
gracePeriod time.Duration
capacity int
events chan FanoutEvent
clock clockwork.Clock
}

type BufferOption func(*bufferConfig)

// BufferCapacity sets the event capacity of the circular buffer.
func BufferCapacity(c int) BufferOption {
return func(cfg *bufferConfig) {
if c > 0 {
cfg.capacity = c
}
}
}
type FanoutOption func(*fanoutConfig)

// BacklogGracePeriod sets the amount of time a watcher with a backlog will be tolerated.
func BacklogGracePeriod(d time.Duration) BufferOption {
return func(cfg *bufferConfig) {
func BacklogGracePeriod(d time.Duration) FanoutOption {
return func(cfg *fanoutConfig) {
if d > 0 {
cfg.gracePeriod = d
}
}
}

// BufferClock sets a custom clock for the buffer (used in tests).
func BufferClock(c clockwork.Clock) BufferOption {
return func(cfg *bufferConfig) {
// FanoutClock sets a custom clock for the buffer (used in tests).
func FanoutClock(c clockwork.Clock) FanoutOption {
return func(cfg *fanoutConfig) {
if c != nil {
cfg.clock = c
}
}
}

// EventsChannel sets a custom clock for the buffer (used in tests).
func EventsChannel(c chan FanoutEvent) FanoutOption {
return func(cfg *fanoutConfig) {
if c != nil {
cfg.events = c
}
}
}

// Fanout is a helper which allows a stream of events to be fanned-out to many
// watchers. Used by the cache layer to forward events.
type Fanout struct {
Expand All @@ -80,26 +80,25 @@ type Fanout struct {
watchers map[string][]fanoutEntry
// eventsCh is used in tests
eventsCh chan FanoutEvent
cfg bufferConfig
cfg fanoutConfig
}

// NewFanout creates a new Fanout instance in an uninitialized
// state. Until initialized, watchers will be queued but no
// events will be sent.
func NewFanout(eventsCh ...chan FanoutEvent) *Fanout {
cfg := bufferConfig{
func NewFanout(opts ...FanoutOption) *Fanout {
cfg := fanoutConfig{
gracePeriod: backend.DefaultBacklogGracePeriod,
capacity: backend.DefaultBufferCapacity,
clock: clockwork.NewRealClock(),
}
for _, opt := range opts {
opt(&cfg)
}

f := &Fanout{
watchers: make(map[string][]fanoutEntry),
cfg: cfg,
}
if len(eventsCh) != 0 {
f.eventsCh = eventsCh[0]
}
return f
}

Expand Down Expand Up @@ -379,7 +378,7 @@ func (w *fanoutWatcher) emit(event types.Event) error {
if w.fanout.cfg.clock.Now().After(w.backlogSince.Add(w.fanout.cfg.gracePeriod)) {
// backlog has existed for longer than grace period,
// this watcher needs to be removed.
return trace.BadParameter("buffer overflow")
return trace.BadParameter("buffer overflow of %v (backlog=%v).", len(w.eventC), len(w.backlog))
}
// backlog exists, but we are still within grace period.
w.backlog = append(w.backlog, event)
Expand Down Expand Up @@ -489,10 +488,10 @@ type FanoutSet struct {
// NewFanoutSet creates a new FanoutSet instance in an uninitialized
// state. Until initialized, watchers will be queued but no
// events will be sent.
func NewFanoutSet() *FanoutSet {
func NewFanoutSet(opts ...FanoutOption) *FanoutSet {
members := make([]*Fanout, 0, fanoutSetSize)
for i := 0; i < fanoutSetSize; i++ {
members = append(members, NewFanout())
members = append(members, NewFanout(opts...))
}
return &FanoutSet{
counter: atomic.NewUint64(0),
Expand Down
2 changes: 1 addition & 1 deletion lib/services/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// removes it from the buffer
func TestFanoutWatcherClose(t *testing.T) {
eventsCh := make(chan FanoutEvent, 1)
f := NewFanout(eventsCh)
f := NewFanout(EventsChannel(eventsCh))
w, err := f.NewWatcher(context.TODO(),
types.Watch{Name: "test", Kinds: []types.WatchKind{{Name: "test"}}})
require.NoError(t, err)
Expand Down

0 comments on commit d7a598d

Please sign in to comment.