Skip to content

Commit

Permalink
Feature/running sequencer (0xPolygonHermez#896)
Browse files Browse the repository at this point in the history
* WIP

* fixing sequencer.go

* fixing sequencer.go

* fixing sequencer.go

* Fix lint and add default address with balance to local genesis

Co-authored-by: Arnau <[email protected]>
  • Loading branch information
Mikelle and arnaubennassar authored Jul 14, 2022
1 parent 980a296 commit 3dd6424
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 37 deletions.
2 changes: 1 addition & 1 deletion config/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var (
ChainID: 1337,
Genesis: Genesis{
Balances: map[common.Address]*big.Int{
common.HexToAddress("0x9d98deabc42dd696deb9e40b4f1cab7ddbf55988"): bigIntFromBase10String("100000000000000000000000"),
common.HexToAddress("0x70997970C51812dc3A010C7d01b50e0d17dc79C8"): bigIntFromBase10String("100000000000000000000000"),
},
SmartContracts: map[common.Address][]byte{
common.HexToAddress("0xae4bb80be56b819606589de61d5ec3b522eeb032"): common.Hex2Bytes("608060405234801561001057600080fd5b50600436106100675760003560e01c806333d6247d1161005057806333d6247d146100a85780633ed691ef146100bd578063a3c573eb146100d257600080fd5b806301fd90441461006c5780633381fe9014610088575b600080fd5b61007560015481565b6040519081526020015b60405180910390f35b6100756100963660046101c7565b60006020819052908152604090205481565b6100bb6100b63660046101c7565b610117565b005b43600090815260208190526040902054610075565b6002546100f29073ffffffffffffffffffffffffffffffffffffffff1681565b60405173ffffffffffffffffffffffffffffffffffffffff909116815260200161007f565b60025473ffffffffffffffffffffffffffffffffffffffff1633146101c2576040517f08c379a000000000000000000000000000000000000000000000000000000000815260206004820152603460248201527f476c6f62616c45786974526f6f744d616e616765724c323a3a7570646174654560448201527f786974526f6f743a204f4e4c595f425249444745000000000000000000000000606482015260840160405180910390fd5b600155565b6000602082840312156101d957600080fd5b503591905056fea2646970667358221220d6ed73b81f538d38669b0b750b93be08ca365978fae900eedc9ca93131c97ca664736f6c63430008090033"),
Expand Down
6 changes: 6 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type etherman interface {
EstimateGasSequenceBatches(sequences []ethmanTypes.Sequence) (uint64, error)
GetSendSequenceFee() (*big.Int, error)
TrustedSequencer() (common.Address, error)
GetLatestBatchNumber() (uint64, error)
}

// stateInterface gathers the methods required to interact with the state.
Expand All @@ -49,7 +50,12 @@ type stateInterface interface {

StoreTransactions(ctx context.Context, batchNum uint64, processedTxs []*state.ProcessTransactionResponse, dbTx pgx.Tx) error
CloseBatch(ctx context.Context, receipt state.ProcessingReceipt, dbTx pgx.Tx) error
OpenBatch(ctx context.Context, processingContext state.ProcessingContext, dbTx pgx.Tx) error
ProcessSequencerBatch(ctx context.Context, batchNumber uint64, txs []types.Transaction, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)

BeginStateTransaction(ctx context.Context) (pgx.Tx, error)
CommitStateTransaction(ctx context.Context, dbTx pgx.Tx) error
RollbackStateTransaction(ctx context.Context, dbTx pgx.Tx) error
}

type txManager interface {
Expand Down
3 changes: 3 additions & 0 deletions sequencer/profitabilitychecker/profitabilitychecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (c *Checker) IsSequenceProfitable(ctx context.Context, sequence types.Seque

// IsSendSequencesProfitable checks profitability to send sequences to the ethereum
func (c *Checker) IsSendSequencesProfitable(estimatedGas *big.Int, sequences []types.Sequence) bool {
if len(sequences) == 0 {
return false
}
if c.Config.SendBatchesEvenWhenNotProfitable {
return true
}
Expand Down
165 changes: 145 additions & 20 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func New(
if err != nil {
return nil, fmt.Errorf("failed to get trusted sequencer address, err: %v", err)
}
// TODO: check that private key used in etherman matches addr

return &Sequencer{
cfg: cfg,
pool: pool,
Expand All @@ -70,6 +72,52 @@ func New(

// Start starts the sequencer
func (s *Sequencer) Start(ctx context.Context) {
for !s.isSynced(ctx) {
log.Infof("waiting for synchronizer to sync...")
time.Sleep(s.cfg.WaitPeriodPoolIsEmpty.Duration)
}
// initialize sequence
batchNum, err := s.state.GetLastBatchNumber(ctx, nil)
if err != nil {
log.Fatalf("failed to get last batch number, err: %v", err)
}
// case A: genesis
if batchNum == 0 {
log.Infof("starting sequencer with genesis batch")
processingCtx := state.ProcessingContext{
BatchNumber: 1,
Coinbase: s.address,
Timestamp: time.Now(),
GlobalExitRoot: state.ZeroHash,
}
dbTx, err := s.state.BeginStateTransaction(ctx)
if err != nil {
log.Fatalf("failed to begin state transaction for opening a batch, err: %v", err)
}
err = s.state.OpenBatch(ctx, processingCtx, dbTx)
if err != nil {
if rollbackErr := s.state.RollbackStateTransaction(ctx, dbTx); rollbackErr != nil {
log.Fatalf(
"failed to rollback dbTx when opening batch that gave err: %v. Rollback err: %v",
rollbackErr, err,
)
}
log.Fatalf("failed to open a batch, err: %v", err)
}
if err := s.state.CommitStateTransaction(ctx, dbTx); err != nil {
log.Fatalf("failed to commit dbTx when opening batch, err: %v", err)
}
s.lastBatchNum = processingCtx.BatchNumber
s.sequenceInProgress = types.Sequence{
GlobalExitRoot: processingCtx.GlobalExitRoot,
Timestamp: processingCtx.Timestamp.Unix(),
ForceBatchesNum: 0,
Txs: nil,
}
}
// TODO:
// case B: ongoing sequence (sequencer stopped with an ongoing batch aka not closed)
// case C: else (latest batch is closed and is not genesis)
go s.trackReorg(ctx)
go s.trackOldTxs(ctx)
ticker := time.NewTicker(s.cfg.WaitPeriodPoolIsEmpty.Duration)
Expand Down Expand Up @@ -166,24 +214,59 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
}

log.Infof("processing tx")
dbTx, err := s.state.BeginStateTransaction(ctx)
if err != nil {
log.Errorf("failed to begin state transaction for processing tx, err: %v", err)
return
}

s.sequenceInProgress.Txs = append(s.sequenceInProgress.Txs, tx.Transaction)
processBatchResp, err := s.state.ProcessSequencerBatch(ctx, s.lastBatchNum, s.sequenceInProgress.Txs, nil)
processBatchResp, err := s.state.ProcessSequencerBatch(ctx, s.lastBatchNum, s.sequenceInProgress.Txs, dbTx)
if err != nil {
if rollbackErr := s.state.RollbackStateTransaction(ctx, dbTx); rollbackErr != nil {
log.Errorf(
"failed to rollback dbTx when processing tx that gave err: %v. Rollback err: %v",
rollbackErr, err,
)
return
}
s.sequenceInProgress.Txs = s.sequenceInProgress.Txs[:len(s.sequenceInProgress.Txs)-1]
log.Debugf("failed to process tx, hash: %s, err: %v", tx.Hash(), err)
return
}

if err := s.state.CommitStateTransaction(ctx, dbTx); err != nil {
log.Errorf("failed to commit dbTx when processing tx, err: %v", err)
return
}

s.lastStateRoot = processBatchResp.NewStateRoot
s.lastLocalExitRoot = processBatchResp.NewLocalExitRoot

// TODO: add logic based on this response to decide which txs we include on the DB
err = s.state.StoreTransactions(ctx, s.lastBatchNum, processBatchResp.Responses, nil)
dbTx, err = s.state.BeginStateTransaction(ctx)
if err != nil {
log.Errorf("failed to begin state transaction for StoreTransactions, err: %v", err)
return
}
err = s.state.StoreTransactions(ctx, s.lastBatchNum, processBatchResp.Responses, dbTx)
if err != nil {
if rollbackErr := s.state.RollbackStateTransaction(ctx, dbTx); rollbackErr != nil {
log.Errorf(
"failed to rollback dbTx when StoreTransactions that gave err: %v. Rollback err: %v",
rollbackErr, err,
)
return
}
log.Errorf("failed to store transactions, err: %v", err)
return
}

if err := s.state.CommitStateTransaction(ctx, dbTx); err != nil {
log.Errorf("failed to commit dbTx when StoreTransactions, err: %v", err)
return
}

log.Infof("marking tx as selected in the pool")
// TODO: add correct handling in case update didn't go through
_ = s.pool.UpdateTxState(ctx, tx.Hash(), pool.TxStateSelected)
Expand All @@ -202,11 +285,11 @@ func waitTick(ctx context.Context, ticker *time.Ticker) {

func (s *Sequencer) isSynced(ctx context.Context) bool {
lastSyncedBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
if err != nil {
if err != nil && err != state.ErrNotFound {
log.Errorf("failed to get last synced batch, err: %v", err)
return false
}
lastEthBatchNum, err := s.state.GetLastBatchNumberSeenOnEthereum(ctx, nil)
lastEthBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Errorf("failed to get last eth batch, err: %v", err)
return false
Expand All @@ -224,18 +307,19 @@ func (s *Sequencer) isSynced(ctx context.Context) bool {
func (s *Sequencer) shouldSendSequences(ctx context.Context) (bool, bool) {
estimatedGas, err := s.etherman.EstimateGasSequenceBatches(s.closedSequences)
if err != nil && isDataForEthTxTooBig(err) {
log.Warnf("closedSequences eth data is too big, err: %v", err)
return true, true
}

if err != nil {
log.Errorf("failed to estimate gas for sequence batches", err)
log.Errorf("failed to estimate gas for sequence batches, err: %v", err)
return false, false
}

// TODO: checkAgainstForcedBatchQueueTimeout

lastBatchVirtualizationTime, err := s.state.GetTimeForLatestBatchVirtualization(ctx, nil)
if err != nil {
if err != nil && !errors.Is(err, state.ErrNotFound) {
log.Errorf("failed to get last l1 interaction time, err: %v", err)
return false, false
}
Expand All @@ -254,7 +338,7 @@ func (s *Sequencer) shouldSendSequences(ctx context.Context) (bool, bool) {
// in case it's enough blocks since last GER update, long time since last batch and sequence is profitable
func (s *Sequencer) shouldCloseSequenceInProgress(ctx context.Context) bool {
numberOfBlocks, err := s.state.GetNumberOfBlocksSinceLastGERUpdate(ctx, nil)
if err != nil {
if err != nil && err != state.ErrNotFound {
log.Errorf("failed to get last time GER updated, err: %v", err)
return false
}
Expand All @@ -263,11 +347,11 @@ func (s *Sequencer) shouldCloseSequenceInProgress(ctx context.Context) bool {
}

lastBatchTime, err := s.state.GetLastBatchTime(ctx, nil)
if err != nil {
if err != nil && !errors.Is(err, state.ErrNotFound) {
log.Errorf("failed to get last batch time, err: %v", err)
return false
}
if lastBatchTime.Before(time.Now().Add(-s.cfg.LastTimeBatchMaxWaitPeriod.Duration)) {
if lastBatchTime.Before(time.Now().Add(-s.cfg.LastTimeBatchMaxWaitPeriod.Duration)) && len(s.sequenceInProgress.Txs) > 0 {
return s.isSequenceProfitable(ctx)
}

Expand All @@ -292,48 +376,89 @@ func (s *Sequencer) getMostProfitablePendingTx(ctx context.Context) (*pool.Trans
}
if len(tx) == 0 {
log.Infof("waiting for pending tx to appear...")
return nil, false
return nil, true
}
return &tx[0], true
}

func (s *Sequencer) newSequence(ctx context.Context) (types.Sequence, error) {
// close current batch
if s.lastStateRoot.String() != "" || s.lastLocalExitRoot.String() != "" {
receipt := state.ProcessingReceipt{
BatchNumber: s.lastBatchNum,
StateRoot: s.lastStateRoot,
LocalExitRoot: s.lastLocalExitRoot,
}
err := s.state.CloseBatch(ctx, receipt, nil)
dbTx, err := s.state.BeginStateTransaction(ctx)
if err != nil {
return types.Sequence{}, fmt.Errorf("failed to begin state transaction to close batch, err: %v", err)
}
err = s.state.CloseBatch(ctx, receipt, dbTx)
if err != nil {
if rollbackErr := s.state.RollbackStateTransaction(ctx, dbTx); rollbackErr != nil {
return types.Sequence{}, fmt.Errorf(
"failed to rollback dbTx when closing batch that gave err: %v. Rollback err: %v",
rollbackErr, err,
)
}
return types.Sequence{}, fmt.Errorf("failed to close batch, err: %v", err)
}
if err := s.state.CommitStateTransaction(ctx, dbTx); err != nil {
return types.Sequence{}, fmt.Errorf("failed to commit dbTx when close batch, err: %v", err)
}
} else {
return types.Sequence{}, errors.New("lastStateRoot and lastLocalExitRoot are empty, impossible to close a batch")
}

root, err := s.state.GetLatestGlobalExitRoot(ctx, nil)
if err != nil {
// open next batch
var gerHash common.Hash
ger, err := s.state.GetLatestGlobalExitRoot(ctx, nil)
if err != nil && err == state.ErrNotFound {
gerHash = state.ZeroHash
} else if err != nil {
return types.Sequence{}, fmt.Errorf("failed to get latest global exit root, err: %v", err)
} else {
gerHash = ger.GlobalExitRoot
}

s.lastBatchNum, err = s.state.GetLastBatchNumber(ctx, nil)
lastBatchNum, err := s.state.GetLastBatchNumber(ctx, nil)
if err != nil {
return types.Sequence{}, fmt.Errorf("failed to get last batch number, err: %v", err)
}
s.lastBatchNum = s.lastBatchNum + 1
newBatchNum := lastBatchNum + 1
dbTx, err := s.state.BeginStateTransaction(ctx)
if err != nil {
return types.Sequence{}, fmt.Errorf("failed to open new batch, err: %v", err)
}
processingCtx := state.ProcessingContext{
BatchNumber: newBatchNum,
Coinbase: s.address,
Timestamp: time.Now(),
GlobalExitRoot: gerHash,
}
err = s.state.OpenBatch(ctx, processingCtx, dbTx)
if err != nil {
if rollbackErr := s.state.RollbackStateTransaction(ctx, dbTx); rollbackErr != nil {
return types.Sequence{}, fmt.Errorf(
"failed to rollback dbTx when opening batch that gave err: %v. Rollback err: %v",
rollbackErr, err,
)
}
return types.Sequence{}, fmt.Errorf("failed to open new batch, err: %v", err)
}
if err := s.state.CommitStateTransaction(ctx, dbTx); err != nil {
return types.Sequence{}, fmt.Errorf("failed to commit dbTx when opening batch, err: %v", err)
}

s.lastBatchNum = newBatchNum
return types.Sequence{
GlobalExitRoot: root.GlobalExitRoot,
Timestamp: time.Now().Unix(),
GlobalExitRoot: processingCtx.GlobalExitRoot,
Timestamp: processingCtx.Timestamp.Unix(),
ForceBatchesNum: 0,
Txs: nil,
}, nil
}

func isDataForEthTxTooBig(err error) bool {
return strings.Contains(err.Error(), errGasRequiredExceedsAllowance) ||
errors.As(err, &core.ErrOversizedData) ||
errors.Is(err, core.ErrOversizedData) ||
strings.Contains(err.Error(), errContentLengthTooLarge)
}
6 changes: 4 additions & 2 deletions state/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ func convertToProperMap(responses map[string]string) map[common.Hash]common.Hash

func convertToExecutorTrace(callTrace *pb.CallTrace) instrumentation.ExecutorTrace {
trace := new(instrumentation.ExecutorTrace)
trace.Context = convertToContext(callTrace.Context)
trace.Steps = convertToInstrumentationSteps(callTrace.Steps)
if callTrace != nil {
trace.Context = convertToContext(callTrace.Context)
trace.Steps = convertToInstrumentationSteps(callTrace.Steps)
}

return *trace
}
Expand Down
2 changes: 1 addition & 1 deletion state/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func generateReceipt(block *types.Block, processedTx *ProcessTransactionResponse
BlockNumber: block.Number(),
BlockHash: block.Hash(),
GasUsed: processedTx.GasUsed,
TxHash: processedTx.TxHash,
TxHash: processedTx.Tx.Hash(),
TransactionIndex: 0,
ContractAddress: processedTx.CreateAddress,
Logs: processedTx.Logs,
Expand Down
16 changes: 10 additions & 6 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
getLastBatchNumberSQL = "SELECT COALESCE(MAX(batch_num), 0) FROM state.batch"
getLastNBatchesSQL = "SELECT batch_num, global_exit_root, local_exit_root, state_root, timestamp, coinbase, raw_txs_data from state.batch ORDER BY batch_num DESC LIMIT $1"
getLastBatchTimeSQL = "SELECT timestamp FROM state.batch ORDER BY batch_num DESC LIMIT 1"
getLastVirtualBatchNumSQL = "SELECT batch_num FROM state.virtual_batch ORDER BY batch_num DESC LIMIT 1"
getLastVirtualBatchNumSQL = "SELECT COALESCE(MAX(batch_num), 0) FROM state.virtual_batch"
getLastVirtualBatchBlockNumSQL = "SELECT block_num FROM state.virtual_batch ORDER BY batch_num DESC LIMIT 1"
getLastBlockNumSQL = "SELECT block_num FROM state.block ORDER BY block_num DESC LIMIT 1"
getLastL2BlockNumber = "SELECT block_num FROM state.l2block ORDER BY block_num DESC LIMIT 1"
Expand Down Expand Up @@ -512,8 +512,8 @@ func scanBatch(row pgx.Row) (Batch, error) {
batch := Batch{}
var (
gerStr string
lerStr string
stateStr string
lerStr *string
stateStr *string
coinbaseStr string
)
if err := row.Scan(
Expand All @@ -528,8 +528,13 @@ func scanBatch(row pgx.Row) (Batch, error) {
return batch, err
}
batch.GlobalExitRoot = common.HexToHash(gerStr)
batch.LocalExitRoot = common.HexToHash(lerStr)
batch.StateRoot = common.HexToHash(stateStr)
if lerStr != nil {
batch.LocalExitRoot = common.HexToHash(*lerStr)
}
if stateStr != nil {
batch.StateRoot = common.HexToHash(*stateStr)
}

batch.Coinbase = common.HexToAddress(coinbaseStr)
return batch, nil
}
Expand Down Expand Up @@ -942,7 +947,6 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2
return err
}
decoded := string(binary)

_, err = e.Exec(ctx, addTransactionSQL, tx.Hash().String(), "", encoded, decoded, l2Block.Number().Uint64())
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ func (s *State) StoreTransactions(ctx context.Context, batchNumber uint64, proce
Coinbase: processingContext.Coinbase,
Root: processedTx.StateRoot,
}

transactions := []*types.Transaction{&processedTx.Tx}

// Create block to be able to calculate its hash
Expand Down
Loading

0 comments on commit 3dd6424

Please sign in to comment.