Skip to content

Commit

Permalink
Refactor state errors (0xPolygonHermez#2105)
Browse files Browse the repository at this point in the history
* refactor state errors

* refactor state errors

* refactor state errors

* refactor state errors

* refactor state errors

* refactor

* fix

* function rename

* fix wg

* fix wg

* fix
  • Loading branch information
ToniRamirezM authored May 17, 2023
1 parent ce27fdd commit 32c81ee
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 305 deletions.
16 changes: 6 additions & 10 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,12 @@ func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preExecu
return response, err
}

response.usedZkCounters = processBatchResponse.UsedZkCounters

if !processBatchResponse.RomOOC {
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
r := processBatchResponse.Responses[0]
response.isOOC = executor.IsROMOutOfGasError(executor.RomErrorCode(r.RomError))
response.isReverted = errors.Is(r.RomError, runtime.ErrExecutionReverted)
}
} else {
response.isOOG = processBatchResponse.RomOOC
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 {
errorToCheck := processBatchResponse.Responses[0].RomError
response.isReverted = errors.Is(errorToCheck, runtime.ErrExecutionReverted)
response.isOOC = executor.IsROMOutOfCountersError(executor.RomErrorCode(errorToCheck))
response.isOOG = errors.Is(errorToCheck, runtime.ErrOutOfGas)
response.usedZkCounters = processBatchResponse.UsedZkCounters
}

return response, nil
Expand Down
8 changes: 4 additions & 4 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,14 @@ func (d *dbManager) CloseBatch(ctx context.Context, params ClosingBatchParameter
}

// ProcessForcedBatch process a forced batch
func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) {
func (d *dbManager) ProcessForcedBatch(ForcedBatchNumber uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error) {
// Open Batch
processingCtx := state.ProcessingContext{
BatchNumber: request.BatchNumber,
Coinbase: request.Coinbase,
Timestamp: request.Timestamp,
GlobalExitRoot: request.GlobalExitRoot,
ForcedBatchNum: &forcedBatchNum,
ForcedBatchNum: &ForcedBatchNumber,
}
dbTx, err := d.state.BeginStateTransaction(d.ctx)
if err != nil {
Expand All @@ -469,7 +469,7 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc
}

// Fetch Forced Batch
forcedBatch, err := d.state.GetForcedBatch(d.ctx, forcedBatchNum, dbTx)
forcedBatch, err := d.state.GetForcedBatch(d.ctx, ForcedBatchNumber, dbTx)
if err != nil {
if rollbackErr := dbTx.Rollback(d.ctx); rollbackErr != nil {
log.Errorf(
Expand All @@ -490,7 +490,7 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc
// Close Batch
txsBytes := uint64(0)
for _, resp := range processBatchResponse.Responses {
if !resp.IsProcessed {
if !resp.ChangesStateRoot {
continue
}
txsBytes += resp.Tx.Size()
Expand Down
61 changes: 34 additions & 27 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,14 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {

// Reprocess full batch as sanity check
processBatchResponse, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.stateRoot)
if err != nil || processBatchResponse.RomOOC {
if err != nil || processBatchResponse.IsRomOOCError || processBatchResponse.ExecutorError != nil {
log.Info("halting the finalizer because of a reprocessing error")
if err != nil {
f.halt(ctx, fmt.Errorf("failed to reprocess batch, err: %v", err))
} else {
} else if processBatchResponse.IsRomOOCError {
f.halt(ctx, fmt.Errorf("out of counters during reprocessFullBath"))
} else {
f.halt(ctx, fmt.Errorf("executor error during reprocessFullBath: %v", processBatchResponse.ExecutorError))
}
}

Expand Down Expand Up @@ -354,14 +356,12 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
metrics.ProcessingTime(time.Since(start))
}()

var ger common.Hash
if f.batch.isEmpty() {
ger = f.batch.globalExitRoot
f.processRequest.GlobalExitRoot = f.batch.globalExitRoot
} else {
ger = state.ZeroHash
f.processRequest.GlobalExitRoot = state.ZeroHash
}

f.processRequest.GlobalExitRoot = ger
if tx != nil {
f.processRequest.Transactions = tx.RawTx
} else {
Expand All @@ -372,37 +372,36 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
hash = tx.HashStr
}
log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hash, f.processRequest.GlobalExitRoot.String())
result, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
if err != nil {
log.Errorf("failed to process transaction: %s", err)
return err
}

oldStateRoot := f.batch.stateRoot
if len(result.Responses) > 0 && tx != nil {
err = f.handleTxProcessResp(ctx, tx, result, oldStateRoot)
if len(processBatchResponse.Responses) > 0 && tx != nil {
err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot)
if err != nil {
return err
}
}

// Update in-memory batch and processRequest
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, result.NewStateRoot.String(), result.NewLocalExitRoot.String(), oldStateRoot.String())
f.processRequest.OldStateRoot = processBatchResponse.NewStateRoot
f.batch.stateRoot = processBatchResponse.NewStateRoot
f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String())

return nil
}

// handleTxProcessResp handles the response of transaction processing.
func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error {
// handleProcessTransactionResponse handles the response of transaction processing.
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) error {
// Handle Transaction Error

errorCode := executor.RomErrorCode(result.Responses[0].RomError)
if result.RomOOC || executor.IsIntrinsicError(errorCode) {
if !state.IsStateRootChanged(errorCode) {
// If intrinsic error or OOC error, we skip adding the transaction to the batch
f.handleTransactionError(ctx, result, tx)
f.handleProcessTransactionError(ctx, result, tx)
return result.Responses[0].RomError
}

Expand Down Expand Up @@ -473,8 +472,8 @@ func (f *finalizer) updateWorkerAfterTxStored(ctx context.Context, tx *TxTracker
metrics.WorkerProcessingTime(time.Since(start))
}

// handleTransactionError handles the error of a transaction
func (f *finalizer) handleTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup {
// handleProcessTransactionError handles the error of a transaction
func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) *sync.WaitGroup {
txResponse := result.Responses[0]
errorCode := executor.RomErrorCode(txResponse.RomError)
addressInfo := result.ReadWriteAddresses[tx.From]
Expand Down Expand Up @@ -513,6 +512,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr
wg.Add(1)
txToDelete := txToDelete
go func() {
defer wg.Done()
err := f.dbManager.UpdateTxStatus(ctx, txToDelete.Hash, pool.TxStatusFailed, false, &failedReason)
metrics.TxProcessed(metrics.TxProcessedLabelFailed, 1)
if err != nil {
Expand All @@ -528,6 +528,7 @@ func (f *finalizer) handleTransactionError(ctx context.Context, result *state.Pr

wg.Add(1)
go func() {
defer wg.Done()
// Update the status of the transaction to failed
err := f.dbManager.UpdateTxStatus(ctx, tx.Hash, pool.TxStatusFailed, false, &failedReason)
if err != nil {
Expand Down Expand Up @@ -618,11 +619,7 @@ func (f *finalizer) processForcedBatches(ctx context.Context, lastBatchNumberInS
defer f.nextForcedBatchesMux.Unlock()
f.nextForcedBatchDeadline = 0

dbTx, err := f.dbManager.BeginStateTransaction(ctx)
if err != nil {
return 0, common.Hash{}, fmt.Errorf("failed to begin state transaction, err: %w", err)
}
lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, dbTx)
lastTrustedForcedBatchNumber, err := f.dbManager.GetLastTrustedForcedBatchNumber(ctx, nil)
if err != nil {
return 0, common.Hash{}, fmt.Errorf("failed to get last trusted forced batch number, err: %w", err)
}
Expand Down Expand Up @@ -669,7 +666,7 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta
f.nextGERMux.Lock()
f.lastGERHash = forcedBatch.GlobalExitRoot
f.nextGERMux.Unlock()
if len(response.Responses) > 0 && !response.RomOOC {
if len(response.Responses) > 0 && !response.IsRomOOCError {
f.handleForcedTxsProcessResp(request, response, stateRoot)
}

Expand Down Expand Up @@ -783,7 +780,7 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp
return nil, err
}

if result.RomOOC {
if result.IsRomOOCError {
log.Errorf("failed to process batch %v because OutOfCounters", batch.BatchNumber)
payload, err := json.Marshal(processRequest)
if err != nil {
Expand Down Expand Up @@ -886,25 +883,35 @@ func (f *finalizer) isBatchAlmostFull() bool {
resources := f.batch.remainingResources
zkCounters := resources.ZKCounters
result := false
resourceDesc := ""
if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) {
resourceDesc = "MaxBatchBytesSize"
result = true
} else if zkCounters.UsedSteps <= f.getConstraintThresholdUint32(f.batchConstraints.MaxSteps) {
resourceDesc = "MaxSteps"
result = true
} else if zkCounters.UsedPoseidonPaddings <= f.getConstraintThresholdUint32(f.batchConstraints.MaxPoseidonPaddings) {
resourceDesc = "MaxPoseidonPaddings"
result = true
} else if zkCounters.UsedBinaries <= f.getConstraintThresholdUint32(f.batchConstraints.MaxBinaries) {
resourceDesc = "MaxBinaries"
result = true
} else if zkCounters.UsedKeccakHashes <= f.getConstraintThresholdUint32(f.batchConstraints.MaxKeccakHashes) {
resourceDesc = "MaxKeccakHashes"
result = true
} else if zkCounters.UsedArithmetics <= f.getConstraintThresholdUint32(f.batchConstraints.MaxArithmetics) {
resourceDesc = "MaxArithmetics"
result = true
} else if zkCounters.UsedMemAligns <= f.getConstraintThresholdUint32(f.batchConstraints.MaxMemAligns) {
resourceDesc = "MaxMemAligns"
result = true
} else if zkCounters.CumulativeGasUsed <= f.getConstraintThresholdUint64(f.batchConstraints.MaxCumulativeGasUsed) {
resourceDesc = "MaxCumulativeGasUsed"
result = true
}

if result {
log.Infof("Closing batch: %d, because it reached %s threshold limit", f.batch.batchNumber, resourceDesc)
f.batch.closingReason = state.BatchAlmostFullClosingReason
}

Expand Down
2 changes: 1 addition & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func TestFinalizer_handleTransactionError(t *testing.T) {
}

// act
wg := f.handleTransactionError(ctx, result, tx)
wg := f.handleProcessTransactionError(ctx, result, tx)
if wg != nil {
wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type dbManagerInterface interface {
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error)
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error)
ProcessForcedBatch(ForcedBatchNumber uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error)
GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*state.Block, error)
Expand Down
12 changes: 6 additions & 6 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion state/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *State) ProcessAndStoreClosedBatch(ctx context.Context, processingCtx Pr
// Filter unprocessed txs and decode txs to store metadata
// note that if the batch is not well encoded it will result in an empty batch (with no txs)
for i := 0; i < len(processed.Responses); i++ {
if !isProcessed(processed.Responses[i].Error) {
if !IsStateRootChanged(processed.Responses[i].Error) {
if executor.IsROMOutOfCountersError(processed.Responses[i].Error) {
processed.Responses = []*pb.ProcessTransactionResponse{}
break
Expand Down
49 changes: 32 additions & 17 deletions state/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,42 @@ func (s *State) convertToProcessBatchResponse(txs []types.Transaction, response
return nil, err
}

romOOC := response.Error != executor.EXECUTOR_ERROR_NO_ERROR
if !romOOC && len(response.Responses) > 0 {
// Check out of counters
errorToCheck := response.Responses[len(response.Responses)-1].Error
romOOC = executor.IsROMOutOfCountersError(errorToCheck)
isExecutorLevelError := (response.Error != executor.EXECUTOR_ERROR_NO_ERROR)
isRomLevelError := false
isRomOOCError := false

if response.Responses != nil {
for _, resp := range response.Responses {
if resp.Error != pb.RomError_ROM_ERROR_NO_ERROR {
isRomLevelError = true
break
}
}

if len(response.Responses) > 0 {
// Check out of counters
errorToCheck := response.Responses[len(response.Responses)-1].Error
isRomOOCError = executor.IsROMOutOfCountersError(errorToCheck)
}
}

return &ProcessBatchResponse{
NewStateRoot: common.BytesToHash(response.NewStateRoot),
NewAccInputHash: common.BytesToHash(response.NewAccInputHash),
NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot),
NewBatchNumber: response.NewBatchNum,
UsedZkCounters: convertToCounters(response),
Responses: responses,
ExecutorError: executor.ExecutorErr(response.Error),
RomOOC: romOOC,
ReadWriteAddresses: readWriteAddresses,
NewStateRoot: common.BytesToHash(response.NewStateRoot),
NewAccInputHash: common.BytesToHash(response.NewAccInputHash),
NewLocalExitRoot: common.BytesToHash(response.NewLocalExitRoot),
NewBatchNumber: response.NewBatchNum,
UsedZkCounters: convertToCounters(response),
Responses: responses,
ExecutorError: executor.ExecutorErr(response.Error),
IsExecutorLevelError: isExecutorLevelError,
IsRomLevelError: isRomLevelError,
IsRomOOCError: isRomOOCError,
ReadWriteAddresses: readWriteAddresses,
}, nil
}

func isProcessed(err pb.RomError) bool {
// IsStateRootChanged returns true if the transaction changes the state root
func IsStateRootChanged(err pb.RomError) bool {
return !executor.IsIntrinsicError(err) && !executor.IsROMOutOfCountersError(err)
}

Expand Down Expand Up @@ -125,7 +140,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res
result.CreateAddress = common.HexToAddress(response.CreateAddress)
result.StateRoot = common.BytesToHash(response.StateRoot)
result.Logs = convertToLog(response.Logs)
result.IsProcessed = isProcessed(response.Error)
result.ChangesStateRoot = IsStateRootChanged(response.Error)
result.ExecutionTrace = *trace
result.CallTrace = convertToExecutorTrace(response.CallTrace)
result.Tx = txs[i]
Expand Down Expand Up @@ -158,7 +173,7 @@ func (s *State) convertToProcessTransactionResponse(txs []types.Transaction, res
log.Debugf("ProcessTransactionResponse[GasUsed]: %v", result.GasUsed)
log.Debugf("ProcessTransactionResponse[GasLeft]: %v", result.GasLeft)
log.Debugf("ProcessTransactionResponse[GasRefunded]: %v", result.GasRefunded)
log.Debugf("ProcessTransactionResponse[IsProcessed]: %v", result.IsProcessed)
log.Debugf("ProcessTransactionResponse[ChangesStateRoot]: %v", result.ChangesStateRoot)
}

return results, nil
Expand Down
2 changes: 1 addition & 1 deletion state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (p *PostgresStorage) GetForcedBatch(ctx context.Context, forcedBatchNumber

// GetForcedBatchesSince gets L1 forced batches since forcedBatchNumber
func (p *PostgresStorage) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*ForcedBatch, error) {
const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2"
const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2 ORDER BY forced_batch_num ASC"
q := p.getExecQuerier(dbTx)
rows, err := q.Query(ctx, getForcedBatchesSQL, forcedBatchNumber, maxBlockNumber)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down
Loading

0 comments on commit 32c81ee

Please sign in to comment.