Skip to content

Commit

Permalink
shards: wait for watcher to stop when closing
Browse files Browse the repository at this point in the history
Previously we never shutdown the shardWatcher. This meant that when
closing shardedSearcher, the shardWatcher would still interact with it
if it noticed shards changing. This can happen in practice but is
rare. It can more easily be triggered by running e2e tests which cleanup
the index files.

Change-Id: I372e9f1723485ae092d82f0502deb64b4b3c976f
  • Loading branch information
keegancsmith committed Feb 23, 2021
1 parent 217568b commit 031d445
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
20 changes: 18 additions & 2 deletions shards/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,28 @@ func NewDirectorySearcher(dir string) (zoekt.Searcher, error) {
tl := &loader{
ss: ss,
}
_, err := NewDirectoryWatcher(dir, tl)
dw, err := NewDirectoryWatcher(dir, tl)
if err != nil {
return nil, err
}

return ss, nil
return &directorySearcher{
Searcher: ss,
directoryWatcher: dw,
}, nil
}

type directorySearcher struct {
zoekt.Searcher

directoryWatcher Stopper
}

func (s *directorySearcher) Close() {
// We need to Stop directoryWatcher first since it calls load/unload on
// Searcher.
s.directoryWatcher.Stop()
s.Searcher.Close()
}

type loader struct {
Expand Down
35 changes: 22 additions & 13 deletions shards/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package shards

import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
"sync"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -36,30 +36,38 @@ type shardWatcher struct {
dir string
timestamps map[string]time.Time
loader shardLoader
quit chan<- struct{}

closeOnce sync.Once
// quit is closed by Close to signal the directory watcher to stop.
quit chan struct{}
// stopped is closed once the directory watcher has stopped.
stopped chan struct{}
}

func (sw *shardWatcher) Close() error {
if sw.quit != nil {
func (sw *shardWatcher) Stop() {
sw.closeOnce.Do(func() {
close(sw.quit)
sw.quit = nil
}
return nil
<-sw.stopped
})
}

type Stopper interface {
Stop()
}

func NewDirectoryWatcher(dir string, loader shardLoader) (io.Closer, error) {
quitter := make(chan struct{}, 1)
func NewDirectoryWatcher(dir string, loader shardLoader) (Stopper, error) {
sw := &shardWatcher{
dir: dir,
timestamps: map[string]time.Time{},
loader: loader,
quit: quitter,
quit: make(chan struct{}),
stopped: make(chan struct{}),
}
if err := sw.scan(); err != nil {
return nil, err
}

if err := sw.watch(quitter); err != nil {
if err := sw.watch(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -144,7 +152,7 @@ func (s *shardWatcher) scan() error {
return nil
}

func (s *shardWatcher) watch(quitter <-chan struct{}) error {
func (s *shardWatcher) watch() error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
Expand All @@ -171,7 +179,7 @@ func (s *shardWatcher) watch(quitter <-chan struct{}) error {
if err != nil && err != fsnotify.ErrEventOverflow {
log.Println("watcher error:", err)
}
case <-quitter:
case <-s.quit:
watcher.Close()
close(signal)
return
Expand All @@ -180,6 +188,7 @@ func (s *shardWatcher) watch(quitter <-chan struct{}) error {
}()

go func() {
defer close(s.stopped)
for range signal {
s.scan()
}
Expand Down
4 changes: 2 additions & 2 deletions shards/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDirWatcherUnloadOnce(t *testing.T) {
if err != nil {
t.Fatalf("NewDirectoryWatcher: %v", err)
}
defer dw.Close()
defer dw.Stop()

if got := <-logger.loads; got != shard {
t.Fatalf("got load event %v, want %v", got, shard)
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestDirWatcherUnloadOnce(t *testing.T) {
t.Fatalf("WriteFile: %v", err)
}

dw.Close()
dw.Stop()

select {
case k := <-logger.loads:
Expand Down

0 comments on commit 031d445

Please sign in to comment.