Skip to content

Commit

Permalink
Feature/0xPolygonHermez#1676 trusted reorg (0xPolygonHermez#1677)
Browse files Browse the repository at this point in the history
* db schema + tests

* sync without tests

* add queries

* reorg pool synchronizer

* add queries

* detect l2 reorg

* Synchronizer implemented

* DeleteTxs name fixed

* linter

* pre execute txs

* implement full preprocess

* fix migration

* fix migration

* fixes

* synchronization bug fixed

* fix empty timestamp

* pre execution as unsigned transaction

* fix and test

---------

Co-authored-by: ToniRamirezM <[email protected]>
  • Loading branch information
ARR552 and ToniRamirezM authored Feb 21, 2023
1 parent 3ce6dbe commit 67a1cf7
Show file tree
Hide file tree
Showing 26 changed files with 512 additions and 272 deletions.
7 changes: 4 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func start(cliCtx *cli.Context) error {
go runJSONRPCServer(*c, poolInstance, st, apis)
case SYNCHRONIZER:
log.Info("Running synchronizer")
go runSynchronizer(*c, etherman, etm, st)
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
go runSynchronizer(*c, etherman, etm, st, poolInstance)
case BROADCAST:
log.Info("Running broadcast service")
go runBroadcastServer(c.BroadcastServer, st)
Expand Down Expand Up @@ -188,8 +189,8 @@ func newEtherman(c config.Config) (*etherman.Client, error) {
return etherman, nil
}

func runSynchronizer(cfg config.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State) {
sy, err := synchronizer.NewSynchronizer(cfg.IsTrustedSequencer, etherman, st, ethTxManager, cfg.NetworkConfig.Genesis, cfg.Synchronizer)
func runSynchronizer(cfg config.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State, pool *pool.Pool) {
sy, err := synchronizer.NewSynchronizer(cfg.IsTrustedSequencer, etherman, st, pool, ethTxManager, cfg.NetworkConfig.Genesis, cfg.Synchronizer)
if err != nil {
log.Fatal(err)
}
Expand Down
9 changes: 9 additions & 0 deletions db/migrations/state/0002.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ CREATE INDEX IF NOT EXISTS log_address_idx ON state.log (address);
ALTER TABLE state.virtual_batch
ADD COLUMN sequencer_addr VARCHAR DEFAULT '0x0000000000000000000000000000000000000000';

CREATE TABLE IF NOT EXISTS state.trusted_reorg
(
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
batch_num BIGINT,
reason VARCHAR NOT NULL
);

-- +migrate Down
UPDATE state.proof
SET prover = prover_id;
Expand All @@ -51,3 +58,5 @@ DROP INDEX IF EXISTS state.log_address_idx;

ALTER TABLE state.virtual_batch
DROP COLUMN IF EXISTS sequencer_addr;

DROP TABLE IF EXISTS state.trusted_reorg;
10 changes: 10 additions & 0 deletions db/migrations/state/0002_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (m migrationTest0002) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB)
VALUES (2, '0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f1', '0x514910771af9ca656af840dff83e8264ecf986ca', 1, '0x514910771af9ca656af840dff83e8264ecf986ca');`
_, err = db.Exec(insertVirtualBatch)
assert.NoError(t, err)
// Insert reorg
const insertReorg = `INSERT INTO state.trusted_reorg (batch_num, reason)
VALUES (2, 'reason of the trusted reorg');`
_, err = db.Exec(insertReorg)
assert.NoError(t, err)
}

func (m migrationTest0002) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
Expand Down Expand Up @@ -122,6 +127,11 @@ func (m migrationTest0002) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB
VALUES (3, '0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f1', '0x514910771af9ca656af840dff83e8264ecf986ca', 1);`
_, err = db.Exec(insertVirtualBatch)
assert.NoError(t, err)
// Insert reorg
const insertReorg = `INSERT INTO state.trusted_reorg (batch_num, reason)
VALUES (2, 'reason of the trusted reorg');`
_, err = db.Exec(insertReorg)
assert.Error(t, err)
}

func TestMigration0002(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type storage interface {
AddTx(ctx context.Context, tx Transaction) error
CountTransactionsByStatus(ctx context.Context, status TxStatus) (uint64, error)
DeleteTxsByHashes(ctx context.Context, hashes []common.Hash) error
DeleteTransactionsByHashes(ctx context.Context, hashes []common.Hash) error
GetGasPrice(ctx context.Context) (uint64, error)
GetNonce(ctx context.Context, address common.Address) (uint64, error)
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
Expand All @@ -38,4 +38,5 @@ type stateInterface interface {
GetLastL2BlockNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
GetNonce(ctx context.Context, address common.Address, batchNumber uint64, dbTx pgx.Tx) (uint64, error)
GetTransactionByHash(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (*types.Transaction, error)
PreProcessTransaction(ctx context.Context, tx *types.Transaction, forcedNonce uint64, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
}
11 changes: 7 additions & 4 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ func (p *PostgresPoolStorage) UpdateTxsStatus(ctx context.Context, hashes []stri
return nil
}

// DeleteTxsByHashes deletes txs by their hashes
func (p *PostgresPoolStorage) DeleteTxsByHashes(ctx context.Context, hashes []common.Hash) error {
// DeleteTransactionsByHashes deletes txs by their hashes
func (p *PostgresPoolStorage) DeleteTransactionsByHashes(ctx context.Context, hashes []common.Hash) error {
hh := make([]string, 0, len(hashes))
for _, h := range hashes {
hh = append(hh, h.Hex())
Expand Down Expand Up @@ -501,11 +501,14 @@ func (p *PostgresPoolStorage) GetTxByHash(ctx context.Context, hash common.Hash)
if err := tx.UnmarshalBinary(b); err != nil {
return nil, err
}
return &pool.Transaction{

poolTx := &pool.Transaction{
ReceivedAt: receivedAt,
Status: pool.TxStatus(status),
Transaction: *tx,
}, nil
}

return poolTx, nil
}

func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
Expand Down
64 changes: 36 additions & 28 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -76,9 +77,38 @@ func (p *Pool) AddTx(ctx context.Context, tx types.Transaction) error {

poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)

// Execute transaction to calculate its zkCounters
zkCounters, err := p.PreExecuteTx(ctx, tx)
if err == nil {
poolTx.ZKCounters = zkCounters
}

if executor.IsExecutorOutOfCountersError(executor.ExecutorErrorCode(err)) {
return err
}

return p.storage.AddTx(ctx, poolTx)
}

// PreExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZKCounters, error) {
sender, err := state.GetSender(tx)
if err != nil {
return state.ZKCounters{}, err
}

nonce, err := p.storage.GetNonce(ctx, sender)
if err != nil {
return state.ZKCounters{}, err
}

processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nonce, nil)
if err != nil {
return state.ZKCounters{}, err
}
return processBatchResponse.UsedZkCounters, nil
}

// GetPendingTxs from the pool
// limit parameter is used to limit amount of pending txs from the db,
// if limit = 0, then there is no limit
Expand Down Expand Up @@ -246,35 +276,13 @@ func (p *Pool) checkTxFieldCompatibilityWithExecutor(ctx context.Context, tx typ
return nil
}

// MarkReorgedTxsAsPending updated reorged txs status from selected to pending
func (p *Pool) MarkReorgedTxsAsPending(ctx context.Context) error {
// TODO: Change status to "reorged"
// DeleteReorgedTransactions deletes transactions from the pool
func (p *Pool) DeleteReorgedTransactions(ctx context.Context, transactions []*types.Transaction) error {
hashes := []common.Hash{}

// get selected transactions from pool
selectedTxs, err := p.GetSelectedTxs(ctx, 0)
if err != nil {
return err
}

txsHashesToUpdate := []string{}
// look for non existent transactions on state
for _, selectedTx := range selectedTxs {
txHash := selectedTx.Hash()
_, err := p.state.GetTransactionByHash(ctx, txHash, nil)
if errors.Is(err, state.ErrNotFound) {
txsHashesToUpdate = append(txsHashesToUpdate, txHash.String())
} else if err != nil {
return err
}
for _, tx := range transactions {
hashes = append(hashes, tx.Hash())
}

// revert pool state from selected to pending on the pool
err = p.UpdateTxsStatus(ctx, txsHashesToUpdate, TxStatusPending)
if err != nil {
return err
}

return nil
return p.storage.DeleteTransactionsByHashes(ctx, hashes)
}

// TODO: Create a method for the synchronizer to update Tx Statuses to "pending" or "reorged"
79 changes: 8 additions & 71 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func Test_AddTx(t *testing.T) {
t.Error(err)
}

rows, err := poolSqlDB.Query(ctx, "SELECT hash, encoded, decoded, status FROM pool.transaction")
rows, err := poolSqlDB.Query(ctx, "SELECT hash, encoded, decoded, status, used_steps FROM pool.transaction")
defer rows.Close() // nolint:staticcheck
if err != nil {
t.Error(err)
Expand All @@ -133,7 +133,8 @@ func Test_AddTx(t *testing.T) {
c := 0
for rows.Next() {
var hash, encoded, decoded, status string
err := rows.Scan(&hash, &encoded, &decoded, &status)
var usedSteps int
err := rows.Scan(&hash, &encoded, &decoded, &status, &usedSteps)
if err != nil {
t.Error(err)
}
Expand All @@ -143,6 +144,7 @@ func Test_AddTx(t *testing.T) {
assert.Equal(t, txRLPHash, encoded, "invalid encoded")
assert.JSONEq(t, string(b), decoded, "invalid decoded")
assert.Equal(t, string(pool.TxStatusPending), status, "invalid tx status")
assert.Greater(t, usedSteps, 0, "invalid used steps")
c++
}

Expand Down Expand Up @@ -669,71 +671,6 @@ func Test_SetAndGetGasPrice(t *testing.T) {
assert.Equal(t, expectedGasPrice, gasPrice)
}

func TestMarkReorgedTxsAsPending(t *testing.T) {
initOrResetDB()
ctx := context.Background()
stateSqlDB, err := db.NewSQLDB(stateDBCfg)
if err != nil {
t.Error(err)
}
defer stateSqlDB.Close() //nolint:gosec,errcheck

st := newState(stateSqlDB)

genesisBlock := state.Block{
BlockNumber: 0,
BlockHash: state.ZeroHash,
ParentHash: state.ZeroHash,
ReceivedAt: time.Now(),
}
dbTx, err := st.BeginStateTransaction(ctx)
require.NoError(t, err)
_, err = st.SetGenesis(ctx, genesisBlock, genesis, dbTx)
require.NoError(t, err)
require.NoError(t, dbTx.Commit(ctx))

s, err := pgpoolstorage.NewPostgresPoolStorage(poolDBCfg)
if err != nil {
t.Error(err)
}
cfg := pool.Config{
FreeClaimGasLimit: 150000,
}
p := pool.NewPool(cfg, s, st, common.Address{}, chainID.Uint64())

privateKey, err := crypto.HexToECDSA(strings.TrimPrefix(senderPrivateKey, "0x"))
require.NoError(t, err)

auth, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
require.NoError(t, err)

tx1 := types.NewTransaction(uint64(0), common.Address{}, big.NewInt(10), uint64(1), big.NewInt(10), []byte{})
signedTx1, err := auth.Signer(auth.From, tx1)
require.NoError(t, err)
if err := p.AddTx(ctx, *signedTx1); err != nil {
t.Error(err)
}

tx2 := types.NewTransaction(uint64(1), common.Address{}, big.NewInt(10), uint64(1), big.NewInt(10), []byte{})
signedTx2, err := auth.Signer(auth.From, tx2)
require.NoError(t, err)
if err := p.AddTx(ctx, *signedTx2); err != nil {
t.Error(err)
}

err = p.UpdateTxsStatus(ctx, []string{signedTx1.Hash().String(), signedTx2.Hash().String()}, pool.TxStatusSelected)
if err != nil {
t.Error(err)
}

err = p.MarkReorgedTxsAsPending(ctx)
require.NoError(t, err)
txs, err := p.GetPendingTxs(ctx, false, 100)
require.NoError(t, err)
require.Equal(t, signedTx1.Hash().Hex(), txs[1].Hash().Hex())
require.Equal(t, signedTx2.Hash().Hex(), txs[0].Hash().Hex())
}

func TestGetPendingTxSince(t *testing.T) {
initOrResetDB()

Expand Down Expand Up @@ -834,7 +771,7 @@ func TestGetPendingTxSince(t *testing.T) {
assert.Equal(t, 0, len(txHashes))
}

func Test_DeleteTxsByHashes(t *testing.T) {
func Test_DeleteTransactionsByHashes(t *testing.T) {
ctx := context.Background()
initOrResetDB()
stateSqlDB, err := db.NewSQLDB(stateDBCfg)
Expand Down Expand Up @@ -892,7 +829,7 @@ func Test_DeleteTxsByHashes(t *testing.T) {
t.Error(err)
}

err = p.DeleteTxsByHashes(ctx, []common.Hash{signedTx1.Hash(), signedTx2.Hash()})
err = p.DeleteTransactionsByHashes(ctx, []common.Hash{signedTx1.Hash(), signedTx2.Hash()})
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1039,14 +976,14 @@ func Test_TryAddIncompatibleTxs(t *testing.T) {
func newState(sqlDB *pgxpool.Pool) *state.State {
ctx := context.Background()
stateDb := state.NewPostgresStorage(sqlDB)
zkProverURI := testutils.GetEnv("ZKPROVER_URI", "localhost")
zkProverURI := testutils.GetEnv("ZKPROVER_URI", "34.245.104.156")

executorServerConfig := executor.Config{URI: fmt.Sprintf("%s:50071", zkProverURI)}
mtDBServerConfig := merkletree.Config{URI: fmt.Sprintf("%s:50061", zkProverURI)}
executorClient, _, _ := executor.NewExecutorClient(ctx, executorServerConfig)
stateDBClient, _, _ := merkletree.NewMTDBServiceClient(ctx, mtDBServerConfig)
stateTree := merkletree.NewStateTree(stateDBClient)
st := state.NewState(state.Config{MaxCumulativeGasUsed: 800000}, stateDb, executorClient, stateTree)
st := state.NewState(state.Config{MaxCumulativeGasUsed: 800000, ChainID: chainID.Uint64(), CurrentForkID: 1}, stateDb, executorClient, stateTree)
return st
}

Expand Down
5 changes: 3 additions & 2 deletions pool/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ type Transaction struct {
Status TxStatus
IsClaims bool
state.ZKCounters
FailedCounter uint64
ReceivedAt time.Time
FailedCounter uint64
ReceivedAt time.Time
PreprocessedStateRoot common.Hash
}

// IsClaimTx checks, if tx is a claim tx
Expand Down
Loading

0 comments on commit 67a1cf7

Please sign in to comment.