Be resilient against crashes of individual search shards.
* Separate sharding support into a shard watcher and the parallel search.
* Recover from panics in the search goroutines.
* Report crashed shards in the web interface.

Change-Id: Ia4d76ba8b2d4b8a130d78d0107880187f034d54b
hanwen committed Sep 14, 2016
1 parent 2775b48 · commit a2efd83
Showing 10 changed files with 258 additions and 42 deletions.
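
The heart of the change is visible in shards.go below: each per-shard search runs in its own goroutine, and a deferred recover() converts a panic into an ordinary result message carrying a crash count, so the collecting loop still receives exactly one message per shard and the server stays up. Here is a minimal, self-contained sketch of that pattern; the types and values are illustrative stand-ins, not zoekt's own.

package main

import (
	"fmt"
	"log"
)

// result mirrors the per-shard res struct in shards.go; crashed stands in
// for the new Stats.Crashes counter.
type result struct {
	matches int
	crashed bool
}

// search stands in for Searcher.Search on one shard; shard 1 panics on purpose.
func search(shard int) int {
	if shard == 1 {
		panic("corrupt shard")
	}
	return shard * 10 // pretend match count
}

func main() {
	shards := []int{0, 1, 2}
	all := make(chan result, len(shards))

	for _, sh := range shards {
		go func(sh int) {
			// Without the deferred recover, one bad shard would kill the
			// whole process; with it, the panic becomes a normal message,
			// so the loop below never blocks waiting for a missing result.
			defer func() {
				if r := recover(); r != nil {
					log.Printf("crashed shard %d: %v", sh, r)
					all <- result{crashed: true}
				}
			}()
			all <- result{matches: search(sh)}
		}(sh)
	}

	total, crashes := 0, 0
	for range shards {
		r := <-all
		total += r.matches
		if r.crashed {
			crashes++
		}
	}
	fmt.Printf("matches=%d crashes=%d\n", total, crashes) // matches=20 crashes=1
}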
api.go: 44 changes (26 additions, 18 deletions)
@@ -67,8 +67,17 @@ type MatchFragment struct {

// Stats contains interesting numbers on the search
type Stats struct {
// Number of candidate matches as a result of searching ngrams.
NgramMatches int
// Total length of files loaded.
BytesLoaded int64

// Number of search shards that had a crash.
Crashes int

// Wall clock time for this search
Duration time.Duration

// Number of files containing a match.
FileCount int

// Files that we evaluated. Equivalent to files for which all
// atom matches (including negations) evaluated to true.
@@ -77,34 +86,29 @@ type Stats struct {
// Files for which we loaded file content to verify substring matches
FilesLoaded int

// Total length of files thus loaded.
BytesLoaded int64

// Number of files containing a match.
FileCount int
// Candidate files whose contents weren't examined because we
// gathered enough matches.
FilesSkipped int

// Number of non-overlapping matches
MatchCount int

// Wall clock time for this search
Duration time.Duration
// Number of candidate matches as a result of searching ngrams.
NgramMatches int

// Wall clock time for queued search.
Wait time.Duration

// Candidate files whose contents weren't examined because we
// gathered enough matches.
FilesSkipped int
}

func (s *Stats) Add(o Stats) {
s.NgramMatches += o.NgramMatches
s.FilesLoaded += o.FilesLoaded
s.MatchCount += o.MatchCount
s.BytesLoaded += o.BytesLoaded
s.Crashes += o.Crashes
s.FileCount += o.FileCount
s.FilesConsidered += o.FilesConsidered
s.BytesLoaded += o.BytesLoaded
s.FilesLoaded += o.FilesLoaded
s.FilesSkipped += o.FilesSkipped
s.MatchCount += o.MatchCount
s.NgramMatches += o.NgramMatches
}

// SearchResult contains search matches and extra data
@@ -144,7 +148,8 @@ type Repository struct {

// RepoList holds a set of Repository metadata.
type RepoList struct {
Repos []*Repository
Repos []*Repository
Crashes int
}

type Searcher interface {
@@ -155,6 +160,9 @@ type Searcher interface {
List(ctx context.Context, q query.Q) (*RepoList, error)
Stats() (*RepoStats, error)
Close()

// Describe the searcher for debug messages.
String() string
}

type RepoStats struct {
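
With Crashes threaded through Stats (and RepoList above), a caller can tell users when results may be incomplete. A hedged sketch of how a frontend might surface it; warnIfDegraded is hypothetical, the commit's actual web-interface changes are among the changed files not shown on this page, and the import path assumes the google/zoekt layout of the time.

package main

import (
	"fmt"

	"github.com/google/zoekt"
)

// warnIfDegraded is an illustrative helper, not part of this commit: it
// turns the new Stats.Crashes counter into a user-visible warning.
func warnIfDegraded(res *zoekt.SearchResult) string {
	if res.Stats.Crashes > 0 {
		return fmt.Sprintf("%d shard(s) crashed; results may be incomplete",
			res.Stats.Crashes)
	}
	return ""
}

func main() {
	var res zoekt.SearchResult
	res.Stats.Crashes = 2
	fmt.Println(warnIfDegraded(&res)) // 2 shard(s) crashed; results may be incomplete
}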
index.go: 4 changes (4 additions, 0 deletions)
@@ -71,6 +71,10 @@ type indexUnaryData struct {
BranchVersions []string
}

func (d *indexData) String() string {
return fmt.Sprintf("shard(%s)", d.file.Name())
}

func (d *indexData) memoryUse() int {
sz := 0
for _, a := range [][]uint32{
index_test.go: 4 changes (4 additions, 0 deletions)
@@ -96,6 +96,10 @@ type memSeeker struct {
data []byte
}

func (s *memSeeker) Name() string {
return "memseeker"
}

func (s *memSeeker) Close() {}
func (s *memSeeker) Read(off, sz uint32) ([]byte, error) {
return s.data[off : off+sz], nil
indexfile.go: 4 changes (4 additions, 0 deletions)
@@ -54,3 +54,7 @@ func (f indexFileFromOS) Size() (uint32, error) {
func (f indexFileFromOS) Close() {
f.f.Close()
}

func (f indexFileFromOS) Name() string {
return f.f.Name()
}
indexfile_linux.go: 4 changes (4 additions, 0 deletions)
@@ -29,6 +29,10 @@ func (f *mmapedIndexFile) Read(off, sz uint32) ([]byte, error) {
return f.data[off : off+sz], nil
}

func (f *mmapedIndexFile) Name() string {
return f.f.Name()
}

func (f *mmapedIndexFile) Size() (uint32, error) {
fi, err := f.f.Stat()
if err != nil {
read.go: 1 change (1 addition, 0 deletions)
@@ -216,6 +216,7 @@ type IndexFile interface {
Read(off uint32, sz uint32) ([]byte, error)
Size() (uint32, error)
Close()
Name() string
}

// NewSearcher creates a Searcher for a single index file.
shards.go: 107 changes (83 additions, 24 deletions)
@@ -34,7 +34,7 @@ type searchShard struct {
mtime time.Time
}

type shardedSearcher struct {
type shardWatcher struct {
dir string

// Limit the number of parallel queries. Since searching is
@@ -72,7 +72,11 @@ func loadShard(fn string) (*searchShard, error) {
}, nil
}

func (s *shardedSearcher) scan() error {
func (s *shardWatcher) String() string {
return fmt.Sprintf("shardWatcher(%s)", s.dir)
}

func (s *shardWatcher) scan() error {
fs, err := filepath.Glob(filepath.Join(s.dir, "*.zoekt"))
if err != nil {
return err
@@ -127,23 +131,41 @@ func (s *shardedSearcher) scan() error {
return nil
}

func (s *shardedSearcher) lock() {
func (s *shardWatcher) rlock() {
s.throttle <- struct{}{}
}

// getShards returns the currently loaded shards. The shards must be
// accessed under a rlock call.
func (s *shardWatcher) getShards() []Searcher {
var res []Searcher
for _, sh := range s.shards {
res = append(res, sh)
}
return res
}

func (s *shardWatcher) runlock() {
<-s.throttle
}

func (s *shardWatcher) lock() {
n := cap(s.throttle)
for n > 0 {
s.throttle <- struct{}{}
n--
}
}

func (s *shardedSearcher) unlock() {
func (s *shardWatcher) unlock() {
n := cap(s.throttle)
for n > 0 {
<-s.throttle
n--
}
}

func (s *shardedSearcher) replace(key string, shard *searchShard) {
func (s *shardWatcher) replace(key string, shard *searchShard) {
s.lock()
defer s.unlock()
old := s.shards[key]
@@ -157,7 +179,7 @@ func (s *shardedSearcher) replace(key string, shard *searchShard) {
}
}

func (s *shardedSearcher) watch() error {
func (s *shardWatcher) watch() error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
@@ -187,7 +209,7 @@
// NewShardedSearcher returns a searcher instance that loads all
// shards corresponding to a glob into memory.
func NewShardedSearcher(dir string) (Searcher, error) {
ss := shardedSearcher{
ss := shardWatcher{
dir: dir,
shards: make(map[string]*searchShard),
quit: make(chan struct{}, 1),
@@ -202,11 +224,11 @@ func NewShardedSearcher(dir string) (Searcher, error) {
return nil, err
}

return &ss, nil
return &shardedSearcher{&ss}, nil
}

// Close closes references to open files. It may be called only once.
func (ss *shardedSearcher) Close() {
func (ss *shardWatcher) Close() {
close(ss.quit)
ss.lock()
defer ss.unlock()
@@ -215,9 +237,23 @@ func (ss *shardedSearcher) Close() {
}
}

type shardLoader interface {
Close()
getShards() []Searcher
rlock()
runlock()
String() string
}

type shardedSearcher struct {
shardLoader
}

func (ss *shardedSearcher) Stats() (*RepoStats, error) {
var r RepoStats
for _, s := range ss.shards {
ss.rlock()
defer ss.runlock()
for _, s := range ss.shardLoader.getShards() {
s, err := s.Stats()
if err != nil {
return nil, err
@@ -240,14 +276,14 @@ func (ss *shardedSearcher) Search(ctx context.Context, pat query.Q, opts *Search
}

// This critical section is large, but we don't want to deal with
// searches on a shards that have just been closed.
ss.throttle <- struct{}{}
// searches on shards that have just been closed.
ss.shardLoader.rlock()
aggregate.Wait = time.Now().Sub(start)
start = time.Now()

// TODO - allow for canceling the query.
shardCount := len(ss.shards)
all := make(chan res, shardCount)
shards := ss.getShards()
all := make(chan res, len(shards))

var childCtx context.Context
var cancel context.CancelFunc
@@ -259,15 +295,23 @@

defer cancel()

for _, s := range ss.shards {
for _, s := range shards {
go func(s Searcher) {
defer func() {
if r := recover(); r != nil {
log.Printf("crashed shard: %s: %s", s.String(), r)

var r SearchResult
r.Stats.Crashes = 1
all <- res{&r, nil}
}
}()
ms, err := s.Search(childCtx, pat, opts)
all <- res{ms, err}
}(s)
}
<-ss.throttle

for i := 0; i < shardCount; i++ {
for range shards {
r := <-all
if r.err != nil {
return nil, r.err
@@ -289,6 +333,7 @@ func (ss *shardedSearcher) Search(ctx context.Context, pat query.Q, opts *Search
cancel = nil
}
}
ss.runlock()

sortFilesByScore(aggregate.Files)
aggregate.Duration = time.Now().Sub(start)
@@ -301,36 +346,50 @@ func (ss *shardedSearcher) List(ctx context.Context, r query.Q) (*RepoList, erro
err error
}

ss.throttle <- struct{}{}
shardCount := len(ss.shards)
ss.rlock()
shards := ss.getShards()
shardCount := len(shards)
all := make(chan res, shardCount)
for _, s := range ss.shards {
for _, s := range shards {
go func(s Searcher) {
defer func() {
if r := recover(); r != nil {
all <- res{
&RepoList{Crashes: 1}, nil,
}
}
}()
ms, err := s.List(ctx, r)
all <- res{ms, err}
}(s)
}
<-ss.throttle

crashes := 0
uniq := map[string]*Repository{}
for i := 0; i < shardCount; i++ {
r := <-all
if r.err != nil {
return nil, r.err
}
crashes += r.rl.Crashes
for _, r := range r.rl.Repos {
uniq[r.Name] = r
}
}
ss.runlock()

var names []string
for k := range uniq {
names = append(names, k)
for n := range uniq {
names = append(names, n)
}
sort.Strings(names)

var aggregate []*Repository
for _, k := range names {
aggregate = append(aggregate, uniq[k])
}
return &RepoList{aggregate}, nil
return &RepoList{
Repos: aggregate,
Crashes: crashes,
}, nil
}
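
A note on the locking above: shardWatcher's buffered throttle channel doubles as a reader/writer lock built from a counting semaphore. rlock sends one token, so up to cap(throttle) searches proceed concurrently; lock fills every slot, which both waits out active searches and blocks new ones while a shard is replaced or closed. A standalone sketch of the idiom, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

// rwThrottle is a buffered channel used as a counting semaphore, the same
// idiom as shardWatcher's throttle field. Readers occupy one slot each; a
// writer occupies all of them.
type rwThrottle chan struct{}

func (t rwThrottle) rlock()   { t <- struct{}{} }
func (t rwThrottle) runlock() { <-t }

// lock acquires every slot: it waits for active readers to finish and
// keeps new readers out until unlock drains the channel again.
func (t rwThrottle) lock() {
	for n := cap(t); n > 0; n-- {
		t <- struct{}{}
	}
}

func (t rwThrottle) unlock() {
	for n := cap(t); n > 0; n-- {
		<-t
	}
}

func main() {
	t := make(rwThrottle, 4) // at most 4 concurrent searches

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			t.rlock()
			defer t.runlock()
			fmt.Println("searching in reader", i)
		}(i)
	}
	wg.Wait()

	t.lock() // exclusive section, e.g. while replacing a shard
	fmt.Println("writer holds all slots")
	t.unlock()
}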
(3 more changed files not shown)
