diff --git a/block/block.go b/block/block.go index 7f5f98fc1..24411f95f 100644 --- a/block/block.go +++ b/block/block.go @@ -118,34 +118,29 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta return nil } +// TODO: move to gossip.go func (m *Manager) attemptApplyCachedBlocks() error { - m.applyCachedBlockMutex.Lock() - defer m.applyCachedBlockMutex.Unlock() + m.retrieverMutex.Lock() + defer m.retrieverMutex.Unlock() for { expectedHeight := m.store.NextHeight() - prevCachedBlock, blockExists := m.prevBlock[expectedHeight] - prevCachedCommit, commitExists := m.prevCommit[expectedHeight] - - if !blockExists || !commitExists { + cachedBlock, blockExists := m.blockCache[expectedHeight] + if !blockExists { break } // Note: cached pairs have passed basic validation, so no need to validate again - err := m.applyBlock(prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock}) + err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock}) if err != nil { return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err) } m.logger.Debug("applied cached block", "height", expectedHeight) - } - for k := range m.prevBlock { - if k <= m.store.Height() { - delete(m.prevBlock, k) - delete(m.prevCommit, k) - } + delete(m.blockCache, cachedBlock.Block.Header.Height) } + return nil } diff --git a/block/manager.go b/block/manager.go index f30de5925..a6f5be6df 100644 --- a/block/manager.go +++ b/block/manager.go @@ -60,12 +60,26 @@ type Manager struct { // Block production shouldProduceBlocksCh chan bool produceEmptyBlockCh chan bool - produceBlockMutex sync.Mutex - applyCachedBlockMutex sync.Mutex + lastSubmissionTime atomic.Int64 + + /* + Guard against triggering a new batch submission when the old one is still going on (taking a while) + */ + submitBatchMutex sync.Mutex + + /* + Protect against producing two blocks at once if the first one is taking a while + Also, used to protect against the block production that occurs when batch submission thread + creates its empty block. + */ + produceBlockMutex sync.Mutex + + /* + Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks, + and incoming DA blocks, respectively. + */ + retrieverMutex sync.Mutex - // batch submission - batchInProcess sync.Mutex - lastSubmissionTime atomic.Int64 // pendingBatch is the result of the last DA submission // that is pending settlement layer submission. // It is used to avoid double submission of the same batch. @@ -75,8 +89,7 @@ type Manager struct { logger types.Logger // Cached blocks and commits for applying at future heights. Invariant: the block and commit are .Valid() (validated sigs etc) - prevBlock map[uint64]*types.Block - prevCommit map[uint64]*types.Commit + blockCache map[uint64]CachedBlock } // NewManager creates new block Manager. @@ -125,8 +138,7 @@ func NewManager( shouldProduceBlocksCh: make(chan bool, 1), produceEmptyBlockCh: make(chan bool, 1), logger: logger, - prevBlock: make(map[uint64]*types.Block), - prevCommit: make(map[uint64]*types.Commit), + blockCache: make(map[uint64]CachedBlock), } return agg, nil @@ -223,9 +235,10 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) { m.shouldProduceBlocksCh <- eventData.Error == nil } +// TODO: move to gossip.go // onNewGossippedBlock will take a block and apply it func (m *Manager) onNewGossipedBlock(event pubsub.Message) { - m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock)) + m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache)) eventData := event.Data().(p2p.GossipedBlock) block := eventData.Block commit := eventData.Commit @@ -236,17 +249,20 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { return } - // if height is expected, apply - // if height is higher than expected (future block), cache - if block.Header.Height == m.store.NextHeight() { - err := m.applyBlock(&block, &commit, blockMetaData{source: gossipedBlock}) + nextHeight := m.store.NextHeight() + if block.Header.Height >= nextHeight { + m.blockCache[block.Header.Height] = CachedBlock{ + Block: &block, + Commit: &commit, + } + m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height()) + } + + if block.Header.Height == nextHeight { + err := m.attemptApplyCachedBlocks() if err != nil { - m.logger.Error("apply gossiped block", "err", err) + m.logger.Error("applying cached blocks", "err", err) } - } else if block.Header.Height > m.store.NextHeight() { - m.prevBlock[block.Header.Height] = &block - m.prevCommit[block.Header.Height] = &commit - m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height()) } } diff --git a/block/retriever.go b/block/retriever.go index b243598ee..aafd3de9b 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -62,6 +62,13 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error { } } m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget) + + // check for cached blocks + err := m.attemptApplyCachedBlocks() + if err != nil { + m.logger.Error("applying previous cached blocks", "err", err) + } + return nil } @@ -84,6 +91,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error { m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height) + m.retrieverMutex.Lock() + defer m.retrieverMutex.Unlock() + for _, batch := range batchResp.Batches { for i, block := range batch.Blocks { if block.Header.Height != m.store.NextHeight() { @@ -97,12 +107,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error { if err != nil { return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err) } - } - } - err := m.attemptApplyCachedBlocks() - if err != nil { - m.logger.Error("applying previous cached blocks", "err", err) + delete(m.blockCache, block.Header.Height) + } } return nil } diff --git a/block/submit.go b/block/submit.go index 4ac1741c1..67ff559c7 100644 --- a/block/submit.go +++ b/block/submit.go @@ -32,11 +32,11 @@ func (m *Manager) SubmitLoop(ctx context.Context) { // Finally, it submits the next batch of blocks and updates the sync target to the height of // the last block in the submitted batch. func (m *Manager) handleSubmissionTrigger(ctx context.Context) { - if !m.batchInProcess.TryLock() { // Attempt to lock for batch processing + if !m.submitBatchMutex.TryLock() { // Attempt to lock for batch processing m.logger.Debug("Batch submission already in process, skipping submission") return } - defer m.batchInProcess.Unlock() // Ensure unlocking at the end + defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end // Load current sync target and height to determine if new blocks are available for submission. syncTarget, height := m.syncTarget.Load(), m.store.Height() diff --git a/block/types.go b/block/types.go index 22573d46a..199b80782 100644 --- a/block/types.go +++ b/block/types.go @@ -23,3 +23,8 @@ type PendingBatch struct { daResult *da.ResultSubmitBatch batch *types.Batch } + +type CachedBlock struct { + Block *types.Block + Commit *types.Commit +} diff --git a/rpc/client/client.go b/rpc/client/client.go index 4f3d6ef40..17d3c42d5 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -810,6 +810,7 @@ func (c *Client) CheckTx(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultChec } func (c *Client) eventsRoutine(sub tmtypes.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { + defer close(outc) for { select { case msg := <-sub.Out(): diff --git a/rpc/json/service.go b/rpc/json/service.go index 52942bfae..416a98b5c 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo return nil, fmt.Errorf("subscribe: %w", err) } go func(subscriptionID []byte) { - for { - select { - case msg := <-out: - // build the base response - var resp rpctypes.RPCResponse - // Check if subscriptionID is string or int and generate the rest of the response accordingly - subscriptionIDInt, err := strconv.Atoi(string(subscriptionID)) - if err != nil { - s.logger.Info("Failed to convert subscriptionID to int") - resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg) - } else { - resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg) - } - // Marshal response to JSON and send it to the websocket queue - jsonBytes, err := json.MarshalIndent(resp, "", " ") - if err != nil { - s.logger.Error("marshal RPCResponse to JSON", "err", err) - continue - } - if wsConn != nil { - wsConn.queue <- jsonBytes - } + for msg := range out { + // build the base response + var resp rpctypes.RPCResponse + // Check if subscriptionID is string or int and generate the rest of the response accordingly + subscriptionIDInt, err := strconv.Atoi(string(subscriptionID)) + if err != nil { + s.logger.Info("Failed to convert subscriptionID to int") + resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg) + } else { + resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg) + } + // Marshal response to JSON and send it to the websocket queue + jsonBytes, err := json.MarshalIndent(resp, "", " ") + if err != nil { + s.logger.Error("marshal RPCResponse to JSON", "err", err) + continue + } + if wsConn != nil { + wsConn.queue <- jsonBytes } } }(subscriptionID)