Skip to content

Commit

Permalink
Add more seqv2 logs (0xPolygonHermez#810)
Browse files Browse the repository at this point in the history
* Add more seqv2 logs

* bump golint version

* add dbTx to all statev2 methods used by broadcast

* reuse code to determine queryer/queryRower to use

* rename interfaces

* single querier interface
  • Loading branch information
fgimenez authored Jun 28, 2022
1 parent 49e1a6f commit c337f48
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ test-e2e-group-3: build-docker compile-scs ## Runs group 3 e2e tests checking ra

.PHONY: install-linter
install-linter: ## Installs the linter
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $$(go env GOPATH)/bin v1.45.2
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $$(go env GOPATH)/bin v1.46.2

.PHONY: lint
lint: ## Runs the linter
Expand Down
8 changes: 3 additions & 5 deletions sequencerv2/broadcast/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (
// Consumer interfaces required by the package.

type stateInterface interface {
GetLastBatch(ctx context.Context, tx pgx.Tx) (*statev2.Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*statev2.Batch, error)
GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (encoded []string, err error)
GetLastBatch(ctx context.Context, dbTx pgx.Tx) (*statev2.Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*statev2.Batch, error)
GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []string, err error)
}

// This should be moved into the state package
17 changes: 10 additions & 7 deletions sequencerv2/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
// 1. Wait for synchronizer to sync last batch
if !s.isSynced(ctx) {
log.Infof("wait for synchronizer to sync last batch")
waitTick(ctx, ticker)
return
}

// 2. Check if current sequence should be closed
log.Infof("synchronizer has synced last batch, checking if current sequence should be closed")
if s.shouldCloseSequenceInProgress(ctx) {
log.Infof("current sequence should be closed")
s.closedSequences = append(s.closedSequences, s.sequenceInProgress)
newSequence, err := s.newSequence(ctx)
if err != nil {
Expand All @@ -79,10 +80,12 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
s.sequenceInProgress = newSequence
}

// 3. Check if current sequence should be sent
log.Infof("checking if current sequence should be sent")
shouldSent, shouldCut := s.shouldSendSequences(ctx)
if shouldSent {
log.Infof("current sequence should be sent")
if shouldCut {
log.Infof("current sequence should be cut")
cutSequence := s.closedSequences[len(s.closedSequences)-1]
err := s.txManager.SequenceBatches(s.closedSequences)
if err != nil {
Expand All @@ -100,7 +103,7 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
}
}

// 4. get pending tx from the pool
log.Infof("getting pending tx from the pool")
tx, ok := s.getMostProfitablePendingTx(ctx)
if !ok {
return
Expand All @@ -112,7 +115,7 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
return
}

// 5. Process tx
log.Infof("processing tx")
s.sequenceInProgress.Txs = append(s.sequenceInProgress.Txs, tx.Transaction)
_, err := s.state.ProcessBatch(ctx, s.sequenceInProgress.Txs)
if err != nil {
Expand All @@ -121,11 +124,11 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
return
}

// 6. Mark tx as selected in the pool
log.Infof("marking tx as selected in the pool")
// TODO: add correct handling in case update didn't go through
_ = s.pool.UpdateTxState(ctx, tx.Hash(), pool.TxStateSelected)

// 7. broadcast tx in a new l2 block
log.Infof("TODO: broadcast tx in a new l2 block")
}

func waitTick(ctx context.Context, ticker *time.Ticker) {
Expand Down
11 changes: 11 additions & 0 deletions statev2/interfaces.go
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
package statev2

import (
"context"

"github.com/jackc/pgx/v4"
)

type querier interface {
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}
26 changes: 20 additions & 6 deletions statev2/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ func NewPostgresStorage(db *pgxpool.Pool) *PostgresStorage {
}
}

// getQuerier determines which queryer to use, dbTx or the main pgxpool.
func (s *PostgresStorage) getQuerier(dbTx pgx.Tx) querier {
if dbTx != nil {
return dbTx
}
return s
}

// Reset resets the state to a block
func (s *PostgresStorage) Reset(ctx context.Context, block *Block, tx pgx.Tx) error {
if _, err := tx.Exec(ctx, resetSQL, block.BlockNumber); err != nil {
Expand Down Expand Up @@ -190,12 +198,14 @@ func (s *PostgresStorage) GetVerifiedBatch(ctx context.Context, tx pgx.Tx, batch
return &verifiedBatch, nil
}

func (s *PostgresStorage) GetLastBatch(ctx context.Context, tx pgx.Tx) (*Batch, error) {
func (s *PostgresStorage) GetLastBatch(ctx context.Context, dbTx pgx.Tx) (*Batch, error) {
var (
batch Batch
gerStr string
)
err := s.QueryRow(ctx, getLastBatchSQL).Scan(&batch.BatchNumber, &gerStr, &batch.Timestamp)
q := s.getQuerier(dbTx)

err := q.QueryRow(ctx, getLastBatchSQL).Scan(&batch.BatchNumber, &gerStr, &batch.Timestamp)

if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrStateNotSynchronized
Expand Down Expand Up @@ -254,12 +264,14 @@ func (s *PostgresStorage) GetLastBatchNumberSeenOnEthereum(ctx context.Context)
return batchNumber, nil
}

func (s *PostgresStorage) GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*Batch, error) {
func (s *PostgresStorage) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*Batch, error) {
var (
batch Batch
gerStr string
)
err := s.QueryRow(ctx, getBatchByNumberSQL, batchNumber).Scan(&batch.BatchNumber, &gerStr, &batch.Timestamp)
q := s.getQuerier(dbTx)

err := q.QueryRow(ctx, getBatchByNumberSQL, batchNumber).Scan(&batch.BatchNumber, &gerStr, &batch.Timestamp)

if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrStateNotSynchronized
Expand All @@ -270,8 +282,10 @@ func (s *PostgresStorage) GetBatchByNumber(ctx context.Context, batchNumber uint
return &batch, nil
}

func (s *PostgresStorage) GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (encoded []string, err error) {
rows, err := s.Query(ctx, getEncodedTransactionsByBatchNumberSQL, batchNumber)
func (s *PostgresStorage) GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []string, err error) {
q := s.getQuerier(dbTx)

rows, err := q.Query(ctx, getEncodedTransactionsByBatchNumberSQL, batchNumber)
if !errors.Is(err, pgx.ErrNoRows) && err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions statev2/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ func (s *State) GetLastBatch(ctx context.Context, dbTx pgx.Tx) (*Batch, error) {
}

// GetBatchByNumber gets a batch from data base by its number
func (s *State) GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*Batch, error) {
return s.PostgresStorage.GetBatchByNumber(ctx, batchNumber, tx)
func (s *State) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*Batch, error) {
return s.PostgresStorage.GetBatchByNumber(ctx, batchNumber, dbTx)
}

// GetEncodedTransactionsByBatchNumber gets the txs for a given batch in encoded form
func (s *State) GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (encoded []string, err error) {
return s.PostgresStorage.GetEncodedTransactionsByBatchNumber(ctx, batchNumber, tx)
func (s *State) GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []string, err error) {
return s.PostgresStorage.GetEncodedTransactionsByBatchNumber(ctx, batchNumber, dbTx)
}

// ProcessSequence process sequence of the txs
Expand Down

0 comments on commit c337f48

Please sign in to comment.