Skip to content

Commit

Permalink
Merge pull request lavanet#415 from lavanet/CNS-376-fixation-versioni…
Browse files Browse the repository at this point in the history
…ng-and-migration

CNS-376 fixation versioning and migration
  • Loading branch information
Yaroms authored Apr 24, 2023
2 parents b19502b + 4e97092 commit 8a7b423
Show file tree
Hide file tree
Showing 17 changed files with 359 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common_test
package common

import (
"testing"
Expand Down
34 changes: 33 additions & 1 deletion common/fixation_entry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"fmt"
"math"
"strconv"

Expand Down Expand Up @@ -81,6 +82,18 @@ func (fs *FixationStore) getStore(ctx sdk.Context, index string) *prefix.Store {
return &store
}

// getEntry returns an existing entry in the store
func (fs *FixationStore) getEntry(ctx sdk.Context, safeIndex string, block uint64) (entry types.Entry) {
store := fs.getStore(ctx, safeIndex)
byteKey := types.EncodeKey(block)
b := store.Get(byteKey)
if b == nil {
panic(fmt.Sprintf("getEntry: unknown entry: %s block: %d", types.DesanitizeIndex(safeIndex), block))
}
fs.cdc.MustUnmarshal(b, &entry)
return entry
}

// setEntry modifies an existing entry in the store
func (fs *FixationStore) setEntry(ctx sdk.Context, entry types.Entry) {
store := fs.getStore(ctx, entry.Index)
Expand Down Expand Up @@ -349,18 +362,37 @@ func (fs *FixationStore) removeEntry(ctx sdk.Context, index string, block uint64
}

func (fs *FixationStore) createStoreKey(index string) string {
return types.EntryKey + fs.prefix + index
return types.EntryPrefix + fs.prefix + index
}

func (fs *FixationStore) AdvanceBlock(ctx sdk.Context) {
fs.tstore.Tick(ctx)
}

func (fs *FixationStore) getVersion(ctx sdk.Context) uint64 {
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.prefix))

b := store.Get(types.KeyPrefix(types.FixationVersionKey))
if b == nil {
return 1
}

return types.DecodeKey(b)
}

func (fs *FixationStore) setVersion(ctx sdk.Context, val uint64) {
store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(fs.prefix))
b := types.EncodeKey(val)
store.Set(types.KeyPrefix(types.FixationVersionKey), b)
}

// NewFixationStore returns a new FixationStore object
func NewFixationStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *FixationStore {
fs := FixationStore{storeKey: storeKey, cdc: cdc, prefix: prefix}

callback := func(ctx sdk.Context, data string) { fs.deleteStaleEntries(ctx, data) }
tstore := NewTimerStore(storeKey, cdc, prefix).WithCallbackByBlockHeight(callback)
fs.tstore = *tstore

return &fs
}
6 changes: 3 additions & 3 deletions common/fixation_entry_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (fs FixationStore) setEntryIndex(ctx sdk.Context, safeIndex string) {
types.AssertSanitizedIndex(safeIndex, fs.prefix)
store := fs.getEntryIndexStore(ctx)
appendedValue := []byte(safeIndex) // convert the index value to a byte array
store.Set(types.KeyPrefix(types.EntryIndexKey+fs.prefix+safeIndex), appendedValue)
store.Set(types.KeyPrefix(fs.createEntryIndexKey(safeIndex)), appendedValue)
}

// removeEntryIndex removes an Entry index from the store
Expand Down Expand Up @@ -92,9 +92,9 @@ func (fs *FixationStore) GetAllEntryVersions(ctx sdk.Context, index string, stal
}

func (fs FixationStore) createEntryIndexStoreKey() string {
return types.EntryIndexKey + fs.prefix
return types.EntryIndexPrefix + fs.prefix
}

func (fs FixationStore) createEntryIndexKey(safeIndex string) string {
return types.EntryIndexKey + fs.prefix + safeIndex
return types.EntryIndexPrefix + fs.prefix + safeIndex
}
11 changes: 5 additions & 6 deletions common/fixation_entry_test.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
package common_test
package common

import (
"strconv"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/common"
"github.com/lavanet/lava/common/types"
"github.com/stretchr/testify/require"
)

func initCtxAndFixationStores(t *testing.T, count int) (sdk.Context, []*common.FixationStore) {
func initCtxAndFixationStores(t *testing.T, count int) (sdk.Context, []*FixationStore) {
ctx, cdc := initCtx(t)

fs := make([]*common.FixationStore, count)
fs := make([]*FixationStore, count)
for i := 0; i < count; i++ {
fixationKey := "mock_fix_" + strconv.Itoa(i)
fs[i] = common.NewFixationStore(mockStoreKey, cdc, fixationKey)
fs[i] = NewFixationStore(mockStoreKey, cdc, fixationKey)
}

return ctx, fs
}

func initCtxAndFixationStore(t *testing.T) (sdk.Context, *common.FixationStore) {
func initCtxAndFixationStore(t *testing.T) (sdk.Context, *FixationStore) {
ctx, fs := initCtxAndFixationStores(t, 1)
return ctx, fs[0]
}
Expand Down
79 changes: 79 additions & 0 deletions common/fixation_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package common

import (
"fmt"
"math"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/common/types"
)

func prefixForErrors(from uint64) string {
return fmt.Sprintf("FixationStore: migration from version %d", from)
}

var fixationMigrators = map[int]func(sdk.Context, *FixationStore) error{
1: fixationMigrate1to2,
}

func (fs *FixationStore) MigrateVersion(ctx sdk.Context) (err error) {
from := fs.getVersion(ctx)
to := types.FixationVersion()

for from < to {
function, ok := fixationMigrators[int(from)]
if !ok {
return fmt.Errorf("%s not available", prefixForErrors(from))
}

err = function(ctx, fs)
if err != nil {
return err
}

from += 1
}

fs.setVersion(ctx, to)
return nil
}

// fixationMigrate1to2: fix refcounts
// - correct refcount of head (most recent) by adding one (because new
// entries used to begin with refcount 0 instead of refcount 1)
// - correct negative refcounts if found any (for extra care)
func fixationMigrate1to2(ctx sdk.Context, fs *FixationStore) error {
indices := fs.GetAllEntryIndices(ctx)
for _, index := range indices {
safeIndex, err := types.SanitizeIndex(index)
if err != nil {
return fmt.Errorf("%s: failed to sanitize index: %s", prefixForErrors(1), index)
}
blocks := fs.GetAllEntryVersions(ctx, index, true)
if len(blocks) < 1 {
return fmt.Errorf("%s: no versions for index: %s", prefixForErrors(1), index)
}
recent := blocks[len(blocks)-1]
for _, block := range blocks {
entry := fs.getEntry(ctx, safeIndex, block)
// check for refcount overflow due to excessive putEntry
if entry.Refcount > math.MaxInt64 {
return fmt.Errorf("%s: entry has negative refcount index: %s", prefixForErrors(1), index)
}
// bump refcount of head entries (most recent version of an entry)
if block == recent {
entry.Refcount += 1
}
// if refcount still zero, make sure StaleAt is set
if entry.Refcount == 0 && entry.StaleAt == math.MaxUint64 {
entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME)
}
fs.setEntry(ctx, entry)
// if StaleAt is set, then start corresponding timer
if entry.StaleAt != math.MaxUint {
fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, entry.Index)
}
}
}
return nil
}
90 changes: 90 additions & 0 deletions common/fixation_migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package common

import (
"fmt"
"math"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/common/types"
"github.com/stretchr/testify/require"
)

const (
mockPrefix = "mock-fs"
)

type mockEntry1to2 struct {
index string
block uint64
head bool
count int
before uint64
after uint64
}

func TestMigrate1to2(t *testing.T) {
var err error

ctx, cdc := initCtx(t)
fs := NewFixationStore(mockStoreKey, cdc, mockPrefix)
coin := sdk.Coin{Denom: "utest", Amount: sdk.NewInt(1)}

templates := []mockEntry1to2{
// entry_1 has 3 valid versions
{"entry_1", 100, false, 1, 1, 1},
{"entry_1", 200, false, 2, 2, 2},
{"entry_1", 300, true, 0, 0, 1},
// entry_2 has 2 valid versions, one stale-at
{"entry_2", 100, false, 1, 1, 1},
{"entry_2", 200, false, 0, 0, 0},
{"entry_2", 300, true, 0, 0, 1},
// entry_3 has 2 valid versions, head with extra refcount
{"entry_3", 100, false, 0, 0, 0},
{"entry_3", 200, false, 1, 1, 1},
{"entry_3", 300, true, 1, 1, 2},
}

// create entries
for _, tt := range templates {
err = fs.AppendEntry(ctx, tt.index, tt.block, &coin)
require.Nil(t, err)
for i := 0; i < tt.count; i++ {
ctx = ctx.WithBlockHeight(int64(tt.block))
found := fs.GetEntry(ctx, tt.index, &coin)
require.True(t, found)
}
}

// verify entries before migration
for _, tt := range templates {
what := fmt.Sprintf("before: index: %s, block: %d, count: %d", tt.index, tt.block, tt.count)
safeIndex, err := types.SanitizeIndex(tt.index)
require.Nil(t, err, what)
entry := fs.getEntry(ctx, safeIndex, tt.block)
if tt.head {
// adjust head entry's refcount to version 1 style
entry.Refcount -= 1
fs.setEntry(ctx, entry)
}
require.Equal(t, tt.before, entry.Refcount, what)
}

// perform migration version 1 to version 2
err = fs.MigrateVersion(ctx)
require.Nil(t, err, "migration version 1 to version 2")

// verify entries after migration
for _, tt := range templates {
what := fmt.Sprintf("after: index: %s, block: %d, count: %d", tt.index, tt.block, tt.count)
safeIndex, err := types.SanitizeIndex(tt.index)
require.Nil(t, err, what)
entry := fs.getEntry(ctx, safeIndex, tt.block)
require.Equal(t, tt.after, entry.Refcount, what)
if entry.Refcount == 0 {
require.NotEqual(t, uint64(math.MaxUint64), entry.StaleAt, what)
} else {
require.Equal(t, uint64(math.MaxUint64), entry.StaleAt, what)
}
}
}
21 changes: 21 additions & 0 deletions common/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,33 @@ type TimerStore struct {
callbacks [2]TimerCallback // as per TimerType
}

func TimerVersion() uint64 {
return 1
}

// NewTimerStore returns a new TimerStore object
func NewTimerStore(storeKey sdk.StoreKey, cdc codec.BinaryCodec, prefix string) *TimerStore {
tstore := TimerStore{storeKey: storeKey, cdc: cdc, prefix: prefix}
return &tstore
}

func (tstore *TimerStore) getVersion(ctx sdk.Context) uint64 {
store := prefix.NewStore(ctx.KVStore(tstore.storeKey), types.KeyPrefix(tstore.prefix))

b := store.Get(types.KeyPrefix(types.TimerVersionKey))
if b == nil {
return 1
}

return types.DecodeKey(b)
}

func (tstore *TimerStore) setVersion(ctx sdk.Context, val uint64) {
store := prefix.NewStore(ctx.KVStore(tstore.storeKey), types.KeyPrefix(tstore.prefix))
b := types.EncodeKey(val)
store.Set(types.KeyPrefix(types.TimerVersionKey), b)
}

func (tstore *TimerStore) WithCallbackByBlockHeight(callback func(ctx sdk.Context, data string)) *TimerStore {
tstoreNew := tstore
tstoreNew.callbacks[types.BlockHeight] = callback
Expand Down
37 changes: 37 additions & 0 deletions common/timer_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package common

import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
)

func (tstore *TimerStore) prefixForErrors(from uint64) string {
return fmt.Sprintf("TimerStore: migration from version %d", from)
}

var timerMigrators = map[int]func(sdk.Context, *TimerStore) error{
// fill with map entrys like "1: timerMigrate1to2"
}

func (tstore *TimerStore) MigrateVersion(ctx sdk.Context) (err error) {
from := tstore.getVersion(ctx)
to := TimerVersion()

for from < to {
function, ok := timerMigrators[int(from)]
if !ok {
return fmt.Errorf("%s not available", prefixForErrors(from))
}

err = function(ctx, tstore)
if err != nil {
return err
}

from += 1
}

tstore.setVersion(ctx, to)
return nil
}
Loading

0 comments on commit 8a7b423

Please sign in to comment.