From 540ae7f5e019699ddc189e36df1d3fdd09d298da Mon Sep 17 00:00:00 2001 From: Alonso Rodriguez Date: Thu, 4 May 2023 12:19:53 +0200 Subject: [PATCH] fix sync_info table (#2055) * fix * linter * merge set last batch info seen methods --------- Co-authored-by: Thiago Lemos --- state/pgstatestorage.go | 45 +++++----------- synchronizer/interfaces.go | 4 +- synchronizer/mock_etherman.go | 24 +++++++++ synchronizer/mock_state.go | 28 ++++++++++ synchronizer/synchronizer.go | 85 +++++++++++++++++++++++++++---- synchronizer/synchronizer_test.go | 84 ++++++++++++++++++++++-------- 6 files changed, 207 insertions(+), 63 deletions(-) diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index ce7710cde7..9435d1a428 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -542,32 +542,27 @@ func (p *PostgresStorage) GetLatestVirtualBatchTimestamp(ctx context.Context, db return timestamp, nil } -// SetLastBatchNumberSeenOnEthereum sets the last batch number that affected -// the roll-up in order to allow the components to know if the state -// is synchronized or not -func (p *PostgresStorage) SetLastBatchNumberSeenOnEthereum(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { - const updateLastBatchSeenSQL = "UPDATE state.sync_info SET last_batch_num_seen = $1" +// SetLastBatchInfoSeenOnEthereum sets the last batch number that affected +// the roll-up and the last batch number that was consolidated on ethereum +// in order to allow the components to know if the state is synchronized or not +func (p *PostgresStorage) SetLastBatchInfoSeenOnEthereum(ctx context.Context, lastBatchNumberSeen, lastBatchNumberVerified uint64, dbTx pgx.Tx) error { + const query = ` + UPDATE state.sync_info + SET last_batch_num_seen = $1 + , last_batch_num_consolidated = $2` e := p.getExecQuerier(dbTx) - _, err := e.Exec(ctx, updateLastBatchSeenSQL, batchNumber) + _, err := e.Exec(ctx, query, lastBatchNumberSeen, lastBatchNumberVerified) return err } -// GetLastBatchNumberSeenOnEthereum returns the last batch number stored -// in the state that represents the last batch number that affected the -// roll-up in the Ethereum network. -func (p *PostgresStorage) GetLastBatchNumberSeenOnEthereum(ctx context.Context, dbTx pgx.Tx) (uint64, error) { - var batchNumber uint64 - const getLastBatchSeenSQL = "SELECT last_batch_num_seen FROM state.sync_info LIMIT 1" +// SetInitSyncBatch sets the initial batch number where the synchronization started +func (p *PostgresStorage) SetInitSyncBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { + updateInitBatchSQL := "UPDATE state.sync_info SET init_sync_batch = $1" e := p.getExecQuerier(dbTx) - err := e.QueryRow(ctx, getLastBatchSeenSQL).Scan(&batchNumber) - - if err != nil { - return 0, err - } - - return batchNumber, nil + _, err := e.Exec(ctx, updateInitBatchSQL, batchNumber) + return err } // GetBatchByNumber returns the batch with the given number. @@ -1575,18 +1570,6 @@ func (p *PostgresStorage) GetLastL2Block(ctx context.Context, dbTx pgx.Tx) (*typ return block, nil } -// GetLastVerifiedBatchNumberSeenOnEthereum gets last verified batch number seen on ethereum -func (p *PostgresStorage) GetLastVerifiedBatchNumberSeenOnEthereum(ctx context.Context, dbTx pgx.Tx) (uint64, error) { - const getLastVerifiedBatchSeenSQL = "SELECT last_batch_num_verified FROM state.sync_info LIMIT 1" - var batchNumber uint64 - e := p.getExecQuerier(dbTx) - err := e.QueryRow(ctx, getLastVerifiedBatchSeenSQL).Scan(&batchNumber) - if err != nil { - return 0, err - } - return batchNumber, nil -} - // GetLastVerifiedBatch gets last verified batch func (p *PostgresStorage) GetLastVerifiedBatch(ctx context.Context, dbTx pgx.Tx) (*VerifiedBatch, error) { const query = "SELECT block_num, batch_num, tx_hash, aggregator FROM state.verified_batch ORDER BY batch_num DESC LIMIT 1" diff --git a/synchronizer/interfaces.go b/synchronizer/interfaces.go index 2045b1564e..aa7200f699 100644 --- a/synchronizer/interfaces.go +++ b/synchronizer/interfaces.go @@ -23,6 +23,7 @@ type ethermanInterface interface { GetTrustedSequencerURL() (string, error) VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error) GetForks(ctx context.Context) ([]state.ForkIDInterval, error) + GetLatestVerifiedBatchNum() (uint64, error) } // stateInterface gathers the methods required to interact with the state. @@ -56,7 +57,8 @@ type stateInterface interface { ResetForkID(ctx context.Context, batchNumber, forkID uint64, version string, dbTx pgx.Tx) error GetForkIDTrustedReorgCount(ctx context.Context, forkID uint64, version string, dbTx pgx.Tx) (uint64, error) UpdateForkIDIntervals(intervals []state.ForkIDInterval) - + SetLastBatchInfoSeenOnEthereum(ctx context.Context, lastBatchNumberSeen, lastBatchNumberVerified uint64, dbTx pgx.Tx) error + SetInitSyncBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error BeginStateTransaction(ctx context.Context) (pgx.Tx, error) } diff --git a/synchronizer/mock_etherman.go b/synchronizer/mock_etherman.go index 036a115fed..f2d1ea9ad7 100644 --- a/synchronizer/mock_etherman.go +++ b/synchronizer/mock_etherman.go @@ -98,6 +98,30 @@ func (_m *ethermanMock) GetLatestBatchNumber() (uint64, error) { return r0, r1 } +// GetLatestVerifiedBatchNum provides a mock function with given fields: +func (_m *ethermanMock) GetLatestVerifiedBatchNum() (uint64, error) { + ret := _m.Called() + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetRollupInfoByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock func (_m *ethermanMock) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) { ret := _m.Called(ctx, fromBlock, toBlock) diff --git a/synchronizer/mock_state.go b/synchronizer/mock_state.go index c4f69cbfc7..8e05a7d1ba 100644 --- a/synchronizer/mock_state.go +++ b/synchronizer/mock_state.go @@ -591,6 +591,34 @@ func (_m *stateMock) SetGenesis(ctx context.Context, block state.Block, genesis return r0, r1 } +// SetInitSyncBatch provides a mock function with given fields: ctx, batchNumber, dbTx +func (_m *stateMock) SetInitSyncBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { + ret := _m.Called(ctx, batchNumber, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) error); ok { + r0 = rf(ctx, batchNumber, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetLastBatchInfoSeenOnEthereum provides a mock function with given fields: ctx, lastBatchNumberSeen, lastBatchNumberVerified, dbTx +func (_m *stateMock) SetLastBatchInfoSeenOnEthereum(ctx context.Context, lastBatchNumberSeen uint64, lastBatchNumberVerified uint64, dbTx pgx.Tx) error { + ret := _m.Called(ctx, lastBatchNumberSeen, lastBatchNumberVerified, dbTx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) error); ok { + r0 = rf(ctx, lastBatchNumberSeen, lastBatchNumberVerified, dbTx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // StoreTransactions provides a mock function with given fields: ctx, batchNum, processedTxs, dbTx func (_m *stateMock) StoreTransactions(ctx context.Context, batchNum uint64, processedTxs []*state.ProcessTransactionResponse, dbTx pgx.Tx) error { ret := _m.Called(ctx, batchNum, processedTxs, dbTx) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 93641e20c5..1bd1ace820 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -75,7 +75,8 @@ func (s *ClientSynchronizer) Sync() error { log.Info("Sync started") dbTx, err := s.state.BeginStateTransaction(s.ctx) if err != nil { - log.Fatalf("error creating db transaction to get latest block") + log.Errorf("error creating db transaction to get latest block. Error: %v", err) + return err } lastEthBlockSynced, err := s.state.GetLastBlock(s.ctx, dbTx) if err != nil { @@ -84,15 +85,31 @@ func (s *ClientSynchronizer) Sync() error { valid, err := s.etherMan.VerifyGenBlockNumber(s.ctx, s.genesis.GenesisBlockNum) if err != nil { log.Error("error checking genesis block number. Error: ", err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr + } return err } else if !valid { log.Error("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed") + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v", rollbackErr) + return rollbackErr + } return fmt.Errorf("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed") } log.Info("Setting genesis block") header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(0).SetUint64(s.genesis.GenesisBlockNum)) if err != nil { - log.Fatal("error getting l1 block header for block ", s.genesis.GenesisBlockNum, " : ", err) + log.Errorf("error getting l1 block header for block %d. Error: %v", s.genesis.GenesisBlockNum, err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr + } + return err } lastEthBlockSynced = &state.Block{ BlockNumber: header.Number.Uint64(), @@ -102,26 +119,64 @@ func (s *ClientSynchronizer) Sync() error { } newRoot, err := s.state.SetGenesis(s.ctx, *lastEthBlockSynced, s.genesis, dbTx) if err != nil { - log.Fatal("error setting genesis: ", err) + log.Error("error setting genesis: ", err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr + } + return err } var root common.Hash root.SetBytes(newRoot) if root != s.genesis.Root { - log.Fatal("Calculated newRoot should be ", s.genesis.Root, " instead of ", root) + log.Errorf("Calculated newRoot should be %s instead of %s", s.genesis.Root.String(), root.String()) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v", rollbackErr) + return rollbackErr + } + return fmt.Errorf("Calculated newRoot should be %s instead of %s", s.genesis.Root.String(), root.String()) } log.Debug("Genesis root matches!") } else { - log.Fatal("unexpected error getting the latest ethereum block. Error: ", err) + log.Error("unexpected error getting the latest ethereum block. Error: ", err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr + } + return err + } + } + initBatchNumber, err := s.state.GetLastBatchNumber(s.ctx, dbTx) + if err != nil { + log.Error("error getting latest batchNumber synced. Error: ", err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr + } + return err + } + err = s.state.SetInitSyncBatch(s.ctx, initBatchNumber, dbTx) + if err != nil { + log.Error("error setting initial batch number. Error: ", err) + rollbackErr := dbTx.Rollback(s.ctx) + if rollbackErr != nil { + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr } + return err } if err := dbTx.Commit(s.ctx); err != nil { log.Errorf("error committing dbTx, err: %v", err) rollbackErr := dbTx.Rollback(s.ctx) if rollbackErr != nil { - log.Fatalf("error rolling back state. RollbackErr: %s, err: %v", - rollbackErr.Error(), err) + log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error()) + return rollbackErr } - log.Fatalf("error committing dbTx, err: %v", err) + return err } for { @@ -136,9 +191,21 @@ func (s *ClientSynchronizer) Sync() error { } latestSyncedBatch, err := s.state.GetLastBatchNumber(s.ctx, nil) if err != nil { - log.Warn("error getting latest batch synced. Error: ", err) + log.Warn("error getting latest batch synced in the db. Error: ", err) continue } + // Check the latest verified Batch number in the smc + lastVerifiedBatchNumber, err := s.etherMan.GetLatestVerifiedBatchNum() + if err != nil { + log.Warn("error getting last verified batch in the rollup. Error: ", err) + continue + } + err = s.state.SetLastBatchInfoSeenOnEthereum(s.ctx, latestSequencedBatchNumber, lastVerifiedBatchNumber, nil) + if err != nil { + log.Warn("error setting latest batch info into db. Error: ", err) + continue + } + // Sync trusted state if latestSyncedBatch >= latestSequencedBatchNumber { log.Info("L1 state fully synchronized") diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index 19e298de29..94d7218e3e 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -381,11 +381,42 @@ func TestForcedBatch(t *testing.T) { Return(lastBlock, nil). Once() + m.State. + On("GetLastBatchNumber", ctx, m.DbTx). + Return(uint64(10), nil). + Once() + + m.State. + On("SetInitSyncBatch", ctx, uint64(10), m.DbTx). + Return(nil). + Once() + m.DbTx. On("Commit", ctx). Return(nil). Once() + m.Etherman. + On("GetLatestBatchNumber"). + Return(uint64(10), nil). + Once() + + var nilDbTx pgx.Tx + m.State. + On("GetLastBatchNumber", ctx, nilDbTx). + Return(uint64(10), nil). + Once() + + m.Etherman. + On("GetLatestVerifiedBatchNum"). + Return(uint64(10), nil). + Once() + + m.State. + On("SetLastBatchInfoSeenOnEthereum", ctx, uint64(10), uint64(10), nilDbTx). + Return(nil). + Once() + m.Etherman. On("EthBlockByNumber", ctx, lastBlock.BlockNumber). Return(ethBlock, nil). @@ -546,17 +577,6 @@ func TestForcedBatch(t *testing.T) { Run(func(args mock.Arguments) { sync.Stop() }). Return(nil). Once() - - m.Etherman. - On("GetLatestBatchNumber"). - Return(uint64(10), nil). - Once() - - var nilDbTx pgx.Tx - m.State. - On("GetLastBatchNumber", ctx, nilDbTx). - Return(uint64(10), nil). - Once() }). Return(m.DbTx, nil). Once() @@ -601,11 +621,42 @@ func TestSequenceForcedBatch(t *testing.T) { Return(lastBlock, nil). Once() + m.State. + On("GetLastBatchNumber", ctx, m.DbTx). + Return(uint64(10), nil). + Once() + + m.State. + On("SetInitSyncBatch", ctx, uint64(10), m.DbTx). + Return(nil). + Once() + m.DbTx. On("Commit", ctx). Return(nil). Once() + m.Etherman. + On("GetLatestBatchNumber"). + Return(uint64(10), nil). + Once() + + var nilDbTx pgx.Tx + m.State. + On("GetLastBatchNumber", ctx, nilDbTx). + Return(uint64(10), nil). + Once() + + m.Etherman. + On("GetLatestVerifiedBatchNum"). + Return(uint64(10), nil). + Once() + + m.State. + On("SetLastBatchInfoSeenOnEthereum", ctx, uint64(10), uint64(10), nilDbTx). + Return(nil). + Once() + m.Etherman. On("EthBlockByNumber", ctx, lastBlock.BlockNumber). Return(ethBlock, nil). @@ -751,17 +802,6 @@ func TestSequenceForcedBatch(t *testing.T) { Run(func(args mock.Arguments) { sync.Stop() }). Return(nil). Once() - - m.Etherman. - On("GetLatestBatchNumber"). - Return(uint64(10), nil). - Once() - - var nilDbTx pgx.Tx - m.State. - On("GetLastBatchNumber", ctx, nilDbTx). - Return(uint64(10), nil). - Once() }). Return(m.DbTx, nil). Once()