Skip to content

Commit

Permalink
search: add method for resetting the index
Browse files Browse the repository at this point in the history
Also, check contexts and shutdown channel often, to make resetting
more responsive.

Issue: HOTPOT-1501
  • Loading branch information
strib committed Mar 5, 2020
1 parent d3df096 commit 59db8c5
Showing 1 changed file with 63 additions and 3 deletions.
66 changes: 63 additions & 3 deletions go/kbfs/search/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/blevesearch/bleve/registry"
"github.com/keybase/client/go/kbfs/data"
"github.com/keybase/client/go/kbfs/idutil"
"github.com/keybase/client/go/kbfs/ioutil"
"github.com/keybase/client/go/kbfs/kbfsmd"
"github.com/keybase/client/go/kbfs/kbfssync"
"github.com/keybase/client/go/kbfs/libcontext"
Expand Down Expand Up @@ -76,6 +77,7 @@ type Indexer struct {
configInitFn initFn
once sync.Once
indexWG kbfssync.RepeatedWaitGroup
loopWG kbfssync.RepeatedWaitGroup
kvstoreName string
fullIndexCB func() error // helpful for testing

Expand Down Expand Up @@ -112,9 +114,7 @@ func newIndexerWithConfigInit(config libkbfs.Config, configInitFn initFn,
indexReadyCh: make(chan struct{}),
}

ctx, cancel := context.WithCancel(i.makeContext(context.Background()))
i.cancelLoop = cancel
go i.loop(ctx)
i.startLoop()
return i, nil
}

Expand All @@ -124,6 +124,13 @@ func NewIndexer(config libkbfs.Config) (*Indexer, error) {
config, defaultInitConfig, kvstoreNamePrefix)
}

func (i *Indexer) startLoop() {
ctx, cancel := context.WithCancel(i.makeContext(context.Background()))
i.cancelLoop = cancel
i.loopWG.Add(1)
go i.loop(ctx)
}

func (i *Indexer) makeContext(ctx context.Context) context.Context {
return libkbfs.CtxWithRandomIDReplayable(ctx, ctxIDKey, ctxOpID, i.log)
}
Expand Down Expand Up @@ -479,6 +486,17 @@ func (i *Indexer) currBatchLocked(ctx context.Context) (*bleve.Batch, error) {
return i.currBatch, nil
}

func (i *Indexer) checkDone(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-i.shutdownCh:
return errors.New("Shutdown")
default:
return nil
}
}

func (i *Indexer) indexChildWithPtrAndNode(
ctx context.Context, parentNode libkbfs.Node, parentDocID string,
childName data.PathPartString, oldPtr, newPtr data.BlockPointer,
Expand All @@ -498,6 +516,11 @@ func (i *Indexer) indexChildWithPtrAndNode(
}
}

err = i.checkDone(ctx)
if err != nil {
return nil, err
}

tlfID := n.GetFolderBranch().Tlf

// If the new pointer has already been indexed, skip indexing it again.
Expand Down Expand Up @@ -776,6 +799,11 @@ func (i *Indexer) fsForRev(
func (i *Indexer) indexNewlySyncedTlfDir(
ctx context.Context, dir libkbfs.Node,
dirDocID string, rev kbfsmd.Revision) error {
err := i.checkDone(ctx)
if err != nil {
return err
}

children, err := i.config.KBFSOps().GetDirChildren(ctx, dir)
if err != nil {
return err
Expand Down Expand Up @@ -1061,6 +1089,11 @@ func (i *Indexer) doIncrementalIndex(
// Iterate through each change and call the appropriate index
// function for it.
for _, change := range changes {
err := i.checkDone(ctx)
if err != nil {
return err
}

plainPath, _ := change.CurrPath.PlaintextSansTlf()
dir, _ := path.Split(plainPath)
dirFS, err := fs.ChrootAsLibFS(path.Clean(dir))
Expand Down Expand Up @@ -1200,6 +1233,8 @@ func (i *Indexer) handleTlfMessage(ctx context.Context, m tlfMessage) error {
}

func (i *Indexer) loop(ctx context.Context) {
defer i.loopWG.Done()

i.log.CDebugf(ctx, "Starting indexing loop")
defer i.log.CDebugf(ctx, "Ending index loop")

Expand Down Expand Up @@ -1267,6 +1302,10 @@ outerLoop:
// Shutdown shuts down this indexer.
func (i *Indexer) Shutdown(ctx context.Context) error {
close(i.shutdownCh)
err := i.loopWG.Wait(ctx)
if err != nil {
return err
}

i.lock.Lock()
defer i.lock.Unlock()
Expand Down Expand Up @@ -1369,3 +1408,24 @@ resultLoop:

return results, nextResult, nil
}

// ResetIndex shuts down the current indexer, completely removes its
// on-disk presence, and then restarts it as a blank index.
func (i *Indexer) ResetIndex(ctx context.Context) (err error) {
i.log.CDebugf(ctx, "Resetting the index")
defer func() { i.log.CDebugf(ctx, "Done resetting the index: %+v", err) }()

err = i.Shutdown(ctx)
if err != nil {
return err
}

dir := filepath.Join(i.config.StorageRoot(), indexStorageDir)
err = ioutil.RemoveAll(dir)
if err != nil {
return err
}

i.startLoop()
return nil
}

0 comments on commit 59db8c5

Please sign in to comment.