Skip to content

Commit

Permalink
Check txs before storing them (0xPolygonHermez#864)
Browse files Browse the repository at this point in the history
* Check repeated txs before storing them

* add happy path test case

* query tx directly

* properly manage encoded txs

* address review comments

* added TestGetTxsHashesByBatchNumber

* refactor test
  • Loading branch information
fgimenez authored Jul 8, 2022
1 parent a3f8003 commit 24ae33b
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 33 deletions.
23 changes: 23 additions & 0 deletions statev2/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
getBatchByNumberSQL = "SELECT batch_num, global_exit_root, local_exit_root, state_root, timestamp, coinbase, raw_txs_data from statev2.batch WHERE batch_num = $1"
getProcessingContextSQL = "SELECT batch_num, global_exit_root, timestamp, coinbase from statev2.batch WHERE batch_num = $1"
getEncodedTransactionsByBatchNumberSQL = "SELECT encoded FROM statev2.transaction t INNER JOIN statev2.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num = $1"
getTransactionHashesByBatchNumberSQL = "SELECT hash FROM statev2.transaction t INNER JOIN statev2.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num = $1"
getLastBatchSeenSQL = "SELECT last_batch_num_seen FROM statev2.sync_info LIMIT 1"
updateLastBatchSeenSQL = "UPDATE statev2.sync_info SET last_batch_num_seen = $1"
resetTrustedBatchSQL = "DELETE FROM statev2.batch WHERE batch_num > $1"
Expand Down Expand Up @@ -548,6 +549,28 @@ func (p *PostgresStorage) GetEncodedTransactionsByBatchNumber(ctx context.Contex
return txs, nil
}

func (p *PostgresStorage) GetTxsHashesByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []common.Hash, err error) {
e := p.getExecQuerier(dbTx)
rows, err := e.Query(ctx, getTransactionHashesByBatchNumberSQL, batchNumber)
if !errors.Is(err, pgx.ErrNoRows) && err != nil {
return nil, err
}
defer rows.Close()

txs := make([]common.Hash, 0, len(rows.RawValues()))

for rows.Next() {
var hexHash string
err := rows.Scan(&hexHash)
if err != nil {
return nil, err
}

txs = append(txs, common.HexToHash(hexHash))
}
return txs, nil
}

// ResetTrustedState resets the batches which the batch number is highter than the input.
func (p *PostgresStorage) ResetTrustedBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error {
e := p.getExecQuerier(dbTx)
Expand Down
73 changes: 40 additions & 33 deletions statev2/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ var (
ErrTimestampGE = errors.New("timestamp needs to be greater or equal")
// ErrDBTxNil indicates that the method requires a dbTx that is not nil
ErrDBTxNil = errors.New("the method requires a dbTx that is not nil")
// ErrExistingTxGreaterThanProcessedTx indicates that we have more txs stored
// in db than the txs we weant to process.
ErrExistingTxGreaterThanProcessedTx = errors.New("There are more transactions in the database than in the processed transaction set")
// ErrOutOfOrderProcessedTx indicates the the processed transactions of an
// ongoing batch are not in the same order as the transactions stored in the
// database for the same batch.
ErrOutOfOrderProcessedTx = errors.New("The processed transactions are not in the same order as the stored transactions")
)

var (
Expand Down Expand Up @@ -251,13 +258,23 @@ func (s *State) processBatch(ctx context.Context, batchNumber uint64, batchL2Dat
return s.executorClient.ProcessBatch(ctx, processBatchRequest)
}

// StoreTransactions is used by the sequencer to add processed transactions into an open batch.
// If the batch already has txs, those WILL BE DELETED before adding the new ones.
// StoreTransactions is used by the sequencer to add processed transactions into
// an open batch. If the batch already has txs, the processedTxs must be a super
// set of the existing ones, preserving order.
func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, processedTxs []*ProcessTransactionResponse, dbTx pgx.Tx) error {
// TODO: check existing txs vs parameter txs
if dbTx == nil {
return ErrDBTxNil
}

// check existing txs vs parameter txs
existingTxs, err := s.GetTxsHashesByBatchNumber(ctx, batchNumber, dbTx)
if err != nil {
return err
}
if err := CheckSupersetBatchTransactions(existingTxs, processedTxs); err != nil {
return err
}

// Check if last batch is closed. Note that it's assumed that only the latest batch can be open
isBatchClosed, err := s.PostgresStorage.IsBatchClosed(ctx, batchNumber, dbTx)
if err != nil {
Expand All @@ -267,35 +284,14 @@ func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, proce
return ErrBatchAlreadyClosed
}

foundPosition := -1

processingContext, err := s.GetProcessingContext(ctx, batchNumber, dbTx)
if err != nil {
return err
}

lastL2Block, err := s.GetLastL2Block(ctx, dbTx)
if err != nil {
return err
}

// Look for the transaction that matches latest state root in data base
// in case we already have l2blocks for that batch
// to just store new transactions
if lastL2Block.Header().Number.Uint64() == batchNumber {
stateRoot := lastL2Block.Header().Root
firstTxToInsert := len(existingTxs)

for i, processedTx := range processedTxs {
if processedTx.StateRoot == stateRoot {
foundPosition = i
break
}
}
}

foundPosition++

for i := foundPosition; i < len(processedTxs); i++ {
for i := firstTxToInsert; i < len(processedTxs); i++ {
processedTx := processedTxs[i]

lastL2Block, err := s.GetLastL2Block(ctx, dbTx)
Expand All @@ -310,20 +306,16 @@ func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, proce
Root: processedTx.StateRoot,
}

transactions := []*types.Transaction{}
transactions = append(transactions, &processedTx.Tx)
transactions := []*types.Transaction{&processedTx.Tx}

// Create block to be able to calculate its hash
block := types.NewBlock(header, transactions, []*types.Header{}, []*types.Receipt{}, &trie.StackTrie{})
block.ReceivedAt = processingContext.Timestamp

receipt := generateReceipt(block, processedTx)
receipts := []*types.Receipt{}
receipts = append(receipts, receipt)
receipts := []*types.Receipt{generateReceipt(block, processedTx)}

// Store L2 block and its transaction
err = s.PostgresStorage.AddL2Block(ctx, batchNumber, block, receipts, dbTx)
if err != nil {
if err := s.PostgresStorage.AddL2Block(ctx, batchNumber, block, receipts, dbTx); err != nil {
return err
}
}
Expand Down Expand Up @@ -672,3 +664,18 @@ func (s *State) SetGenesis(ctx context.Context, genesis Genesis, dbTx pgx.Tx) er

return s.PostgresStorage.AddL2Block(ctx, batch.BatchNumber, block, []*types.Receipt{}, dbTx)
}

// CheckSupersetBatchTransactions verifies that processedTransactions is a
// superset of existingTxs and that the existing txs have the same order,
// returns a non-nil error if that is not the case.
func CheckSupersetBatchTransactions(existingTxHashes []common.Hash, processedTxs []*ProcessTransactionResponse) error {
if len(existingTxHashes) > len(processedTxs) {
return ErrExistingTxGreaterThanProcessedTx
}
for i, existingTxHash := range existingTxHashes {
if existingTxHash != processedTxs[i].TxHash {
return ErrOutOfOrderProcessedTx
}
}
return nil
}
120 changes: 120 additions & 0 deletions statev2/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,3 +530,123 @@ func TestGenesis(t *testing.T) {
err := testState.SetGenesis(ctx, genesis, nil)
require.NoError(t, err)
}

func TestCheckSupersetBatchTransactions(t *testing.T) {
tcs := []struct {
description string
existingTxHashes []common.Hash
processedTxs []*state.ProcessTransactionResponse
expectedError bool
expectedErrorMsg string
}{
{
description: "empty existingTxHashes and processedTx is successful",
existingTxHashes: []common.Hash{},
processedTxs: []*state.ProcessTransactionResponse{},
},
{
description: "happy path",
existingTxHashes: []common.Hash{
common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c"),
common.HexToHash("0x30c6a361ba88906ef2085d05a2aeac15e793caff2bdc1deaaae2f4910d83de52"),
common.HexToHash("0x0d3453b6d17841b541d4f79f78d5fa22fff281551ed4012c7590b560b2969e7f"),
},
processedTxs: []*state.ProcessTransactionResponse{
{TxHash: common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c")},
{TxHash: common.HexToHash("0x30c6a361ba88906ef2085d05a2aeac15e793caff2bdc1deaaae2f4910d83de52")},
{TxHash: common.HexToHash("0x0d3453b6d17841b541d4f79f78d5fa22fff281551ed4012c7590b560b2969e7f")},
},
},
{
description: "existingTxHashes bigger than processedTx gives error",
existingTxHashes: []common.Hash{common.HexToHash(""), common.HexToHash("")},
processedTxs: []*state.ProcessTransactionResponse{{}},
expectedError: true,
expectedErrorMsg: state.ErrExistingTxGreaterThanProcessedTx.Error(),
},
{
description: "processedTx not present in existingTxHashes gives error",
existingTxHashes: []common.Hash{
common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c"),
common.HexToHash("0x30c6a361ba88906ef2085d05a2aeac15e793caff2bdc1deaaae2f4910d83de52"),
},
processedTxs: []*state.ProcessTransactionResponse{
{TxHash: common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c")},
{TxHash: common.HexToHash("0x0d3453b6d17841b541d4f79f78d5fa22fff281551ed4012c7590b560b2969e7f")},
},
expectedError: true,
expectedErrorMsg: state.ErrOutOfOrderProcessedTx.Error(),
},
{
description: "out of order processedTx gives error",
existingTxHashes: []common.Hash{
common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c"),
common.HexToHash("0x30c6a361ba88906ef2085d05a2aeac15e793caff2bdc1deaaae2f4910d83de52"),
common.HexToHash("0x0d3453b6d17841b541d4f79f78d5fa22fff281551ed4012c7590b560b2969e7f"),
},
processedTxs: []*state.ProcessTransactionResponse{
{TxHash: common.HexToHash("0x8a84686634729c57532b9ffa4e632e241b2de5c880c771c5c214d5e7ec465b1c")},
{TxHash: common.HexToHash("0x0d3453b6d17841b541d4f79f78d5fa22fff281551ed4012c7590b560b2969e7f")},
{TxHash: common.HexToHash("0x30c6a361ba88906ef2085d05a2aeac15e793caff2bdc1deaaae2f4910d83de52")},
},
expectedError: true,
expectedErrorMsg: state.ErrOutOfOrderProcessedTx.Error(),
},
}
for _, tc := range tcs {
tc := tc
t.Run(tc.description, func(t *testing.T) {
require.NoError(t, testutils.CheckError(
state.CheckSupersetBatchTransactions(tc.existingTxHashes, tc.processedTxs),
tc.expectedError,
tc.expectedErrorMsg,
))
})
}
}

func TestGetTxsHashesByBatchNumber(t *testing.T) {
// Init database instance
err := dbutils.InitOrReset(cfg)
require.NoError(t, err)
ctx := context.Background()
dbTx, err := testState.BeginStateTransaction(ctx)
require.NoError(t, err)
// Set genesis batch
err = testState.SetGenesis(ctx, state.Genesis{}, dbTx)
require.NoError(t, err)
// Open batch #1
processingCtx1 := state.ProcessingContext{
BatchNumber: 1,
Coinbase: common.HexToAddress("1"),
Timestamp: time.Now().UTC(),
GlobalExitRoot: common.HexToHash("a"),
}
err = testState.OpenBatch(ctx, processingCtx1, dbTx)
require.NoError(t, err)

// Add txs to batch #1
tx1 := *types.NewTransaction(0, common.HexToAddress("0"), big.NewInt(0), 0, big.NewInt(0), []byte("aaa"))
tx2 := *types.NewTransaction(1, common.HexToAddress("1"), big.NewInt(1), 0, big.NewInt(1), []byte("bbb"))
txsBatch1 := []*state.ProcessTransactionResponse{
{
TxHash: tx1.Hash(),
Tx: tx1,
},
{
TxHash: tx2.Hash(),
Tx: tx2,
},
}
err = testState.StoreTransactions(ctx, 1, txsBatch1, dbTx)
require.NoError(t, err)

txs, err := testState.GetTxsHashesByBatchNumber(ctx, 1, dbTx)
require.NoError(t, err)

require.Equal(t, len(txsBatch1), len(txs))
for i := range txsBatch1 {
require.Equal(t, txsBatch1[i].TxHash, txs[i])
}
require.NoError(t, dbTx.Commit(ctx))
}

0 comments on commit 24ae33b

Please sign in to comment.