Skip to content

Commit

Permalink
synchronized refactor: move l1_parallel to a folder (0xPolygonHermez#…
Browse files Browse the repository at this point in the history
…2836)

* moved l1_parallel files to l1_parallel_sync folder
  • Loading branch information
joanestebanr authored Nov 27, 2023
1 parent f755f8f commit ad201b9
Show file tree
Hide file tree
Showing 38 changed files with 576 additions and 312 deletions.
11 changes: 6 additions & 5 deletions synchronizer/ext_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/synchronizer/l1_parallel_sync"
)

const (
Expand All @@ -28,11 +29,11 @@ const (
// echo "l1_producer_stop" >> /tmp/synchronizer_in
// echo "l1_orchestrator_reset|8577060" >> /tmp/synchronizer_in
type externalControl struct {
producer *l1RollupInfoProducer
orquestrator *l1SyncOrchestration
producer *l1_parallel_sync.L1RollupInfoProducer
orquestrator *l1_parallel_sync.L1SyncOrchestration
}

func newExternalControl(producer *l1RollupInfoProducer, orquestrator *l1SyncOrchestration) *externalControl {
func newExternalControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalControl {
return &externalControl{producer: producer, orquestrator: orquestrator}
}

Expand Down Expand Up @@ -102,7 +103,7 @@ func (e *externalControl) cmdL1OrchestratorReset(args []string) {
return
}
log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d)", blockNumber)
e.orquestrator.reset(blockNumber)
e.orquestrator.Reset(blockNumber)
log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d) returned", blockNumber)
}

Expand All @@ -113,7 +114,7 @@ func (e *externalControl) cmdL1OrchestratorAbort(args []string) {
return
}
log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop")
e.orquestrator.abort()
e.orquestrator.Abort()
log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop returned")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package synchronizer
package l1_parallel_sync

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package synchronizer
package l1_parallel_sync

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package synchronizer
package l1_parallel_sync

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package synchronizer
package l1_parallel_sync

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
// Constructors:
// - newL1PackageDataControl: create a l1PackageData with only control information
// - newL1PackageData: create a l1PackageData with data and control information
package synchronizer
package l1_parallel_sync

import (
"fmt"

"github.com/0xPolygonHermez/zkevm-node/log"
)

// l1SyncMessage : struct to hold L1 rollup info data package
// L1SyncMessage : struct to hold L1 rollup info data package
// It could contain data or control information, or both.
// A control package is used to send actions to consumer or to notify that producer is fully synced.
type l1SyncMessage struct {
type L1SyncMessage struct {
// dataIsValid : true if data field is valid
dataIsValid bool
// data: is the rollup info data
Expand All @@ -43,8 +43,8 @@ const (
eventProducerIsFullySynced eventEnum = 2
)

func newL1SyncMessageControl(event eventEnum) *l1SyncMessage {
return &l1SyncMessage{
func newL1SyncMessageControl(event eventEnum) *L1SyncMessage {
return &L1SyncMessage{
dataIsValid: false,
ctrlIsValid: true,
ctrl: l1ConsumerControl{
Expand All @@ -53,11 +53,11 @@ func newL1SyncMessageControl(event eventEnum) *l1SyncMessage {
}
}

func newL1SyncMessageData(result *rollupInfoByBlockRangeResult) *l1SyncMessage {
func newL1SyncMessageData(result *rollupInfoByBlockRangeResult) *L1SyncMessage {
if result == nil {
log.Fatal("newL1PackageDataFromResult: result is nil, the idea of this func is create packages with data")
}
return &l1SyncMessage{
return &L1SyncMessage{
dataIsValid: true,
data: *result,
ctrlIsValid: false,
Expand All @@ -81,7 +81,7 @@ func (l *l1ConsumerControl) String() string {
return fmt.Sprintf("action:%s", l.event.String())
}

func (l *l1SyncMessage) toStringBrief() string {
func (l *L1SyncMessage) toStringBrief() string {
res := ""
if l.dataIsValid {
res += fmt.Sprintf("data:%v ", l.data.toStringBrief())
Expand Down
21 changes: 21 additions & 0 deletions synchronizer/l1_parallel_sync/l1_etherman_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package l1_parallel_sync

import (
"context"
"math/big"

"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
)

// L1ParallelEthermanInterface is an interface for the etherman package
type L1ParallelEthermanInterface interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*ethTypes.Header, error)
GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
EthBlockByNumber(ctx context.Context, blockNumber uint64) (*ethTypes.Block, error)
GetLatestBatchNumber() (uint64, error)
GetTrustedSequencerURL() (string, error)
VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error)
GetLatestVerifiedBatchNum() (uint64, error)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Impelements

package synchronizer
package l1_parallel_sync

import (
"fmt"
Expand All @@ -14,7 +12,7 @@ type filterToSendOrdererResultsToConsumer struct {
mutex sync.Mutex
lastBlockOnSynchronizer uint64
// pendingResults is a queue of results that are waiting to be sent to the consumer
pendingResults []l1SyncMessage
pendingResults []L1SyncMessage
}

func newFilterToSendOrdererResultsToConsumer(lastBlockOnSynchronizer uint64) *filterToSendOrdererResultsToConsumer {
Expand All @@ -35,20 +33,20 @@ func (s *filterToSendOrdererResultsToConsumer) Reset(lastBlockOnSynchronizer uin
s.mutex.Lock()
defer s.mutex.Unlock()
s.lastBlockOnSynchronizer = lastBlockOnSynchronizer
s.pendingResults = []l1SyncMessage{}
s.pendingResults = []L1SyncMessage{}
}

func (s *filterToSendOrdererResultsToConsumer) Filter(data l1SyncMessage) []l1SyncMessage {
func (s *filterToSendOrdererResultsToConsumer) Filter(data L1SyncMessage) []L1SyncMessage {
s.mutex.Lock()
defer s.mutex.Unlock()
s.checkValidDataUnsafe(&data)
s.addPendingResultUnsafe(&data)
res := []l1SyncMessage{}
res := []L1SyncMessage{}
res = s.sendResultIfPossibleUnsafe(res)
return res
}

func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *l1SyncMessage) {
func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *L1SyncMessage) {
if result.dataIsValid {
if result.data.blockRange.fromBlock < s.lastBlockOnSynchronizer {
log.Warnf("It's not possible to receive a old block [%s] range that have been already send to synchronizer. Ignoring it. status:[%s]",
Expand All @@ -64,7 +62,7 @@ func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *l1Sy
}

// sendResultIfPossibleUnsafe returns true is have send any result
func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previous []l1SyncMessage) []l1SyncMessage {
func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previous []L1SyncMessage) []L1SyncMessage {
resultListPackages := previous
indexToRemove := []int{}
send := false
Expand Down Expand Up @@ -100,7 +98,7 @@ func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previo
}

func (s *filterToSendOrdererResultsToConsumer) removeIndexFromPendingResultsUnsafe(indexToRemove []int) {
newPendingResults := []l1SyncMessage{}
newPendingResults := []L1SyncMessage{}
for j := range s.pendingResults {
if slices.Contains(indexToRemove, j) {
continue
Expand All @@ -122,6 +120,6 @@ func (s *filterToSendOrdererResultsToConsumer) matchNextBlockUnsafe(results *rol
return results.blockRange.fromBlock == s.lastBlockOnSynchronizer+1
}

func (s *filterToSendOrdererResultsToConsumer) addPendingResultUnsafe(results *l1SyncMessage) {
func (s *filterToSendOrdererResultsToConsumer) addPendingResultUnsafe(results *L1SyncMessage) {
s.pendingResults = append(s.pendingResults, *results)
}
Loading

0 comments on commit ad201b9

Please sign in to comment.