Skip to content

Commit

Permalink
Add reserved zk counters to pool transactions (0xPolygonHermez#3346)
Browse files Browse the repository at this point in the history
* add reserved zk counters to pool transaction

* add reserved zk counters to pool transaction

* correct migration test
  • Loading branch information
ToniRamirezM authored Feb 22, 2024
1 parent 5c2fcff commit a039a0a
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 35 deletions.
7 changes: 7 additions & 0 deletions db/migrations/pool/0013.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE pool.transaction
ADD COLUMN reserved_zkcounters jsonb;

-- +migrate Down
ALTER TABLE pool.transaction
DROP COLUMN reserved_zkcounters;
50 changes: 50 additions & 0 deletions db/migrations/pool/0013_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package pool_migrations_test

import (
"database/sql"
"testing"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/stretchr/testify/require"
)

// this migration adds reserved_zkcounters to the transaction
type migrationTest0013 struct{}

func (m migrationTest0013) InsertData(db *sql.DB) error {
return nil
}

func (m migrationTest0013) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
var reserved_zkcounters = state.ZKCounters{
GasUsed: 0,
KeccakHashes: 1,
PoseidonHashes: 2,
}

const insertTx = `
INSERT INTO pool.transaction (hash, ip, received_at, from_address, reserved_zkcounters)
VALUES ('0x0001', '127.0.0.1', '2023-12-07', '0x0011', $1)`

_, err := db.Exec(insertTx, reserved_zkcounters)
require.NoError(t, err)
}

func (m migrationTest0013) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
var reserved_zkcounters = state.ZKCounters{
GasUsed: 0,
KeccakHashes: 1,
PoseidonHashes: 2,
}

const insertTx = `
INSERT INTO pool.transaction (hash, ip, received_at, from_address, reserved_zkcounters)
VALUES ('0x0001', '127.0.0.1', '2023-12-07', '0x0011', $1)`

_, err := db.Exec(insertTx, reserved_zkcounters)
require.Error(t, err)
}

func TestMigration0013(t *testing.T) {
runMigrationTest(t, 13, migrationTest0013{})
}
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type storage interface {
GetTxFromAddressFromByHash(ctx context.Context, hash common.Hash) (common.Address, uint64, error)
GetTransactionByHash(ctx context.Context, hash common.Hash) (*Transaction, error)
GetTransactionByL2Hash(ctx context.Context, hash common.Hash) (*Transaction, error)
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error)
Expand Down
55 changes: 33 additions & 22 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
from_address,
is_wip,
ip,
failed_reason
failed_reason,
reserved_zkcounters
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, NULL)
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, NULL, $20)
ON CONFLICT (hash) DO UPDATE SET
encoded = $2,
decoded = $3,
Expand All @@ -98,7 +99,8 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
from_address = $17,
is_wip = $18,
ip = $19,
failed_reason = NULL
failed_reason = NULL,
reserved_zkcounters = $20
`

// Get FromAddress from the JSON data
Expand Down Expand Up @@ -127,7 +129,8 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
tx.ReceivedAt,
fromAddress,
tx.IsWIP,
tx.IP); err != nil {
tx.IP,
tx.ReservedZKCounters); err != nil {
return err
}
return nil
Expand All @@ -144,11 +147,11 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
)
if limit == 0 {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC`
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC`
rows, err = p.db.Query(ctx, sql, status.String())
} else {
sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC LIMIT $2`
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC LIMIT $2`
rows, err = p.db.Query(ctx, sql, status.String(), limit)
}
if err != nil {
Expand Down Expand Up @@ -177,7 +180,7 @@ func (p *PostgresPoolStorage) GetNonWIPPendingTxs(ctx context.Context) ([]pool.T
)

sql = `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters FROM pool.transaction WHERE is_wip IS FALSE and status = $1`
rows, err = p.db.Query(ctx, sql, pool.TxStatusPending)

if err != nil {
Expand Down Expand Up @@ -236,7 +239,8 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
received_at,
nonce,
is_wip,
ip
ip,
reserved_zkcounters
FROM
pool.transaction p1
WHERE
Expand Down Expand Up @@ -265,7 +269,8 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
received_at,
nonce,
is_wip,
ip
ip,
reserved_zkcounters
FROM
pool.transaction p1
WHERE
Expand All @@ -285,8 +290,9 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
cumulativeGasUsed uint64
usedKeccakHashes, usedPoseidonHashes, usedPoseidonPaddings,
usedMemAligns, usedArithmetics, usedBinaries, usedSteps, usedSHA256Hashes uint32
nonce uint64
isWIP bool
nonce uint64
isWIP bool
reservedZKCounters state.ZKCounters
)

args := []interface{}{filterStatus, minGasPrice, limit}
Expand Down Expand Up @@ -316,6 +322,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
&nonce,
&isWIP,
&ip,
&reservedZKCounters,
)

if err != nil {
Expand Down Expand Up @@ -345,6 +352,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
}
tx.IsWIP = isWIP
tx.IP = ip
tx.ReservedZKCounters = reservedZKCounters
txs = append(txs, tx)
}

Expand Down Expand Up @@ -511,7 +519,7 @@ func (p *PostgresPoolStorage) IsTxPending(ctx context.Context, hash common.Hash)
// GetTxsByFromAndNonce get all the transactions from the pool with the same from and nonce
func (p *PostgresPoolStorage) GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]pool.Transaction, error) {
sql := `SELECT encoded, status, received_at, is_wip, ip, cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes,
used_poseidon_paddings, used_mem_aligns, used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason
used_poseidon_paddings, used_mem_aligns, used_arithmetics, used_binaries, used_steps, used_sha256_hashes, failed_reason, reserved_zkcounters
FROM pool.transaction
WHERE from_address = $1
AND nonce = $2`
Expand Down Expand Up @@ -682,10 +690,11 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
usedSteps uint32
usedSHA256Hashes uint32
failedReason *string
reservedZKCounters state.ZKCounters
)

if err := rows.Scan(&encoded, &status, &receivedAt, &isWIP, &ip, &cumulativeGasUsed, &usedKeccakHashes, &usedPoseidonHashes,
&usedPoseidonPaddings, &usedMemAligns, &usedArithmetics, &usedBinaries, &usedSteps, &usedSHA256Hashes, &failedReason); err != nil {
&usedPoseidonPaddings, &usedMemAligns, &usedArithmetics, &usedBinaries, &usedSteps, &usedSHA256Hashes, &failedReason, &reservedZKCounters); err != nil {
return nil, err
}

Expand Down Expand Up @@ -714,6 +723,7 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
tx.ZKCounters.Steps = usedSteps
tx.ZKCounters.Sha256Hashes_V2 = usedSHA256Hashes
tx.FailedReason = failedReason
tx.ReservedZKCounters = reservedZKCounters

return tx, nil
}
Expand All @@ -728,21 +738,22 @@ func (p *PostgresPoolStorage) DeleteTransactionByHash(ctx context.Context, hash
}

// GetTxZkCountersByHash gets a transaction zkcounters by its hash
func (p *PostgresPoolStorage) GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error) {
var zkCounters state.ZKCounters
func (p *PostgresPoolStorage) GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error) {
var usedZKCounters state.ZKCounters
var reservedZKCounters state.ZKCounters

sql := `SELECT cumulative_gas_used, used_keccak_hashes, used_poseidon_hashes, used_poseidon_paddings, used_mem_aligns,
used_arithmetics, used_binaries, used_steps, used_sha256_hashes FROM pool.transaction WHERE hash = $1`
err := p.db.QueryRow(ctx, sql, hash.String()).Scan(&zkCounters.GasUsed, &zkCounters.KeccakHashes,
&zkCounters.PoseidonHashes, &zkCounters.PoseidonPaddings,
&zkCounters.MemAligns, &zkCounters.Arithmetics, &zkCounters.Binaries, &zkCounters.Steps, &zkCounters.Sha256Hashes_V2)
used_arithmetics, used_binaries, used_steps, used_sha256_hashes, reserved_zkcounters FROM pool.transaction WHERE hash = $1`
err := p.db.QueryRow(ctx, sql, hash.String()).Scan(&usedZKCounters.GasUsed, &usedZKCounters.KeccakHashes,
&usedZKCounters.PoseidonHashes, &usedZKCounters.PoseidonPaddings,
&usedZKCounters.MemAligns, &usedZKCounters.Arithmetics, &usedZKCounters.Binaries, &usedZKCounters.Steps, &usedZKCounters.Sha256Hashes_V2, &reservedZKCounters)
if errors.Is(err, pgx.ErrNoRows) {
return nil, pool.ErrNotFound
return nil, nil, pool.ErrNotFound
} else if err != nil {
return nil, err
return nil, nil, err
}

return &zkCounters, nil
return &usedZKCounters, &reservedZKCounters, nil
}

// MarkWIPTxsAsPending updates WIP status to non WIP
Expand Down
14 changes: 9 additions & 5 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type Pool struct {
}

type preExecutionResponse struct {
usedZkCounters state.ZKCounters
usedZKCounters state.ZKCounters
reservedZKCounters state.ZKCounters
isExecutorLevelError bool
OOCError error
OOGError error
Expand Down Expand Up @@ -238,7 +239,8 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW
}

poolTx := NewTransaction(tx, ip, isWIP)
poolTx.ZKCounters = preExecutionResponse.usedZkCounters
poolTx.ZKCounters = preExecutionResponse.usedZKCounters
poolTx.ReservedZKCounters = preExecutionResponse.reservedZKCounters

return p.storage.AddTx(ctx, *poolTx)
}
Expand Down Expand Up @@ -292,7 +294,7 @@ func (p *Pool) ValidateBreakEvenGasPrice(ctx context.Context, tx types.Transacti

// preExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) preExecuteTx(ctx context.Context, tx types.Transaction) (preExecutionResponse, error) {
response := preExecutionResponse{usedZkCounters: state.ZKCounters{}, OOCError: nil, OOGError: nil, isReverted: false}
response := preExecutionResponse{usedZKCounters: state.ZKCounters{}, reservedZKCounters: state.ZKCounters{}, OOCError: nil, OOGError: nil, isReverted: false}

// TODO: Add effectivePercentage = 0xFF to the request (factor of 1) when gRPC message is updated
processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil)
Expand All @@ -309,7 +311,8 @@ func (p *Pool) preExecuteTx(ctx context.Context, tx types.Transaction) (preExecu
response.OOGError = err
}
if processBatchResponse != nil && processBatchResponse.BlockResponses != nil && len(processBatchResponse.BlockResponses) > 0 {
response.usedZkCounters = processBatchResponse.UsedZkCounters
response.usedZKCounters = processBatchResponse.UsedZkCounters
response.reservedZKCounters = processBatchResponse.ReservedZkCounters
response.txResponse = processBatchResponse.BlockResponses[0].TransactionResponses[0]
}
return response, nil
Expand All @@ -335,7 +338,8 @@ func (p *Pool) preExecuteTx(ctx context.Context, tx types.Transaction) (preExecu
}
}

response.usedZkCounters = processBatchResponse.UsedZkCounters
response.usedZKCounters = processBatchResponse.UsedZkCounters
response.reservedZKCounters = processBatchResponse.ReservedZkCounters
response.txResponse = processBatchResponse.BlockResponses[0].TransactionResponses[0]
}

Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type txPool interface {
MarkWIPTxsAsPending(ctx context.Context) error
GetNonWIPPendingTxs(ctx context.Context) ([]pool.Transaction, error)
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool, failedReason *string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, *state.ZKCounters, error)
UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
GetGasPrices(ctx context.Context) (pool.GasPrices, error)
GetDefaultMinGasPriceAllowed() uint64
Expand Down
21 changes: 15 additions & 6 deletions sequencer/mock_pool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a039a0a

Please sign in to comment.