Skip to content

Commit

Permalink
fix(pruning): memory leak pruning tx indexer iterator (dymensionxyz#1206
Browse files Browse the repository at this point in the history
)
  • Loading branch information
srene authored Nov 7, 2024
1 parent d2a9bc1 commit 2299320
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 39 deletions.
31 changes: 16 additions & 15 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,19 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
}

for h := int64(from); h < int64(to); h++ {

// flush every 1000 blocks to avoid batches becoming too large
if toFlush > 1000 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()

toFlush = 0
}

ok, err := idx.Has(h)
if err != nil {
logger.Debug("pruning block indexer checking height", "height", h, "err", err)
Expand All @@ -565,27 +578,16 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
}

pruned++
toFlush++

prunedEvents, err := idx.pruneEvents(h, logger, batch)
if err != nil {
logger.Debug("pruning block indexer events", "height", h, "err", err)
continue
}
pruned += prunedEvents
toFlush += prunedEvents

toFlush += pruned

// flush every 1000 blocks to avoid batches becoming too large
if toFlush > 1000 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()

toFlush = 0
}
}

err := flush(batch, int64(to))
Expand Down Expand Up @@ -613,13 +615,12 @@ func (idx *BlockerIndexer) pruneEvents(height int64, logger log.Logger, batch st
return pruned, err
}
for _, key := range eventKeys.Keys {
pruned++
err := batch.Delete(key)
if err != nil {
logger.Error("pruning block indexer iterate events", "height", height, "err", err)
continue
}
pruned++

}
return pruned, nil
}
Expand Down
28 changes: 15 additions & 13 deletions indexers/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,20 +594,32 @@ func (txi *TxIndex) pruneTxsAndEvents(from, to uint64, logger log.Logger) (uint6

for h := from; h < to; h++ {

// flush every 1000 txs to avoid batches becoming too large
if toFlush > 1000 {
err := flush(batch, int64(h))
if err != nil {
return 0, err
}
batch.Discard()
batch = txi.store.NewBatch()
toFlush = 0
}

// first all events are pruned associated to the same height
prunedEvents, err := txi.pruneEvents(h, batch)
if err != nil {
logger.Error("pruning txs indexer events by height", "height", h, "error", err)
continue
}
pruned += prunedEvents
toFlush += prunedEvents

// then all txs indexed are iterated by height
it := txi.store.PrefixIterator(prefixForHeight(int64(h)))
defer it.Discard()

// and deleted all indexed (by hash and by keyheight)
for ; it.Valid(); it.Next() {
toFlush++
if err := batch.Delete(it.Key()); err != nil {
logger.Error("pruning txs indexer event key", "height", h, "error", err)
continue
Expand All @@ -620,17 +632,7 @@ func (txi *TxIndex) pruneTxsAndEvents(from, to uint64, logger log.Logger) (uint6
pruned++
}

toFlush += pruned
// flush every 1000 txs to avoid batches becoming too large
if toFlush > 1000 {
err := flush(batch, int64(h))
if err != nil {
return 0, err
}
batch.Discard()
batch = txi.store.NewBatch()
toFlush = 0
}
it.Discard()

}

Expand Down Expand Up @@ -658,11 +660,11 @@ func (txi *TxIndex) pruneEvents(height uint64, batch store.KVBatch) (uint64, err
return pruned, err
}
for _, key := range eventKeys.Keys {
pruned++
err := batch.Delete(key)
if err != nil {
return pruned, err
}
pruned++
}
return pruned, nil
}
Expand Down
21 changes: 10 additions & 11 deletions store/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@ func (s *DefaultStore) PruneStore(to uint64, logger types.Logger) (uint64, error
// pruneHeights prunes all store entries that are stored along blocks (blocks,commit,proposer, etc)
func (s *DefaultStore) pruneHeights(from, to uint64, logger types.Logger) (uint64, error) {
pruneBlocks := func(batch KVBatch, height uint64) error {

if err := batch.Delete(getResponsesKey(height)); err != nil {
logger.Error("delete responses", "error", err)
}
if err := batch.Delete(getDRSVersionKey(height)); err != nil {
logger.Error("delete drs", "error", err)
}
if err := batch.Delete(getProposerKey(height)); err != nil {
logger.Error("delete proposer", "error", err)
}

hash, err := s.loadHashFromIndex(height)
if err != nil {
return err
Expand All @@ -58,6 +47,16 @@ func (s *DefaultStore) pruneHeights(from, to uint64, logger types.Logger) (uint6
logger.Error("delete hash index", "error", err)
}

if err := batch.Delete(getResponsesKey(height)); err != nil {
logger.Error("delete responses", "error", err)
}
if err := batch.Delete(getDRSVersionKey(height)); err != nil {
logger.Error("delete drs", "error", err)
}
if err := batch.Delete(getProposerKey(height)); err != nil {
logger.Error("delete proposer", "error", err)
}

return nil
}

Expand Down

0 comments on commit 2299320

Please sign in to comment.