feat: support skipping empty blocks production (dymensionxyz#342)
mtsitrin authored Jun 14, 2023
1 parent 72e5acf commit 09cab6a
Showing 11 changed files with 318 additions and 86 deletions.
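In short: block production now skips blocks that contain no transactions. An empty block is only forced once EmptyBlocksMaxTime has elapsed since the last produced block, and batches are submitted to the settlement layer either when BlockBatchSize blocks have accumulated or when BatchSubmitMaxTime has passed since the last submission. Below is a minimal sketch of a block-manager configuration exercising the new knobs; values are illustrative, the field names are the ones referenced in this diff, and the config import path is assumed from the repository layout.

package main

import (
	"fmt"
	"time"

	"github.com/dymensionxyz/dymint/config"
)

func main() {
	// Illustrative values only. A zero BatchSubmitMaxTime falls back to the default in NewManager,
	// and EmptyBlocksMaxTime == 0 keeps the old behavior of producing empty blocks on every tick.
	cfg := config.BlockManagerConfig{
		BlockTime:          200 * time.Millisecond, // block production tick
		EmptyBlocksMaxTime: 1 * time.Minute,        // how long to wait before forcing an empty block
		BatchSubmitMaxTime: 30 * time.Minute,       // time-based batch submission trigger
		BlockBatchSize:     500,                    // size-based batch submission trigger (number of blocks)
		NamespaceID:        "0102030405060708",
	}
	fmt.Printf("%+v\n", cfg)
}
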
81 changes: 61 additions & 20 deletions block/manager.go
@@ -11,6 +11,7 @@ import (

"github.com/avast/retry-go"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
"github.com/cosmos/cosmos-sdk/types/errors"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
@@ -69,12 +70,12 @@ type Manager struct {

syncTargetDiode diodes.Diode

batchInProcess atomic.Value

shouldProduceBlocksCh chan bool

syncTarget uint64
isSyncedCond sync.Cond
syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond

syncCache map[uint64]*types.Block

@@ -114,16 +115,16 @@ func NewManager(
}

// TODO(#119): Probably should be validated and defaults managed on config init.
// TODO(omritoptix): Think about the proper location for the default batchSize and default DABlockTime.
if conf.DABlockTime == 0 {
logger.Info("WARNING: using default DA block time", "DABlockTime", config.DefaultNodeConfig.DABlockTime)
conf.DABlockTime = config.DefaultNodeConfig.DABlockTime
}

if conf.BlockBatchSizeBytes == 0 {
logger.Info("WARNING: using default DA batch size bytes limit", "BlockBatchSizeBytes", config.DefaultNodeConfig.BlockBatchSizeBytes)
conf.BlockBatchSizeBytes = config.DefaultNodeConfig.BlockBatchSizeBytes
}
if conf.BatchSubmitMaxTime == 0 {
logger.Info("WARNING: using default DA batch submit max time", "BatchSubmitMaxTime", config.DefaultNodeConfig.BatchSubmitMaxTime)
conf.BatchSubmitMaxTime = config.DefaultNodeConfig.BatchSubmitMaxTime

// TODO: validate it's larger than the empty blocks max time
}
if conf.BlockTime == 0 {
panic("Block production time must be a positive number")
}
@@ -299,33 +300,63 @@ func (m *Manager) waitForSync(ctx context.Context) error {

// ProduceBlockLoop calls produceBlock in a loop as long as we're synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano()) // UnixNano to match the read in produceBlock and updateSyncParams

// We want to wait until we are synced. After that, since there is no leader
// election yet, and leaders are elected manually, we will not be out of sync until
// we are manually replaced.
err := m.waitForSync(ctx)
if err != nil {
m.logger.Error("failed to wait for sync", "err", err)
panic(errors.Wrap(err, "failed to wait for sync"))
}

ticker := time.NewTicker(m.conf.BlockTime)
defer ticker.Stop()

var tickerEmptyBlocksMaxTime *time.Ticker
var tickerEmptyBlocksMaxTimeCh <-chan time.Time
if m.conf.EmptyBlocksMaxTime > 0 {
tickerEmptyBlocksMaxTime = time.NewTicker(m.conf.EmptyBlocksMaxTime)
tickerEmptyBlocksMaxTimeCh = tickerEmptyBlocksMaxTime.C
defer tickerEmptyBlocksMaxTime.Stop()
}
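// If EmptyBlocksMaxTime is 0 this ticker is never created, produceEmptyBlock below stays true,
// and every tick may produce an empty block (the previous behavior). With, say, BlockTime=200ms and
// EmptyBlocksMaxTime=1m (illustrative values), ticks with an empty mempool are skipped and at most one
// empty block is produced per minute of inactivity, because the ticker is reset after every produced block.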

// Allow the initial block to be empty
produceEmptyBlock := true
for {
select {
// Context canceled
case <-ctx.Done():
return
// Empty blocks timeout
case <-tickerEmptyBlocksMaxTimeCh:
m.logger.Error("No transactions for too long, allowing to produce empty block")
produceEmptyBlock = true
// Produce block
case <-ticker.C:
err := m.produceBlock(ctx)
err := m.produceBlock(ctx, produceEmptyBlock)
if err == types.ErrSkippedEmptyBlock {
m.logger.Debug("Skipped empty block")
continue
}
if err != nil {
m.logger.Error("error while producing block", "error", err)
continue
}
// If empty-blocks skipping is enabled, reset the timeout timer after each produced block
if tickerEmptyBlocksMaxTime != nil {
produceEmptyBlock = false
tickerEmptyBlocksMaxTime.Reset(m.conf.EmptyBlocksMaxTime)
}

// Node's health check channel
case shouldProduceBlocks := <-m.shouldProduceBlocksCh:
for !shouldProduceBlocks {
m.logger.Info("Stopped block production")
shouldProduceBlocks = <-m.shouldProduceBlocksCh
}
m.logger.Info("Resumed Block production")
}

}
}

@@ -371,6 +402,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
atomic.StoreUint64(&m.syncTarget, endHeight)
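// A new syncTarget means a batch was accepted by the settlement layer,
// so the time-based submission trigger (lastSubmissionTime) is reset as well.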
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
}

@@ -614,7 +646,7 @@ func (m *Manager) fetchBatch(daHeight uint64) (da.ResultRetrieveBatch, error) {
return batchRes, err
}

func (m *Manager) produceBlock(ctx context.Context) error {
func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error {
var lastCommit *types.Commit
var lastHeaderHash [32]byte
var err error
@@ -650,9 +682,11 @@ func (m *Manager) produceBlock(ctx context.Context) error {
return err
}
} else {
m.logger.Info("Creating block", "height", newHeight)
block = m.executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.lastState)
m.logger.Debug("block info", "num_tx", len(block.Data.Txs))
if !allowEmpty && len(block.Data.Txs) == 0 {
return types.ErrSkippedEmptyBlock
}
m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))

abciHeaderPb := abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
@@ -680,10 +714,16 @@ func (m *Manager) produceBlock(ctx context.Context) error {
return err
}

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
// TODO: move to separate function
lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime)
requiredByTime := time.Since(time.Unix(0, lastSubmissionTime)) > m.conf.BatchSubmitMaxTime

// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
if block.Header.Height-syncTarget >= m.conf.BlockBatchSize && m.batchInProcess.Load() == false {
requiredByNumOfBlocks := (block.Header.Height - syncTarget) > m.conf.BlockBatchSize

// Submit a batch if the size or time threshold is reached and there isn't another batch currently in the submission process.
if m.batchInProcess.Load() == false && (requiredByTime || requiredByNumOfBlocks) {
m.batchInProcess.Store(true)
go m.submitNextBatch(ctx)
}
@@ -694,7 +734,8 @@ func (m *Manager) produceBlock(ctx context.Context) error {
func (m *Manager) submitNextBatch(ctx context.Context) {
// Get the batch start and end height
startHeight := atomic.LoadUint64(&m.syncTarget) + 1
endHeight := startHeight + m.conf.BlockBatchSize - 1
endHeight := uint64(m.lastState.LastBlockHeight)
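// The batch now ends at the last produced block instead of a fixed startHeight+BlockBatchSize-1 window,
// since a time-triggered submission may cover fewer than BlockBatchSize blocks.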

// Create the batch
nextBatch, err := m.createNextDABatch(startHeight, endHeight)
if err != nil {
@@ -714,7 +755,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
// Submit batch to SL
// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
// In that case we'll want to update the syncTarget before returning.
go m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), &resultSubmitToDA)
m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), &resultSubmitToDA)
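// SubmitBatch is now called synchronously: submitNextBatch itself already runs in its own goroutine
// (launched from produceBlock), so an extra goroutine here is no longer needed.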
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
60 changes: 27 additions & 33 deletions block/manager_test.go
@@ -9,6 +9,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymint/log/test"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/node/events"
@@ -17,9 +21,6 @@ import (
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/types"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
tmcfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
@@ -104,7 +105,7 @@ func TestInitialState(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

dalc := getMockDALC(100*time.Second, logger)
dalc := getMockDALC(logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
assert.NoError(err)
@@ -143,32 +144,26 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
}

t.Log("Validating manager can't produce blocks")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
go manager.ProduceBlockLoop(ctx)
select {
case <-ctx.Done():
assert.Equal(t, lastStoreHeight, manager.store.Height())
}
<-ctx.Done()
assert.Equal(t, lastStoreHeight, manager.store.Height())

t.Log("Sync the manager")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go manager.Start(ctx, false)
select {
case <-ctx.Done():
assert.Greater(t, manager.store.Height(), lastStoreHeight)
assert.Equal(t, batch.EndHeight, manager.store.Height())
}
<-ctx.Done()
require.Greater(t, manager.store.Height(), lastStoreHeight)
assert.Equal(t, batch.EndHeight, manager.store.Height())

t.Log("Validate blocks are produced")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
go manager.ProduceBlockLoop(ctx)
select {
case <-ctx.Done():
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}
<-ctx.Done()
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}

func TestRetrieveDaBatchesFailed(t *testing.T) {
Expand All @@ -194,7 +189,7 @@ func TestProduceNewBlock(t *testing.T) {
manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
// Produce block
err = manager.produceBlock(context.Background())
err = manager.produceBlock(context.Background(), true)
require.NoError(t, err)
// Validate state is updated with the commit hash
assert.Equal(t, uint64(1), manager.store.Height())
@@ -221,7 +216,7 @@ func TestProducePendingBlock(t *testing.T) {
_, err = manager.store.SaveBlock(block, &block.LastCommit, nil)
require.NoError(t, err)
// Produce block
err = manager.produceBlock(context.Background())
err = manager.produceBlock(context.Background(), true)
require.NoError(t, err)
// Validate state is updated with the block that was saved in the store
assert.Equal(t, block.Header.Hash(), *(*[32]byte)(manager.lastState.LastBlockID.Hash))
@@ -391,7 +386,7 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
LastBlockAppHash: tc.LastAppCommitHash[:]}).Once()
mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
_ = manager.produceBlock(context.Background())
_ = manager.produceBlock(context.Background(), true)
assert.Equal(tc.expectedStoreHeight, manager.store.Height())
assert.Equal(tc.expectedStateAppHash, manager.lastState.AppHash)
storeState, err := manager.store.LoadState()
@@ -441,7 +436,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Produce blocks
for i := 0; i < tc.blocksToProduce; i++ {
manager.produceBlock(ctx)
manager.produceBlock(ctx, true)
}

// Call createNextDABatch function
@@ -516,7 +511,7 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,
if dalc == nil {
dalc = &mockda.DataAvailabilityLayerClient{}
}
initDALCMock(dalc, pubsubServer, conf.DABlockTime, logger)
initDALCMock(dalc, pubsubServer, logger)

var proxyApp proxy.AppConns
if proxyAppConns == nil {
@@ -554,15 +549,15 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,
}

// TODO(omritoptix): Possibly move out to a generic testutil
func getMockDALC(daBlockTime time.Duration, logger log.Logger) da.DataAvailabilityLayerClient {
func getMockDALC(logger log.Logger) da.DataAvailabilityLayerClient {
dalc := &mockda.DataAvailabilityLayerClient{}
initDALCMock(dalc, pubsub.NewServer(), daBlockTime, logger)
initDALCMock(dalc, pubsub.NewServer(), logger)
return dalc
}

// TODO(omritoptix): Possibly move out to a generic testutil
func initDALCMock(dalc da.DataAvailabilityLayerClient, pubsubServer *pubsub.Server, daBlockTime time.Duration, logger log.Logger) {
_ = dalc.Init([]byte(daBlockTime.String()), pubsubServer, store.NewDefaultInMemoryKVStore(), logger)
func initDALCMock(dalc da.DataAvailabilityLayerClient, pubsubServer *pubsub.Server, logger log.Logger) {
_ = dalc.Init(nil, pubsubServer, store.NewDefaultInMemoryKVStore(), logger)
_ = dalc.Start()
}

@@ -581,11 +576,10 @@ func initSettlementLayerMock(settlementlc settlement.LayerI, proposer string, pu

func getManagerConfig() config.BlockManagerConfig {
return config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
DABlockTime: 100 * time.Millisecond,
BatchSyncInterval: 1 * time.Second,
BlockBatchSize: defaultBatchSize,
DAStartHeight: 0,
NamespaceID: "0102030405060708",
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BatchSubmitMaxTime: 30 * time.Minute,
DAStartHeight: 0,
NamespaceID: "0102030405060708",
}
}
