Skip to content

Commit

Permalink
Sequencer L2 block parallel processing improvements (0xPolygonHermez#…
Browse files Browse the repository at this point in the history
…3604) (0xPolygonHermez#3637)

* wip

* first implementation of parallel sequencer optmizations and L2 block reorg management

* Close sipBatch (if needed) when processing reorg. Halt when 2 consecuties reorgs (same L2 block)

* Return error when reserved counters overflow on l2 block process. Log used/reserved counters when closing wip batch

* added logs to analyze blocking issue when storing L2 block

* Fix unlock mutex in addTxTracker. Set wipTx to nil in RestoreTxsPendingToStore

* add high reserved resorces in wipBatch

* store high reserved counter on statedb.batch table

* Return contextId in ProcessBatchV2

* fix synchornizer test

* Set SequentialProcessL2Block to false by default. Update node config documentation

* fix non-e2e tests

* fix finalizer tests

* remove unused code

* test

* Fix sequencer loadFromPool gofunc. Fix docker compose variables
  • Loading branch information
agnusmor authored May 22, 2024
1 parent 2292cd5 commit a5f9558
Show file tree
Hide file tree
Showing 46 changed files with 1,470 additions and 752 deletions.
6 changes: 5 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func Test_Defaults(t *testing.T) {
path: "Sequencer.Finalizer.ResourceExhaustedMarginPct",
expectedValue: uint32(10),
},
{
path: "Sequencer.Finalizer.StateRootSyncInterval",
expectedValue: types.NewDuration(3600 * time.Second),
},
{
path: "Sequencer.Finalizer.ForcedBatchesL1BlockConfirmations",
expectedValue: uint64(64),
Expand All @@ -127,7 +131,7 @@ func Test_Defaults(t *testing.T) {
},
{
path: "Sequencer.Finalizer.BatchMaxDeltaTimestamp",
expectedValue: types.NewDuration(10 * time.Second),
expectedValue: types.NewDuration(1800 * time.Second),
},
{
path: "Sequencer.Finalizer.Metrics.Interval",
Expand Down
5 changes: 3 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,13 @@ StateConsistencyCheckInterval = "5s"
ForcedBatchesCheckInterval = "10s"
L1InfoTreeL1BlockConfirmations = 64
L1InfoTreeCheckInterval = "10s"
BatchMaxDeltaTimestamp = "10s"
BatchMaxDeltaTimestamp = "1800s"
L2BlockMaxDeltaTimestamp = "3s"
ResourceExhaustedMarginPct = 10
StateRootSyncInterval = "3600s"
HaltOnBatchNumber = 0
SequentialBatchSanityCheck = false
SequentialProcessL2Block = true
SequentialProcessL2Block = false
[Sequencer.Finalizer.Metrics]
Interval = "60m"
EnableLog = true
Expand Down
3 changes: 2 additions & 1 deletion config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ StateConsistencyCheckInterval = "5s"
BatchMaxDeltaTimestamp = "120s"
L2BlockMaxDeltaTimestamp = "3s"
ResourceExhaustedMarginPct = 10
StateRootSyncInterval = "360s"
HaltOnBatchNumber = 0
SequentialBatchSanityCheck = false
SequentialProcessL2Block = true
SequentialProcessL2Block = false
[Sequencer.Finalizer.Metrics]
Interval = "60m"
EnableLog = true
Expand Down
27 changes: 5 additions & 22 deletions db/migrations/state/0021.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,8 @@
-- +migrate Up

-- the update below fix the wrong receipt TX indexes
WITH map_fix_tx_index AS (
SELECT t.l2_block_num AS block_num
, t.hash AS tx_hash
, r.tx_index AS current_index
, (ROW_NUMBER() OVER (PARTITION BY t.l2_block_num ORDER BY r.tx_index))-1 AS correct_index
FROM state.receipt r
INNER JOIN state."transaction" t
ON t.hash = r.tx_hash
)
UPDATE state.receipt AS r
SET tx_index = m.correct_index
FROM map_fix_tx_index m
WHERE m.block_num = r.block_num
AND m.tx_hash = r.tx_hash
AND m.current_index = r.tx_index
AND m.current_index != m.correct_index;

ALTER TABLE state.batch
ADD COLUMN high_reserved_counters JSONB;

-- +migrate Down

-- no action is needed, the data fixed by the
-- migrate up must remain fixed
ALTER TABLE state.batch
DROP COLUMN high_reserved_counters;
151 changes: 35 additions & 116 deletions db/migrations/state/0021_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,142 +4,61 @@ import (
"database/sql"
"testing"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
)

type migrationTest0021TestCase struct {
Name string
Block migrationTest0021TestCaseBlock
}

type migrationTest0021TestCaseBlock struct {
Transactions []migrationTest0021TestCaseTransaction
}

type migrationTest0021TestCaseTransaction struct {
CurrentIndex uint
}

type migrationTest0021 struct {
TestCases []migrationTest0021TestCase
}
type migrationTest0021 struct{}

func (m migrationTest0021) InsertData(db *sql.DB) error {
const addBlock0 = "INSERT INTO state.block (block_num, received_at, block_hash) VALUES (0, now(), '0x0')"
if _, err := db.Exec(addBlock0); err != nil {
return err
}

const addBatch0 = `
const insertBatch0 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip)
VALUES (0,'0x0000', '0x0000', '0x0000', '0x0000', now(), '0x0000', null, null, true)`
if _, err := db.Exec(addBatch0); err != nil {
return err
}

const addL2Block = "INSERT INTO state.l2block (block_num, block_hash, header, uncles, parent_hash, state_root, received_at, batch_num, created_at) VALUES ($1, $2, '{}', '{}', '0x0', '0x0', now(), 0, now())"
const addTransaction = "INSERT INTO state.transaction (hash, encoded, decoded, l2_block_num, effective_percentage, l2_hash) VALUES ($1, 'ABCDEF', '{}', $2, 255, $1)"
const addReceipt = "INSERT INTO state.receipt (tx_hash, type, post_state, status, cumulative_gas_used, gas_used, effective_gas_price, block_num, tx_index, contract_address) VALUES ($1, 1, null, 1, 1234, 1234, 1, $2, $3, '')"

txUnique := 0
for tci, testCase := range m.TestCases {
blockNum := uint64(tci + 1)
blockHash := common.HexToHash(hex.EncodeUint64(blockNum)).String()
if _, err := db.Exec(addL2Block, blockNum, blockHash); err != nil {
return err
}
for _, tx := range testCase.Block.Transactions {
txUnique++
txHash := common.HexToHash(hex.EncodeUint64(uint64(txUnique))).String()
if _, err := db.Exec(addTransaction, txHash, blockNum); err != nil {
return err
}
if _, err := db.Exec(addReceipt, txHash, blockNum, tx.CurrentIndex); err != nil {
return err
}
}
// insert batch
_, err := db.Exec(insertBatch0)
if err != nil {
return err
}

return nil
}

func (m migrationTest0021) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
const getReceiptsByBlock = "SELECT r.tx_index FROM state.receipt r WHERE r.block_num = $1 ORDER BY r.tx_index"
var result int

for tci := range m.TestCases {
blockNum := uint64(tci + 1)
// Check column high_reserved_counters exists in state.batch table
const getColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='batch' and column_name='high_reserved_counters'`
row := db.QueryRow(getColumn)
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)

rows, err := db.Query(getReceiptsByBlock, blockNum)
require.NoError(t, err)
const insertBatch0 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip, high_reserved_counters)
VALUES (1,'0x0001', '0x0001', '0x0001', '0x0001', now(), '0x0001', null, null, true, '{"Steps": 1890125}')`

var expectedIndex = uint(0)
var txIndex uint
for rows.Next() {
err := rows.Scan(&txIndex)
require.NoError(t, err)
require.Equal(t, expectedIndex, txIndex)
expectedIndex++
}
}
// insert batch 1
_, err := db.Exec(insertBatch0)
assert.NoError(t, err)

const insertBatch1 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip, high_reserved_counters)
VALUES (2,'0x0002', '0x0002', '0x0002', '0x0002', now(), '0x0002', null, null, false, '{"Steps": 1890125}')`

// insert batch 2
_, err = db.Exec(insertBatch1)
assert.NoError(t, err)
}

func (m migrationTest0021) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
m.RunAssertsAfterMigrationUp(t, db)
var result int

// Check column high_reserved_counters doesn't exists in state.batch table
const getCheckedColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='batch' and column_name='high_reserved_counters'`
row := db.QueryRow(getCheckedColumn)
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}

func TestMigration0021(t *testing.T) {
runMigrationTest(t, 21, migrationTest0021{
TestCases: []migrationTest0021TestCase{
{
Name: "single tx with correct index",
Block: migrationTest0021TestCaseBlock{
Transactions: []migrationTest0021TestCaseTransaction{
{CurrentIndex: 0},
},
},
},
{
Name: "multiple txs indexes are correct",
Block: migrationTest0021TestCaseBlock{
Transactions: []migrationTest0021TestCaseTransaction{
{CurrentIndex: 0},
{CurrentIndex: 1},
{CurrentIndex: 2},
},
},
},
{
Name: "single tx with wrong tx index",
Block: migrationTest0021TestCaseBlock{
Transactions: []migrationTest0021TestCaseTransaction{
{CurrentIndex: 3},
},
},
},
{
Name: "multiple txs missing 0 index",
Block: migrationTest0021TestCaseBlock{
Transactions: []migrationTest0021TestCaseTransaction{
{CurrentIndex: 1},
{CurrentIndex: 2},
{CurrentIndex: 3},
{CurrentIndex: 4},
},
},
},
{
Name: "multiple has index 0 but also txs index gap",
Block: migrationTest0021TestCaseBlock{
Transactions: []migrationTest0021TestCaseTransaction{
{CurrentIndex: 0},
{CurrentIndex: 2},
{CurrentIndex: 4},
{CurrentIndex: 6},
},
},
},
},
})
runMigrationTest(t, 21, migrationTest0021{})
}
27 changes: 20 additions & 7 deletions db/migrations/state/0022.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
-- +migrate Up

-- +migrate Up
ALTER TABLE state.exit_root
ADD COLUMN IF NOT EXISTS l1_info_tree_recursive_index BIGINT DEFAULT NULL UNIQUE;
CREATE INDEX IF NOT EXISTS idx_exit_root_l1_info_tree_recursive_index ON state.exit_root (l1_info_tree_recursive_index);
-- the update below fix the wrong receipt TX indexes
WITH map_fix_tx_index AS (
SELECT t.l2_block_num AS block_num
, t.hash AS tx_hash
, r.tx_index AS current_index
, (ROW_NUMBER() OVER (PARTITION BY t.l2_block_num ORDER BY r.tx_index))-1 AS correct_index
FROM state.receipt r
INNER JOIN state."transaction" t
ON t.hash = r.tx_hash
)
UPDATE state.receipt AS r
SET tx_index = m.correct_index
FROM map_fix_tx_index m
WHERE m.block_num = r.block_num
AND m.tx_hash = r.tx_hash
AND m.current_index = r.tx_index
AND m.current_index != m.correct_index;


-- +migrate Down
ALTER TABLE state.exit_root
DROP COLUMN IF EXISTS l1_info_tree_recursive_index;
DROP INDEX IF EXISTS state.idx_exit_root_l1_info_tree_recursive_index;

-- no action is needed, the data fixed by the
-- migrate up must remain fixed
Loading

0 comments on commit a5f9558

Please sign in to comment.