Skip to content

Commit

Permalink
Fix/reorg pool (0xPolygonHermez#1756)
Browse files Browse the repository at this point in the history
* fix

* fix check if reorg

* fix finalizer test
  • Loading branch information
ARR552 authored Mar 7, 2023
1 parent 32760e2 commit 420f1d0
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 39 deletions.
22 changes: 22 additions & 0 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,28 @@ func (p *Pool) AddTx(ctx context.Context, tx types.Transaction) error {
return p.storage.AddTx(ctx, poolTx)
}

// ReorgTx adds a reorged transaction to the pool with the pending state
func (p *Pool) ReorgTx(ctx context.Context, tx types.Transaction) error {
poolTx := Transaction{
Transaction: tx,
Status: TxStatusPending,
IsClaims: false,
ReceivedAt: time.Now(),
IsWIP: false,
}

poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)

// Execute transaction to calculate its zkCounters
zkCounters, err := p.PreExecuteTx(ctx, tx)
if err != nil {
return err
}

poolTx.ZKCounters = zkCounters
return p.storage.AddTx(ctx, poolTx)
}

// PreExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZKCounters, error) {
processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil)
Expand Down
24 changes: 12 additions & 12 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ func (d *dbManager) loadFromPool() {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)

numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
continue
}

poolTransactions, err := d.txPool.GetNonWIPPendingTxs(d.ctx, false, 0)
if err != nil && err != pgpoolstorage.ErrNotFound {
log.Errorf("load tx from pool: %v", err)
Expand Down Expand Up @@ -161,18 +173,6 @@ func (d *dbManager) DeleteTransactionFromPool(ctx context.Context, txHash common
func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
// TODO: Finish the retry mechanism and error handling
for {
numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
continue
}

txToStore := <-d.txsStore.Ch
log.Debugf("Storing tx %v", txToStore.txResponse.TxHash)
dbTx, err := d.BeginStateTransaction(d.ctx)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestFinalizer_handleTransactionError(t *testing.T) {
dbManagerMock.On("DeleteTransactionFromPool", ctx, tx.Hash).Return(nil).Once()
}
if tc.expectedMoveCall {
workerMock.On("MoveTxToNotReady", oldHash, sender, &nonce, big.NewInt(0)).Return().Once()
workerMock.On("MoveTxToNotReady", oldHash, sender, &nonce, big.NewInt(0)).Return([]*TxTracker{}).Once()
}

result := &state.ProcessBatchResponse{
Expand Down
30 changes: 25 additions & 5 deletions sequencer/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2224,7 +2224,7 @@ func (p *PostgresStorage) CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64,
}

// GetReorgedTransactions returns the transactions that were reorged
func (p *PostgresStorage) GetReorgedTransactions(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (txs []*types.Transaction, err error) {
func (p *PostgresStorage) GetReorgedTransactions(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) ([]*types.Transaction, error) {
const getReorgedTransactionsSql = "SELECT encoded FROM state.transaction t INNER JOIN state.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num >= $1 ORDER BY l2_block_num ASC"
e := p.getExecQuerier(dbTx)
rows, err := e.Query(ctx, getReorgedTransactionsSql, batchNumber)
Expand All @@ -2233,14 +2233,23 @@ func (p *PostgresStorage) GetReorgedTransactions(ctx context.Context, batchNumbe
}
defer rows.Close()

encodedTxs := make([]string, 0, len(rows.RawValues()))
txs := make([]*types.Transaction, 0, len(rows.RawValues()))

for i := 0; i < len(encodedTxs); i++ {
tx, err := DecodeTx(encodedTxs[i])
for rows.Next() {
if rows.Err() != nil {
return nil, rows.Err()
}
var encodedTx string
err := rows.Scan(&encodedTx)
if err != nil {
return nil, err
}

tx, err := DecodeTx(encodedTx)
if err != nil {
return nil, err
}
txs = append(txs, tx)
}
return
return txs, nil
}
2 changes: 1 addition & 1 deletion synchronizer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ type ethTxManager interface {

type poolInterface interface {
DeleteReorgedTransactions(ctx context.Context, txs []*types.Transaction) error
AddTx(ctx context.Context, tx types.Transaction) error
ReorgTx(ctx context.Context, tx types.Transaction) error
}
22 changes: 11 additions & 11 deletions synchronizer/mock_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
status := s.checkTrustedState(batch, tBatch, newRoot, dbTx)
if status {
// Reorg Pool
err := s.reorgPool(tBatch.BatchNumber, dbTx)
err := s.reorgPool(dbTx)
if err != nil {
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
Expand Down Expand Up @@ -1098,28 +1098,37 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *pb.GetBatchRespon
return nil
}

func (s *ClientSynchronizer) reorgPool(batchNumber uint64, dbTx pgx.Tx) error {
func (s *ClientSynchronizer) reorgPool(dbTx pgx.Tx) error {
latestBatchNum, err := s.etherMan.GetLatestBatchNumber()
if err != nil {
log.Error("error getting the latestBatchNumber virtualized in the smc. Error: ", err)
return err
}
batchNumber := latestBatchNum + 1
// Get transactions that have to be included in the pool again
txs, err := s.state.GetReorgedTransactions(s.ctx, batchNumber, dbTx)
if err != nil {
log.Errorf("error getting txs from trusted state. BatchNumber: %d, error: %v", batchNumber, err)
return err
}
log.Debug("Reorged transactions: ", txs)

// Remove txs from the pool
err = s.pool.DeleteReorgedTransactions(s.ctx, txs)
if err != nil {
log.Errorf("error deleting txs from the pool. BatchNumber: %d, error: %v", batchNumber, err)
return err
}
log.Debug("Delete reorged transactions")

// Add txs to the pool
for _, tx := range txs {
err = s.pool.AddTx(s.ctx, *tx)
err = s.pool.ReorgTx(s.ctx, *tx)
if err != nil {
log.Errorf("error storing tx into the pool again. TxHash: %s. BatchNumber: %d, error: %v", tx.Hash().String(), batchNumber, err)
return err
}
log.Debug("Reorged transactions inserted in the pool: ", tx.Hash())
}
return nil
}
7 changes: 6 additions & 1 deletion synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ func TestTrustedStateReorg(t *testing.T) {
Return(nil).
Once()

m.Etherman.
On("GetLatestBatchNumber").
Return(tr.BatchNumber-1, nil).
Once()

txs := []*types.Transaction{types.NewTransaction(1, common.Address{}, big.NewInt(1), 1, big.NewInt(1), []byte{})}
m.State.
On("GetReorgedTransactions", ctx, tr.BatchNumber, m.DbTx).
Expand All @@ -181,7 +186,7 @@ func TestTrustedStateReorg(t *testing.T) {
Once()

m.Pool.
On("AddTx", ctx, *txs[0]).
On("ReorgTx", ctx, *txs[0]).
Return(nil).
Once()

Expand Down

0 comments on commit 420f1d0

Please sign in to comment.