Skip to content

Commit

Permalink
add repair logic for broken indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 10, 2017
1 parent e872174 commit 521115e
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
Also empty string on a tag value is now a sufficient condition for the default conditions to be applied.
See [#1233](https://github.com/influxdata/kapacitor/pull/1233) for more information.
- [#1068](https://github.com/influxdata/kapacitor/issues/1068): Fix dot view syntax to use xlabels and not create invalid quotes.
- [#1295](https://github.com/influxdata/kapacitor/issues/1295): Fix curruption of recordings list after deleting all recordings.

## v1.2.0 [2017-01-23]

Expand Down
7 changes: 7 additions & 0 deletions services/replay/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type RecordingDAO interface {
// Offset and limit are pagination bounds. Offset is inclusive starting at index 0.
// More results may exist while the number of returned items is equal to limit.
List(pattern string, offset, limit int) ([]Recording, error)

// Repair fixes all indexes of the data.
Repair() error
}

//--------------------------------------------------------------------
Expand Down Expand Up @@ -124,6 +127,10 @@ func newRecordingKV(store storage.Interface) (*recordingKV, error) {
}, nil
}

func (kv *recordingKV) Repair() error {
return kv.store.Repair()
}

func (kv *recordingKV) error(err error) error {
if err == storage.ErrNoObjectExists {
return ErrNoRecordingExists
Expand Down
19 changes: 18 additions & 1 deletion services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func (s *Service) Open() error {
return err
}

//TODO: Should we expose this via the API instead of doing it during every startup?
// This could be expensive and should not need to be run frequently.
if err := s.repairRecordingIndex(); err != nil {
return err
}

// Mark all running replays or recordings as failed since
// we are just starting and they cannot possibly be still running
s.markFailedRecordings()
Expand Down Expand Up @@ -273,6 +279,12 @@ func (s *Service) syncRecordingMetadata() error {
return nil
}

func (s *Service) repairRecordingIndex() error {
// Based on various upgrade paths it is possible for the recording date index to have bad values.
// This process removes dead indexes and adds missing indexes
return s.recordings.Repair()
}

func (s *Service) markFailedRecordings() {
limit := 100
offset := 0
Expand Down Expand Up @@ -1625,7 +1637,12 @@ func (s fileSource) Size() (int64, error) {
}

func (s fileSource) Remove() error {
return os.Remove(string(s))
err := os.Remove(string(s))
if err == os.ErrNotExist {
// Ignore file not exists errors as we are trying to remove the file.
return nil
}
return err
}

func (s fileSource) StreamWriter() (io.WriteCloser, error) {
Expand Down
56 changes: 55 additions & 1 deletion services/storage/indexed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package storage

import (
"encoding"
"errors"
"fmt"
"path"
"strings"

"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -371,6 +372,59 @@ func (s *IndexedStore) list(tx ReadOnlyTx, index, pattern string, offset, limit
return objects, nil
}

func (s *IndexedStore) Repair() error {
return s.store.Update(func(tx Tx) error {
return s.RepairTx(tx)
})
}

func (s *IndexedStore) RepairTx(tx Tx) error {
// Clean all indexes
for _, idx := range s.indexes {
if err := s.cleanIndex(tx, idx.Name); err != nil {
return errors.Wrapf(err, "failed to clean index %s", idx.Name)
}
}
// Walk all data and update existing index entries
data, err := tx.List(s.dataPrefix)
if err != nil {
return err
}
for _, kv := range data {
o := s.newObject()
err = o.UnmarshalBinary(kv.Value)
if err != nil {
return errors.Wrapf(err, "failed to unmarshal object with key: %q", kv.Key)
}
for _, idx := range s.indexes {
v, err := idx.ValueOf(o)
if err != nil {
return errors.Wrapf(err, "failed to get index value for object with key: %q", kv.Key)
}
key := s.indexKey(idx.Name, v)
err = tx.Put(key, []byte(o.ObjectID()))
if err != nil {
return errors.Wrapf(err, "failed to update index for object with key: %q", kv.Key)
}
}
}
return nil
}

// Clean index deletes all indexes entries.
func (s *IndexedStore) cleanIndex(tx Tx, index string) error {
entries, err := tx.List(s.indexKey(index, "") + "/")
if err != nil {
return err
}
for _, entry := range entries {
if err := tx.Delete(entry.Key); err != nil {
return err
}
}
return nil
}

func ImpossibleTypeErr(exp interface{}, got interface{}) error {
return fmt.Errorf("impossible error, object not of type %T, got %T", exp, got)
}

0 comments on commit 521115e

Please sign in to comment.