Skip to content

Commit

Permalink
fix: reduced empty blocks submission (dymensionxyz#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Aug 23, 2023
1 parent 567abcd commit 2e9bb1d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
46 changes: 31 additions & 15 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
m.logger.Info("Starting in aggregator mode")
// TODO(omritoptix): change to private methods
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
}
// TODO(omritoptix): change to private methods
go m.RetriveLoop(ctx)
Expand Down Expand Up @@ -282,6 +283,36 @@ func (m *Manager) waitForSync(ctx context.Context) error {
return nil
}

func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.conf.BatchSubmitMaxTime)
defer ticker.Stop()

for {
select {
//Context canceled
case <-ctx.Done():
return
case <-ticker.C:
// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
height := m.store.Height()
//no new blocks produced yet
if (height - syncTarget) == 0 {
continue
}

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
if m.batchInProcess.Load() == false {
m.batchInProcess.Store(true)
go m.submitNextBatch(ctx)
}

//TODO: add the case of batch size (should be signaled from the the block production)
// case <- requiredByNumOfBlocks
}
}
}

// ProduceBlockLoop is calling publishBlock in a loop as long as wer'e synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix())
Expand Down Expand Up @@ -708,21 +739,6 @@ func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error {

m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))
rollappHeightGauge.Set(float64(newHeight))

//TODO: move to separate function
lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime)
requiredByTime := time.Since(time.Unix(0, lastSubmissionTime)) > m.conf.BatchSubmitMaxTime

// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
requiredByNumOfBlocks := (block.Header.Height - syncTarget) > m.conf.BlockBatchSize

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
if m.batchInProcess.Load() == false && (requiredByTime || requiredByNumOfBlocks) {
m.batchInProcess.Store(true)
go m.submitNextBatch(ctx)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
mCtx, cancel := context.WithTimeout(context.Background(), runTime)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
go manager.SubmitLoop(mCtx)
<-mCtx.Done()

require.True(manager.batchInProcess.Load() == true)
Expand Down
4 changes: 0 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("empty_blocks_max_time must be greater than block_time")
}

if c.EmptyBlocksMaxTime != 0 && c.BatchSubmitMaxTime < c.EmptyBlocksMaxTime {
return fmt.Errorf("batch_submit_max_time must be greater than empty_blocks_max_time")
}

if c.BatchSubmitMaxTime < c.BlockTime {
return fmt.Errorf("batch_submit_max_time must be greater than block_time")
}
Expand Down

0 comments on commit 2e9bb1d

Please sign in to comment.