Skip to content

Commit

Permalink
etrog: fix permissionless errors (0xPolygonHermez#3140)
Browse files Browse the repository at this point in the history
* fix: an error while getting lastBlock produced a SIGSEGV

* reduce info logs

* fix unit test
  • Loading branch information
joanestebanr authored Jan 25, 2024
1 parent 19d493e commit 8124e52
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
34 changes: 21 additions & 13 deletions synchronizer/l1_parallel_sync/l1_rollup_info_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/synchronizer/common"
)
Expand Down Expand Up @@ -90,6 +89,7 @@ type workersInterface interface {
requestLastBlockWithRetries(ctx context.Context, timeout time.Duration, maxPermittedRetries int) responseL1LastBlock
getResponseChannelForRollupInfo() chan responseRollupInfoByBlockRange
String() string
ToStringBrief() string
howManyRunningWorkers() int
}

Expand Down Expand Up @@ -229,7 +229,7 @@ func (l *L1RollupInfoProducer) Reset(startingBlockNumber uint64) {
}

func (l *L1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) {
log.Infof("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief())
log.Debugf("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief())
l.setStatusReseting()
log.Debugf("producer: Reset(%d): stop previous run (state=%s)", startingBlockNumber, l.getStatus().String())
log.Debugf("producer: Reset(%d): syncStatus.reset", startingBlockNumber)
Expand All @@ -243,7 +243,7 @@ func (l *L1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) {
log.Debugf("producer: Reset(%d): reset Filter", startingBlockNumber)
l.filterToSendOrdererResultsToConsumer.Reset(startingBlockNumber)
l.setStatus(producerIdle)
log.Infof("producer: Reset(%d): reset done!", startingBlockNumber)
log.Infof("producer: Reset(%d): reset producer done!", startingBlockNumber)
}

func (l *L1RollupInfoProducer) isProducerRunning() bool {
Expand Down Expand Up @@ -351,13 +351,13 @@ func (l *L1RollupInfoProducer) step(waitDuration *time.Duration) bool {
if atomic.CompareAndSwapInt32((*int32)(&l.status), int32(producerNoRunning), int32(producerIdle)) { // l.getStatus() == producerNoRunning
log.Info("producer: step: status is no running, changing to idle %s", l.getStatus().String())
}
log.Infof("producer: build_time:%s step: status:%s", zkevm.BuildDate, l.toStringBrief())
log.Debugf("producer: step: status:%s", l.toStringBrief())
select {
case <-l.ctxWithCancel.Done():
log.Debugf("producer: context canceled")
return false
case cmd := <-l.channelCmds:
log.Infof("producer: received a command")
log.Debugf("producer: received a command")
res := l.executeCmd(cmd)
if !res {
log.Info("producer: cmd %s stop the process", cmd.cmd.String())
Expand Down Expand Up @@ -438,7 +438,7 @@ func (l *L1RollupInfoProducer) step(waitDuration *time.Duration) bool {
func (l *L1RollupInfoProducer) executeCmd(cmd producerCmd) bool {
switch cmd.cmd {
case producerStop:
log.Infof("producer: received a stop, so it stops processing")
log.Infof("producer: received a stop, so it stops requesting new rollup info and stop current requests")
l.stopUnsafe()
return false
case producerReset:
Expand Down Expand Up @@ -534,7 +534,7 @@ func (l *L1RollupInfoProducer) launchWork() (int, error) {
blockRangeMsg := br.String() + unsafeAreaMsg
_, err := l.workers.asyncRequestRollupInfoByBlockRange(l.ctxWithCancel.ctx, request)
if err != nil {
if errors.Is(err, errAllWorkersBusy) {
if !errors.Is(err, errAllWorkersBusy) {
accDebugStr += fmt.Sprintf(" segment %s -> [Error:%s] ", blockRangeMsg, err.Error())
}
break
Expand All @@ -545,7 +545,10 @@ func (l *L1RollupInfoProducer) launchWork() (int, error) {
log.Debugf("producer: launch_worker: Launched worker for segment %s, num_workers_in_this_iteration: %d", blockRangeMsg, launchedWorker)
l.syncStatus.OnStartedNewWorker(*br)
}
log.Infof("producer: launch_worker: num of launched workers: %d result: %s status_comm:%s", launchedWorker, accDebugStr, l.outgoingPackageStatusDebugString())
if launchedWorker > 0 {
log.Infof("producer: launch_worker: num of launched workers: %d (%s) result: %s ", launchedWorker, l.workers.ToStringBrief(), accDebugStr)
}
log.Debugf("producer: launch_worker: num of launched workers: %d result: %s status_comm:%s", launchedWorker, accDebugStr, l.outgoingPackageStatusDebugString())

return launchedWorker, nil
}
Expand All @@ -559,13 +562,13 @@ func (l *L1RollupInfoProducer) renewLastBlockOnL1IfNeeded(reason string) {
ttl := l.ttlOfLastBlockOnL1()
oldBlock := l.syncStatus.GetLastBlockOnL1()
if elapsed > ttl {
log.Infof("producer: Need a new value for Last Block On L1, doing the request reason:%s", reason)
log.Debugf("producer: Need a new value for Last Block On L1, doing the request reason:%s", reason)
result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.TimeoutForRequestLastBlockOnL1, l.cfg.NumOfAllowedRetriesForRequestLastBlockOnL1)
log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block)
if result.generic.err != nil {
log.Error(result.generic.err)
return
}
log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block)

l.onNewLastBlock(result.result.block)
}
}
Expand All @@ -588,7 +591,12 @@ func (l *L1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB
}
if isOk {
outgoingPackages := l.filterToSendOrdererResultsToConsumer.Filter(*newL1SyncMessageData(result.result))
log.Infof("producer: filtered Br[%s/%d], outgoing %d filter_status:%s", result.result.blockRange.String(), result.result.getHighestBlockNumberInResponse(), len(outgoingPackages), l.filterToSendOrdererResultsToConsumer.ToStringBrief())
log.Debugf("producer: filtered Br[%s/%d], outgoing %d filter_status:%s", result.result.blockRange.String(), result.result.getHighestBlockNumberInResponse(), len(outgoingPackages), l.filterToSendOrdererResultsToConsumer.ToStringBrief())
if len(outgoingPackages) > 0 {
for idx, msg := range outgoingPackages {
log.Infof("producer: sendind data to consumer: [%d/%d] -> range:[%s] Sending results [data] to consumer:%s ", idx, len(outgoingPackages), result.result.blockRange.String(), msg.toStringBrief())
}
}
l.sendPackages(outgoingPackages)
} else {
if errors.Is(result.generic.err, context.Canceled) {
Expand All @@ -601,7 +609,7 @@ func (l *L1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB

// sendPackages writes every element of outgoingPackages to l.outgoingChannel,
// in slice order, logging a brief description of each package together with
// the outgoing-channel status string before it is sent.
//
// NOTE(review): this listing comes from a rendered diff, so the Infof and
// Debugf calls below are presumably the removed and the added version of the
// same log statement rather than two calls that coexist — confirm against the
// actual file.
func (l *L1RollupInfoProducer) sendPackages(outgoingPackages []L1SyncMessage) {
for _, pkg := range outgoingPackages {
log.Infof("producer: Sending results [data] to consumer:%s: status_comm:%s", pkg.toStringBrief(), l.outgoingPackageStatusDebugString())
log.Debugf("producer: Sending results [data] to consumer:%s: status_comm:%s", pkg.toStringBrief(), l.outgoingPackageStatusDebugString())
// Plain channel send (no select): this statement waits until the channel
// accepts the value.
l.outgoingChannel <- pkg
}
}
Expand Down
4 changes: 4 additions & 0 deletions synchronizer/l1_parallel_sync/l1_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (w *workers) String() string {
return result
}

// ToStringBrief returns a one-line summary of the form
// " working: <a> of <b> ", where <a> is the value reported by
// howManyRunningWorkers and <b> is len(w.workers).
func (w *workers) ToStringBrief() string {
	running := w.howManyRunningWorkers()
	total := len(w.workers)
	return fmt.Sprintf(" working: %d of %d ", running, total)
}

func newWorkers(ethermans []L1ParallelEthermanInterface, cfg workersConfig) *workers {
result := workers{chIncommingRollupInfo: make(chan responseRollupInfoByBlockRange, len(ethermans)+1),
cfg: cfg}
Expand Down
45 changes: 45 additions & 0 deletions synchronizer/l1_parallel_sync/mock_workers_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8124e52

Please sign in to comment.