Skip to content

Commit

Permalink
WaitTxToBeSynced (0xPolygonHermez#1418)
Browse files Browse the repository at this point in the history
* WaitTxToBeSynched

* WaitTxToBeSynched

* WaitTxToBeSynched

* Properly wait

* synched -> synced

* synched -> synced

* add unit test

* add unit test
  • Loading branch information
ToniRamirezM authored Nov 28, 2022
1 parent 0942c10 commit 4fe5a01
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func start(cliCtx *cli.Context) error {
ctx := context.Background()
st := newState(ctx, c, l2ChainID, stateSqlDB)

ethTxManager := ethtxmanager.New(c.EthTxManager, etherman)
ethTxManager := ethtxmanager.New(c.EthTxManager, etherman, st)

for _, item := range cliCtx.StringSlice(config.FlagComponents) {
switch item {
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func Test_Defaults(t *testing.T) {
path: "EthTxManager.WaitTxToBeMined",
expectedValue: types.NewDuration(2 * time.Minute),
},
{
path: "EthTxManager.WaitTxToBeSynced",
expectedValue: types.NewDuration(10 * time.Second),
},
{
path: "EthTxManager.PercentageToIncreaseGasPrice",
expectedValue: uint64(10),
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ MaxVerifyBatchTxRetries = 10
FrequencyForResendingFailedSendBatches = "1s"
FrequencyForResendingFailedVerifyBatch = "1s"
WaitTxToBeMined = "2m"
WaitTxToBeSynced = "10s"
PercentageToIncreaseGasPrice = 10
PercentageToIncreaseGasLimit = 10
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/debug.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ MaxVerifyBatchTxRetries = 10
FrequencyForResendingFailedSendBatches = "1s"
FrequencyForResendingFailedVerifyBatch = "1s"
WaitTxToBeMined = "2m"
WaitTxToBeSynced = "10s"
PercentageToIncreaseGasPrice = 10
PercentageToIncreaseGasLimit = 10

Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ MaxVerifyBatchTxRetries = 10
FrequencyForResendingFailedSendBatches = "1s"
FrequencyForResendingFailedVerifyBatch = "1s"
WaitTxToBeMined = "2m"
WaitTxToBeSynced = "10s"
PercentageToIncreaseGasPrice = 10
PercentageToIncreaseGasLimit = 10

Expand Down
2 changes: 2 additions & 0 deletions ethtxmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Config struct {
FrequencyForResendingFailedVerifyBatch types.Duration `mapstructure:"FrequencyForResendingFailedVerifyBatch"`
// WaitTxToBeMined time to wait after transaction was sent to the ethereum
WaitTxToBeMined types.Duration `mapstructure:"WaitTxToBeMined"`
// WaitTxToBeSynced time to wait after transaction was sent to the ethereum to get into the state
WaitTxToBeSynced types.Duration `mapstructure:"WaitTxToBeSynced"`
// PercentageToIncreaseGasPrice when tx is failed by timeout increase gas price by this percentage
PercentageToIncreaseGasPrice uint64 `mapstructure:"PercentageToIncreaseGasPrice"`
// PercentageToIncreaseGasLimit when tx is failed by timeout increase gas price by this percentage
Expand Down
9 changes: 5 additions & 4 deletions ethtxmanager/ethtxmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ const oneHundred = 100
type Client struct {
cfg Config
ethMan etherman
state state
}

// New creates new eth tx manager
func New(cfg Config, ethMan etherman) *Client {
func New(cfg Config, ethMan etherman, state state) *Client {
return &Client{
cfg: cfg,
ethMan: ethMan,
state: state,
}
}

Expand Down Expand Up @@ -87,7 +89,7 @@ func (c *Client) SequenceBatches(ctx context.Context, sequences []ethmanTypes.Se
return fmt.Errorf("tx %s failed, err: %w", tx.Hash(), err)
} else {
log.Infof("sequence sent to L1 successfully. Tx hash: %s", tx.Hash())
return nil
return c.state.WaitSequencingTxToBeSynced(ctx, tx, c.cfg.WaitTxToBeSynced.Duration)
}
}
return nil
Expand Down Expand Up @@ -146,8 +148,7 @@ func (c *Client) VerifyBatch(ctx context.Context, batchNum uint64, resGetProof *
return fmt.Errorf("tx %s failed, err: %w", tx.Hash(), err)
} else {
log.Infof("batch verification sent to L1 successfully. Tx hash: %s", tx.Hash())
time.Sleep(c.cfg.FrequencyForResendingFailedVerifyBatch.Duration)
return nil
return c.state.WaitVerifiedBatchToBeSynced(ctx, batchNum, c.cfg.WaitTxToBeSynced.Duration)
}
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions ethtxmanager/ethtxmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestIncreaseGasPrice(t *testing.T) {

func TestSequenceBatchesWithROEthman(t *testing.T) {
ethManRO, _, _, _, _ := ethman.NewSimulatedEtherman(ethman.Config{}, nil)
txMan := New(Config{MaxSendBatchTxRetries: 2}, ethManRO) // 3 executions in total
txMan := New(Config{MaxSendBatchTxRetries: 2}, ethManRO, nil) // 3 executions in total

err := txMan.SequenceBatches(context.Background(), []ethmanTypes.Sequence{})

Expand All @@ -31,7 +31,7 @@ func TestSequenceBatchesWithROEthman(t *testing.T) {

func TestVerifyBatchWithROEthman(t *testing.T) {
ethManRO, _, _, _, _ := ethman.NewSimulatedEtherman(ethman.Config{}, nil)
txMan := New(Config{MaxVerifyBatchTxRetries: 2}, ethManRO) // 3 executions in total
txMan := New(Config{MaxVerifyBatchTxRetries: 2}, ethManRO, nil) // 3 executions in total

err := txMan.VerifyBatch(context.Background(), 42, nil)

Expand Down
5 changes: 5 additions & 0 deletions ethtxmanager/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ type etherman interface {
GetTxReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
WaitTxToBeMined(ctx context.Context, tx *types.Transaction, timeout time.Duration) error
}

type state interface {
WaitSequencingTxToBeSynced(parentCtx context.Context, tx *types.Transaction, timeout time.Duration) error
WaitVerifiedBatchToBeSynced(parentCtx context.Context, batchNumber uint64, timeout time.Duration) error
}
2 changes: 1 addition & 1 deletion sequencer/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestSequenceTooBig(t *testing.T) {
state := st.NewState(stateCfg, stateDb, executorClient, stateTree)

pool := pool.NewPool(poolDb, state, CONFIG_ADDRESSES[CONFIG_NAME_GER], big.NewInt(CONFIG_CHAIN_ID).Uint64())
ethtxmanager := ethtxmanager.New(ethtxmanager.Config{}, eth_man)
ethtxmanager := ethtxmanager.New(ethtxmanager.Config{}, eth_man, state)
gpe := gasprice.NewDefaultEstimator(gasprice.Config{
Type: gasprice.DefaultType,
DefaultGasPriceWei: 1000000000,
Expand Down
12 changes: 12 additions & 0 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,18 @@ func (p *PostgresStorage) IsBatchVirtualized(ctx context.Context, batchNumber ui
return exists, nil
}

// IsSequencingTXSynced checks if sequencing tx has been synced into the state
func (p *PostgresStorage) IsSequencingTXSynced(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (bool, error) {
const query = `SELECT EXISTS (SELECT 1 FROM state.virtual_batch WHERE tx_hash = $1)`
e := p.getExecQuerier(dbTx)
var exists bool
err := e.QueryRow(ctx, query, transactionHash.String()).Scan(&exists)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return exists, err
}
return exists, nil
}

// GetProcessingContext returns the processing context for the given batch.
func (p *PostgresStorage) GetProcessingContext(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*ProcessingContext, error) {
e := p.getExecQuerier(dbTx)
Expand Down
48 changes: 48 additions & 0 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,3 +1302,51 @@ func DetermineProcessedTransactions(responses []*ProcessTransactionResponse) (
}
return processedTxResponses, processedTxsHashes, unprocessedTxResponses, unprocessedTxsHashes
}

// WaitSequencingTxToBeSynced waits for a sequencing transaction to be synced into the state
func (s *State) WaitSequencingTxToBeSynced(parentCtx context.Context, tx *types.Transaction, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()

for {
virtualized, err := s.IsSequencingTXSynced(ctx, tx.Hash(), nil)
if err != nil && err != ErrNotFound {
log.Errorf("error waiting sequencing tx %s to be synced: %w", tx.Hash().String(), err)
return err
} else if ctx.Err() != nil {
log.Errorf("error waiting sequencing tx %s to be synced: %w", tx.Hash().String(), err)
return ctx.Err()
} else if virtualized {
break
}

time.Sleep(time.Second)
}

log.Debug("Sequencing txh successfully synced: ", tx.Hash().String())
return nil
}

// WaitVerifiedBatchToBeSynced waits for a sequenced batch to be synced into the state
func (s *State) WaitVerifiedBatchToBeSynced(parentCtx context.Context, batchNumber uint64, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()

for {
batch, err := s.GetVerifiedBatch(ctx, batchNumber, nil)
if err != nil && err != ErrNotFound {
log.Errorf("error waiting verified batch %s to be synced: %w", batchNumber, err)
return err
} else if ctx.Err() != nil {
log.Errorf("error waiting verified batch %s to be synced: %w", batchNumber, err)
return ctx.Err()
} else if batch != nil {
break
}

time.Sleep(time.Second)
}

log.Debug("Verified batch successfully synced: ", batchNumber)
return nil
}
109 changes: 109 additions & 0 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2541,3 +2541,112 @@ func TestExecutorGasEstimationMultisig(t *testing.T) {
assert.Equal(t, executorclientpb.Error_ERROR_NO_ERROR, processBatchResponse.Responses[0].Error)
log.Debugf("Used gas = %v", processBatchResponse.Responses[0].GasUsed)
}

func TestWaitSequencingTxToBeSyncedAndWaitVerifiedBatchToBeSynced(t *testing.T) {
ctx := context.Background()

// Set Genesis
block := state.Block{
BlockNumber: 0,
BlockHash: state.ZeroHash,
ParentHash: state.ZeroHash,
ReceivedAt: time.Now(),
}

genesis := state.Genesis{
Actions: []*state.GenesisAction{
{
Address: "0x617b3a3528F9cDd6630fd3301B9c8911F7Bf063D",
Type: int(merkletree.LeafTypeBalance),
Value: "100000000000000000000000",
},
{
Address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
Type: int(merkletree.LeafTypeBalance),
Value: "100000000000000000000000",
},
{
Address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
Type: int(merkletree.LeafTypeBalance),
Value: "100000000000000000000000",
},
},
}

initOrResetDB()

dbTx, err := testState.BeginStateTransaction(ctx)
require.NoError(t, err)
_, err = testState.SetGenesis(ctx, block, genesis, dbTx)
require.NoError(t, err)
require.NoError(t, dbTx.Commit(ctx))

// Dummy transaction
tx := types.NewTx(&types.LegacyTx{
Nonce: 1,
To: nil,
Value: new(big.Int),
Gas: uint64(1234),
GasPrice: new(big.Int).SetUint64(0),
Data: common.Hex2Bytes("0x00"),
})

err = testState.WaitSequencingTxToBeSynced(ctx, tx, 1*time.Second)
require.Error(t, err)

processingContext := state.ProcessingContext{
BatchNumber: 1,
Coinbase: common.Address{},
Timestamp: time.Now(),
GlobalExitRoot: common.Hash{},
}

dbTx, err = testState.BeginStateTransaction(ctx)
require.NoError(t, err)

err = testState.OpenBatch(ctx, processingContext, dbTx)
require.NoError(t, err)

err = testState.StoreTransactions(ctx, 1, nil, dbTx)
require.NoError(t, err)

processingReceipt := state.ProcessingReceipt{
BatchNumber: 1,
StateRoot: common.Hash{},
LocalExitRoot: common.Hash{},
}

err = testState.CloseBatch(ctx, processingReceipt, dbTx)
require.NoError(t, err)

virtualBatch := state.VirtualBatch{
BatchNumber: 1,
TxHash: tx.Hash(),
Coinbase: common.HexToAddress("0x00"),
BlockNumber: 0,
}

err = testState.AddVirtualBatch(ctx, &virtualBatch, dbTx)
require.NoError(t, err)
require.NoError(t, dbTx.Commit(ctx))

err = testState.WaitSequencingTxToBeSynced(ctx, tx, 5*time.Second)
require.NoError(t, err)

// VerifiedBatch
err = testState.WaitVerifiedBatchToBeSynced(ctx, 1, 1*time.Second)
require.Error(t, err)

verifiedBatch := state.VerifiedBatch{
BlockNumber: 0,
BatchNumber: 1,
Aggregator: common.Address{},
TxHash: common.Hash{},
}

err = testState.AddVerifiedBatch(ctx, &verifiedBatch, nil)
require.NoError(t, err)

err = testState.WaitVerifiedBatchToBeSynced(ctx, 1, 5*time.Second)
require.NoError(t, err)
}

0 comments on commit 4fe5a01

Please sign in to comment.