diff --git a/synchronizer/ext_control.go b/synchronizer/ext_control.go index 3cee92158c..0ca2840dca 100644 --- a/synchronizer/ext_control.go +++ b/synchronizer/ext_control.go @@ -9,6 +9,7 @@ import ( "time" "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/synchronizer/l1_parallel_sync" ) const ( @@ -28,11 +29,11 @@ const ( // echo "l1_producer_stop" >> /tmp/synchronizer_in // echo "l1_orchestrator_reset|8577060" >> /tmp/synchronizer_in type externalControl struct { - producer *l1RollupInfoProducer - orquestrator *l1SyncOrchestration + producer *l1_parallel_sync.L1RollupInfoProducer + orquestrator *l1_parallel_sync.L1SyncOrchestration } -func newExternalControl(producer *l1RollupInfoProducer, orquestrator *l1SyncOrchestration) *externalControl { +func newExternalControl(producer *l1_parallel_sync.L1RollupInfoProducer, orquestrator *l1_parallel_sync.L1SyncOrchestration) *externalControl { return &externalControl{producer: producer, orquestrator: orquestrator} } @@ -102,7 +103,7 @@ func (e *externalControl) cmdL1OrchestratorReset(args []string) { return } log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d)", blockNumber) - e.orquestrator.reset(blockNumber) + e.orquestrator.Reset(blockNumber) log.Infof("EXT:cmdL1OrchestratorReset: calling orchestrator reset(%d) returned", blockNumber) } @@ -113,7 +114,7 @@ func (e *externalControl) cmdL1OrchestratorAbort(args []string) { return } log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop") - e.orquestrator.abort() + e.orquestrator.Abort() log.Infof("EXT:cmdL1OrchestratorAbort: calling orquestrator stop returned") } diff --git a/synchronizer/block_range.go b/synchronizer/l1_parallel_sync/block_range.go similarity index 98% rename from synchronizer/block_range.go rename to synchronizer/l1_parallel_sync/block_range.go index 7aa7e9afdc..802ebf40c6 100644 --- a/synchronizer/block_range.go +++ b/synchronizer/l1_parallel_sync/block_range.go @@ -1,4 +1,4 @@ -package 
synchronizer +package l1_parallel_sync import ( "errors" diff --git a/synchronizer/generic_cache.go b/synchronizer/l1_parallel_sync/generic_cache.go similarity index 99% rename from synchronizer/generic_cache.go rename to synchronizer/l1_parallel_sync/generic_cache.go index 8696955a76..a039f5e17e 100644 --- a/synchronizer/generic_cache.go +++ b/synchronizer/l1_parallel_sync/generic_cache.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "time" diff --git a/synchronizer/generic_cache_test.go b/synchronizer/l1_parallel_sync/generic_cache_test.go similarity index 99% rename from synchronizer/generic_cache_test.go rename to synchronizer/l1_parallel_sync/generic_cache_test.go index b18dc81226..eaf48b28e8 100644 --- a/synchronizer/generic_cache_test.go +++ b/synchronizer/l1_parallel_sync/generic_cache_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "testing" diff --git a/synchronizer/l1_common.go b/synchronizer/l1_parallel_sync/l1_common.go similarity index 98% rename from synchronizer/l1_common.go rename to synchronizer/l1_parallel_sync/l1_common.go index 69dc968989..4db2ea4455 100644 --- a/synchronizer/l1_common.go +++ b/synchronizer/l1_parallel_sync/l1_common.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" diff --git a/synchronizer/l1_data_message.go b/synchronizer/l1_parallel_sync/l1_data_message.go similarity index 88% rename from synchronizer/l1_data_message.go rename to synchronizer/l1_parallel_sync/l1_data_message.go index 575f9fb833..54f93512ab 100644 --- a/synchronizer/l1_data_message.go +++ b/synchronizer/l1_parallel_sync/l1_data_message.go @@ -9,7 +9,7 @@ // Constructors: // - newL1PackageDataControl: create a l1PackageData with only control information // - newL1PackageData: create a l1PackageData with data and control information -package synchronizer +package l1_parallel_sync import ( "fmt" @@ -17,10 +17,10 @@ import ( 
"github.com/0xPolygonHermez/zkevm-node/log" ) -// l1SyncMessage : struct to hold L1 rollup info data package +// L1SyncMessage : struct to hold L1 rollup info data package // It could contain data or control information, or both. // A control package is used to send actions to consumer or to notify that producer is fully synced. -type l1SyncMessage struct { +type L1SyncMessage struct { // dataIsValid : true if data field is valid dataIsValid bool // data: is the rollup info data @@ -43,8 +43,8 @@ const ( eventProducerIsFullySynced eventEnum = 2 ) -func newL1SyncMessageControl(event eventEnum) *l1SyncMessage { - return &l1SyncMessage{ +func newL1SyncMessageControl(event eventEnum) *L1SyncMessage { + return &L1SyncMessage{ dataIsValid: false, ctrlIsValid: true, ctrl: l1ConsumerControl{ @@ -53,11 +53,11 @@ func newL1SyncMessageControl(event eventEnum) *l1SyncMessage { } } -func newL1SyncMessageData(result *rollupInfoByBlockRangeResult) *l1SyncMessage { +func newL1SyncMessageData(result *rollupInfoByBlockRangeResult) *L1SyncMessage { if result == nil { log.Fatal("newL1PackageDataFromResult: result is nil, the idea of this func is create packages with data") } - return &l1SyncMessage{ + return &L1SyncMessage{ dataIsValid: true, data: *result, ctrlIsValid: false, @@ -81,7 +81,7 @@ func (l *l1ConsumerControl) String() string { return fmt.Sprintf("action:%s", l.event.String()) } -func (l *l1SyncMessage) toStringBrief() string { +func (l *L1SyncMessage) toStringBrief() string { res := "" if l.dataIsValid { res += fmt.Sprintf("data:%v ", l.data.toStringBrief()) diff --git a/synchronizer/l1_parallel_sync/l1_etherman_interface.go b/synchronizer/l1_parallel_sync/l1_etherman_interface.go new file mode 100644 index 0000000000..61fb8fa8f7 --- /dev/null +++ b/synchronizer/l1_parallel_sync/l1_etherman_interface.go @@ -0,0 +1,21 @@ +package l1_parallel_sync + +import ( + "context" + "math/big" + + "github.com/0xPolygonHermez/zkevm-node/etherman" + 
"github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" +) + +// L1ParallelEthermanInterface is an interface for the etherman package +type L1ParallelEthermanInterface interface { + HeaderByNumber(ctx context.Context, number *big.Int) (*ethTypes.Header, error) + GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) + EthBlockByNumber(ctx context.Context, blockNumber uint64) (*ethTypes.Block, error) + GetLatestBatchNumber() (uint64, error) + GetTrustedSequencerURL() (string, error) + VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error) + GetLatestVerifiedBatchNum() (uint64, error) +} diff --git a/synchronizer/l1_filter_send_orderer_results_to_synchronizer.go b/synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer.go similarity index 91% rename from synchronizer/l1_filter_send_orderer_results_to_synchronizer.go rename to synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer.go index d504775896..e5dc3855fe 100644 --- a/synchronizer/l1_filter_send_orderer_results_to_synchronizer.go +++ b/synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer.go @@ -1,6 +1,4 @@ -// Impelements - -package synchronizer +package l1_parallel_sync import ( "fmt" @@ -14,7 +12,7 @@ type filterToSendOrdererResultsToConsumer struct { mutex sync.Mutex lastBlockOnSynchronizer uint64 // pendingResults is a queue of results that are waiting to be sent to the consumer - pendingResults []l1SyncMessage + pendingResults []L1SyncMessage } func newFilterToSendOrdererResultsToConsumer(lastBlockOnSynchronizer uint64) *filterToSendOrdererResultsToConsumer { @@ -35,20 +33,20 @@ func (s *filterToSendOrdererResultsToConsumer) Reset(lastBlockOnSynchronizer uin s.mutex.Lock() defer s.mutex.Unlock() s.lastBlockOnSynchronizer = lastBlockOnSynchronizer - s.pendingResults = []l1SyncMessage{} + 
s.pendingResults = []L1SyncMessage{} } -func (s *filterToSendOrdererResultsToConsumer) Filter(data l1SyncMessage) []l1SyncMessage { +func (s *filterToSendOrdererResultsToConsumer) Filter(data L1SyncMessage) []L1SyncMessage { s.mutex.Lock() defer s.mutex.Unlock() s.checkValidDataUnsafe(&data) s.addPendingResultUnsafe(&data) - res := []l1SyncMessage{} + res := []L1SyncMessage{} res = s.sendResultIfPossibleUnsafe(res) return res } -func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *l1SyncMessage) { +func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *L1SyncMessage) { if result.dataIsValid { if result.data.blockRange.fromBlock < s.lastBlockOnSynchronizer { log.Warnf("It's not possible to receive a old block [%s] range that have been already send to synchronizer. Ignoring it. status:[%s]", @@ -64,7 +62,7 @@ func (s *filterToSendOrdererResultsToConsumer) checkValidDataUnsafe(result *l1Sy } // sendResultIfPossibleUnsafe returns true is have send any result -func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previous []l1SyncMessage) []l1SyncMessage { +func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previous []L1SyncMessage) []L1SyncMessage { resultListPackages := previous indexToRemove := []int{} send := false @@ -100,7 +98,7 @@ func (s *filterToSendOrdererResultsToConsumer) sendResultIfPossibleUnsafe(previo } func (s *filterToSendOrdererResultsToConsumer) removeIndexFromPendingResultsUnsafe(indexToRemove []int) { - newPendingResults := []l1SyncMessage{} + newPendingResults := []L1SyncMessage{} for j := range s.pendingResults { if slices.Contains(indexToRemove, j) { continue @@ -122,6 +120,6 @@ func (s *filterToSendOrdererResultsToConsumer) matchNextBlockUnsafe(results *rol return results.blockRange.fromBlock == s.lastBlockOnSynchronizer+1 } -func (s *filterToSendOrdererResultsToConsumer) addPendingResultUnsafe(results *l1SyncMessage) { +func (s 
*filterToSendOrdererResultsToConsumer) addPendingResultUnsafe(results *L1SyncMessage) { s.pendingResults = append(s.pendingResults, *results) } diff --git a/synchronizer/l1_filter_send_orderer_results_to_synchronizer_test.go b/synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer_test.go similarity index 83% rename from synchronizer/l1_filter_send_orderer_results_to_synchronizer_test.go rename to synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer_test.go index 86a43db86e..53f542d056 100644 --- a/synchronizer/l1_filter_send_orderer_results_to_synchronizer_test.go +++ b/synchronizer/l1_parallel_sync/l1_filter_send_orderer_results_to_synchronizer_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "math/big" @@ -13,8 +13,8 @@ func TestSORMulticaseWithReset(t *testing.T) { tcs := []struct { description string lastBlock uint64 - packages []l1SyncMessage - expected []l1SyncMessage + packages []L1SyncMessage + expected []L1SyncMessage expectedlastBlockOnSynchronizer uint64 resetOnPackageNumber int resetToBlock uint64 @@ -22,11 +22,11 @@ func TestSORMulticaseWithReset(t *testing.T) { { description: "inverse_br", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(131, 141), *newDataPackage(120, 130), *newDataPackage(101, 119)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 119), *newDataPackage(120, 130), }, @@ -37,12 +37,12 @@ func TestSORMulticaseWithReset(t *testing.T) { { description: "crtl_linked_to_br", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(131, 141), *newActionPackage(eventNone), *newDataPackage(120, 130), *newDataPackage(101, 119)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newActionPackage(eventNone), *newDataPackage(101, 119), *newDataPackage(120, 130), @@ -55,7 +55,7 @@ func TestSORMulticaseWithReset(t *testing.T) { for _, tc := range tcs { 
t.Run(tc.description, func(t *testing.T) { sut := newFilterToSendOrdererResultsToConsumer(tc.lastBlock) - sendData := []l1SyncMessage{} + sendData := []L1SyncMessage{} for i, p := range tc.packages { if i == tc.resetOnPackageNumber { sut.Reset(tc.resetToBlock) @@ -74,46 +74,46 @@ func TestSORMulticase(t *testing.T) { tcs := []struct { description string lastBlock uint64 - packages []l1SyncMessage - expected []l1SyncMessage + packages []L1SyncMessage + expected []L1SyncMessage excpectedLastBlockOnSynchronizer uint64 }{ { description: "empty_case", lastBlock: 100, - packages: []l1SyncMessage{}, - expected: []l1SyncMessage{}, + packages: []L1SyncMessage{}, + expected: []L1SyncMessage{}, excpectedLastBlockOnSynchronizer: 100, }, { description: "just_ctrl", lastBlock: 100, - packages: []l1SyncMessage{*newActionPackage(eventNone)}, - expected: []l1SyncMessage{*newActionPackage(eventNone)}, + packages: []L1SyncMessage{*newActionPackage(eventNone)}, + expected: []L1SyncMessage{*newActionPackage(eventNone)}, excpectedLastBlockOnSynchronizer: 100, }, { description: "just_br", lastBlock: 100, - packages: []l1SyncMessage{*newDataPackage(101, 119)}, - expected: []l1SyncMessage{*newDataPackage(101, 119)}, + packages: []L1SyncMessage{*newDataPackage(101, 119)}, + expected: []L1SyncMessage{*newDataPackage(101, 119)}, excpectedLastBlockOnSynchronizer: 119, }, { description: "just_br_missing_intermediate_block", lastBlock: 100, - packages: []l1SyncMessage{*newDataPackage(102, 119)}, - expected: []l1SyncMessage{}, + packages: []L1SyncMessage{*newDataPackage(102, 119)}, + expected: []L1SyncMessage{}, excpectedLastBlockOnSynchronizer: 100, }, { description: "inverse_br", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(131, 141), *newDataPackage(120, 130), *newDataPackage(101, 119)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 119), *newDataPackage(120, 130), *newDataPackage(131, 141), @@ -123,12 +123,12 @@ 
func TestSORMulticase(t *testing.T) { { description: "crtl_linked_to_br", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(131, 141), *newActionPackage(eventNone), *newDataPackage(120, 130), *newDataPackage(101, 119)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 119), *newDataPackage(120, 130), *newDataPackage(131, 141), @@ -139,13 +139,13 @@ func TestSORMulticase(t *testing.T) { { description: "crtl_linked_to_last_br", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackage(121, 130), *newDataPackage(131, 140), *newActionPackage(eventNone), *newDataPackage(101, 110)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackage(121, 130), @@ -157,13 +157,13 @@ func TestSORMulticase(t *testing.T) { { description: "latest with no data doesnt change last block", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackage(121, 130), *newDataPackage(131, latestBlockNumber), *newActionPackage(eventNone), *newDataPackage(101, 110)}, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackage(121, 130), @@ -175,7 +175,7 @@ func TestSORMulticase(t *testing.T) { { description: "two latest one empty and one with data change to highest block in rollupinfo", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackage(121, 130), *newDataPackage(131, latestBlockNumber), @@ -183,7 +183,7 @@ func TestSORMulticase(t *testing.T) { *newDataPackage(101, 110), *newDataPackageWithData(131, latestBlockNumber, 140), }, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackage(121, 130), @@ -196,7 +196,7 @@ func TestSORMulticase(t *testing.T) 
{ { description: "one latest one normal", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackage(121, 130), *newDataPackage(131, latestBlockNumber), @@ -204,7 +204,7 @@ func TestSORMulticase(t *testing.T) { *newActionPackage(eventNone), *newDataPackage(101, 110), }, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackage(121, 130), @@ -217,7 +217,7 @@ func TestSORMulticase(t *testing.T) { { description: "a rollupinfo with data", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackageWithData(121, 130, 125), *newDataPackage(131, latestBlockNumber), @@ -227,7 +227,7 @@ func TestSORMulticase(t *testing.T) { *newDataPackage(101, 110), *newDataPackage(131, 140), }, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackageWithData(121, 130, 125), @@ -242,7 +242,7 @@ func TestSORMulticase(t *testing.T) { { description: "two latest empty with control in between", lastBlock: 100, - packages: []l1SyncMessage{ + packages: []L1SyncMessage{ *newDataPackage(111, 120), *newDataPackage(121, 130), *newDataPackage(131, latestBlockNumber), @@ -252,7 +252,7 @@ func TestSORMulticase(t *testing.T) { *newDataPackage(101, 110), *newDataPackage(131, 140), }, - expected: []l1SyncMessage{ + expected: []L1SyncMessage{ *newDataPackage(101, 110), *newDataPackage(111, 120), *newDataPackage(121, 130), @@ -268,7 +268,7 @@ func TestSORMulticase(t *testing.T) { for _, tc := range tcs { t.Run(tc.description, func(t *testing.T) { sut := newFilterToSendOrdererResultsToConsumer(tc.lastBlock) - sendData := []l1SyncMessage{} + sendData := []L1SyncMessage{} for _, p := range tc.packages { dataToSend := sut.Filter(p) sendData = append(sendData, dataToSend...) 
@@ -280,8 +280,8 @@ func TestSORMulticase(t *testing.T) { } } -func newDataPackage(fromBlock, toBlock uint64) *l1SyncMessage { - res := l1SyncMessage{ +func newDataPackage(fromBlock, toBlock uint64) *L1SyncMessage { + res := L1SyncMessage{ data: rollupInfoByBlockRangeResult{ blockRange: blockRange{ fromBlock: fromBlock, @@ -298,8 +298,8 @@ func newDataPackage(fromBlock, toBlock uint64) *l1SyncMessage { return &res } -func newDataPackageWithData(fromBlock, toBlock uint64, blockWithData uint64) *l1SyncMessage { - res := l1SyncMessage{ +func newDataPackageWithData(fromBlock, toBlock uint64, blockWithData uint64) *L1SyncMessage { + res := L1SyncMessage{ data: rollupInfoByBlockRangeResult{ blockRange: blockRange{ fromBlock: fromBlock, @@ -314,8 +314,8 @@ func newDataPackageWithData(fromBlock, toBlock uint64, blockWithData uint64) *l1 return &res } -func newActionPackage(action eventEnum) *l1SyncMessage { - return &l1SyncMessage{ +func newActionPackage(action eventEnum) *L1SyncMessage { + return &L1SyncMessage{ dataIsValid: false, data: rollupInfoByBlockRangeResult{ blockRange: blockRange{ diff --git a/synchronizer/l1_live_block_ranges.go b/synchronizer/l1_parallel_sync/l1_live_block_ranges.go similarity index 98% rename from synchronizer/l1_live_block_ranges.go rename to synchronizer/l1_parallel_sync/l1_live_block_ranges.go index daf09c143a..6cda39e2c3 100644 --- a/synchronizer/l1_live_block_ranges.go +++ b/synchronizer/l1_parallel_sync/l1_live_block_ranges.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "errors" diff --git a/synchronizer/l1_live_block_ranges_test.go b/synchronizer/l1_parallel_sync/l1_live_block_ranges_test.go similarity index 98% rename from synchronizer/l1_live_block_ranges_test.go rename to synchronizer/l1_parallel_sync/l1_live_block_ranges_test.go index 0946851151..cd883fa439 100644 --- a/synchronizer/l1_live_block_ranges_test.go +++ b/synchronizer/l1_parallel_sync/l1_live_block_ranges_test.go @@ -1,4 +1,4 @@ -package 
synchronizer +package l1_parallel_sync import ( "testing" diff --git a/synchronizer/l1_rollup_info_consumer.go b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer.go similarity index 95% rename from synchronizer/l1_rollup_info_consumer.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_consumer.go index e6f6d91ece..8117ca8f10 100644 --- a/synchronizer/l1_rollup_info_consumer.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -25,7 +25,8 @@ var ( errL1Reorg = errors.New("consumer: L1 reorg detected") ) -type configConsumer struct { +// ConfigConsumer configuration for L1 sync parallel consumer +type ConfigConsumer struct { ApplyAfterNumRollupReceived int AceptableInacctivityTime time.Duration } @@ -33,14 +34,14 @@ type configConsumer struct { // synchronizerProcessBlockRangeInterface is the interface with synchronizer // to execute blocks. This interface is used to mock the synchronizer in the tests type synchronizerProcessBlockRangeInterface interface { - processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error + ProcessBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error } // l1RollupInfoConsumer is the object that process the rollup info data incomming from channel chIncommingRollupInfo type l1RollupInfoConsumer struct { mutex sync.Mutex synchronizer synchronizerProcessBlockRangeInterface - chIncommingRollupInfo chan l1SyncMessage + chIncommingRollupInfo chan L1SyncMessage ctx context.Context statistics l1RollupInfoConsumerStatistics lastEthBlockSynced *state.Block // Have been written in DB @@ -48,8 +49,9 @@ type l1RollupInfoConsumer struct { highestBlockProcessed uint64 } -func newL1RollupInfoConsumer(cfg configConsumer, - synchronizer synchronizerProcessBlockRangeInterface, ch chan l1SyncMessage) *l1RollupInfoConsumer { +// NewL1RollupInfoConsumer creates a new l1RollupInfoConsumer +func 
NewL1RollupInfoConsumer(cfg ConfigConsumer, + synchronizer synchronizerProcessBlockRangeInterface, ch chan L1SyncMessage) *l1RollupInfoConsumer { if cfg.AceptableInacctivityTime < minAcceptableTimeWaitingForNewRollupInfoData { log.Warnf("consumer: the AceptableInacctivityTime is too low (%s) minimum recommended %s", cfg.AceptableInacctivityTime, minAcceptableTimeWaitingForNewRollupInfoData) } @@ -229,7 +231,7 @@ func (l *l1RollupInfoConsumer) processUnsafe(rollupInfo rollupInfoByBlockRangeRe return nil, nil } b := convertL1BlockToEthBlock(lb) - err := l.synchronizer.processBlockRange([]etherman.Block{b}, order) + err := l.synchronizer.ProcessBlockRange([]etherman.Block{b}, order) if err != nil { log.Error("consumer: Error processing last block of range: ", rollupInfo.blockRange, " err:", err) return nil, err @@ -241,7 +243,7 @@ func (l *l1RollupInfoConsumer) processUnsafe(rollupInfo rollupInfoByBlockRangeRe tmpStateBlock := convertEthmanBlockToStateBlock(&blocks[len(blocks)-1]) lastEthBlockSynced = &tmpStateBlock logBlocks(blocks) - err := l.synchronizer.processBlockRange(blocks, order) + err := l.synchronizer.ProcessBlockRange(blocks, order) if err != nil { log.Info("consumer: Error processing block range: ", rollupInfo.blockRange, " err:", err) return nil, err diff --git a/synchronizer/l1_rollup_info_consumer_statistics.go b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics.go similarity index 97% rename from synchronizer/l1_rollup_info_consumer_statistics.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics.go index f987c2e4a4..c4c70c573e 100644 --- a/synchronizer/l1_rollup_info_consumer_statistics.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "fmt" @@ -15,7 +15,7 @@ type l1RollupInfoConsumerStatistics struct { startTime time.Time timePreviousProcessingDuration time.Duration startStepTime time.Time - cfg configConsumer + 
cfg ConfigConsumer } func (l *l1RollupInfoConsumerStatistics) onStart() { diff --git a/synchronizer/l1_rollup_info_consumer_statistics_test.go b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics_test.go similarity index 97% rename from synchronizer/l1_rollup_info_consumer_statistics_test.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics_test.go index 82ec6bb8a1..aca51692ff 100644 --- a/synchronizer/l1_rollup_info_consumer_statistics_test.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_statistics_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "testing" @@ -9,7 +9,7 @@ import ( ) func TestL1RollupInfoConsumerStatistics(t *testing.T) { - cfg := configConsumer{ + cfg := ConfigConsumer{ ApplyAfterNumRollupReceived: 10, AceptableInacctivityTime: 5 * time.Second, } @@ -47,7 +47,7 @@ func TestL1RollupInfoConsumerStatistics(t *testing.T) { } func TestL1RollupInfoConsumerStatisticsWithExceedTimeButNoWarningGenerated(t *testing.T) { - cfg := configConsumer{ + cfg := ConfigConsumer{ ApplyAfterNumRollupReceived: 10, AceptableInacctivityTime: 0 * time.Second, } @@ -81,7 +81,7 @@ func TestL1RollupInfoConsumerStatisticsWithExceedTimeButNoWarningGenerated(t *te } func TestL1RollupInfoConsumerStatisticsWithExceedTimeButAndWarningGenerated(t *testing.T) { - cfg := configConsumer{ + cfg := ConfigConsumer{ ApplyAfterNumRollupReceived: 1, AceptableInacctivityTime: 0 * time.Second, } diff --git a/synchronizer/l1_rollup_info_consumer_test.go b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_test.go similarity index 91% rename from synchronizer/l1_rollup_info_consumer_test.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_consumer_test.go index 588b2a1803..bafce11f2a 100644 --- a/synchronizer/l1_rollup_info_consumer_test.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_consumer_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -16,8 
+16,8 @@ import ( type consumerTestData struct { sut *l1RollupInfoConsumer - syncMock *synchronizerProcessBlockRangeMock - ch chan l1SyncMessage + syncMock *synchronizerProcessBlockRangeInterfaceMock + ch chan L1SyncMessage } func TestGivenConsumerWhenReceiveAFullSyncAndChannelIsEmptyThenStopOk(t *testing.T) { @@ -54,7 +54,7 @@ func TestGivenConsumerWhenFailsToProcessRollupThenDontKnownLastEthBlock(t *testi lastBlockOfRange: types.NewBlock(&types.Header{Number: big.NewInt(123)}, nil, nil, nil, nil), } data.syncMock. - On("processBlockRange", mock.Anything, mock.Anything). + On("ProcessBlockRange", mock.Anything, mock.Anything). Return(errors.New("error")). Once() data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) @@ -105,7 +105,7 @@ func TestGivenConsumerWhenNextBlockNumberIsNoSetThenAcceptAnythingAndProcess(t * data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) data.ch <- *newL1SyncMessageControl(eventProducerIsFullySynced) data.syncMock. - On("processBlockRange", mock.Anything, mock.Anything). + On("ProcessBlockRange", mock.Anything, mock.Anything). Return(nil). Once() err := data.sut.Start(ctxTimeout, nil) @@ -134,7 +134,7 @@ func TestGivenConsumerWhenNextBlockNumberIsNoSetThenFirstRollupInfoSetIt(t *test data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) data.ch <- *newL1SyncMessageControl(eventProducerIsFullySynced) data.syncMock. - On("processBlockRange", mock.Anything, mock.Anything). + On("ProcessBlockRange", mock.Anything, mock.Anything). Return(nil). 
Once() err := data.sut.Start(ctxTimeout, nil) @@ -145,13 +145,13 @@ func TestGivenConsumerWhenNextBlockNumberIsNoSetThenFirstRollupInfoSetIt(t *test } func setupConsumerTest(t *testing.T) consumerTestData { - syncMock := newSynchronizerProcessBlockRangeMock(t) - ch := make(chan l1SyncMessage, 10) + syncMock := newSynchronizerProcessBlockRangeInterfaceMock(t) + ch := make(chan L1SyncMessage, 10) - cfg := configConsumer{ + cfg := ConfigConsumer{ ApplyAfterNumRollupReceived: minNumIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData, AceptableInacctivityTime: minAcceptableTimeWaitingForNewRollupInfoData, } - sut := newL1RollupInfoConsumer(cfg, syncMock, ch) + sut := NewL1RollupInfoConsumer(cfg, syncMock, ch) return consumerTestData{sut, syncMock, ch} } diff --git a/synchronizer/l1_rollup_info_producer.go b/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go similarity index 81% rename from synchronizer/l1_rollup_info_producer.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_producer.go index 235f58c9aa..fd9be6f92b 100644 --- a/synchronizer/l1_rollup_info_producer.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go @@ -10,7 +10,7 @@ // TODO: // - Check all log.fatals to remove it or add status before the panic -package synchronizer +package l1_parallel_sync import ( "context" @@ -38,7 +38,7 @@ const ( type filter interface { ToStringBrief() string - Filter(data l1SyncMessage) []l1SyncMessage + Filter(data L1SyncMessage) []L1SyncMessage Reset(lastBlockOnSynchronizer uint64) numItemBlockedInQueue() int } @@ -105,43 +105,48 @@ func (s producerStatusEnum) String() string { return [...]string{"idle", "working", "synchronized", "no_running", "reseting"}[s] } -type configProducer struct { - syncChunkSize uint64 - ttlOfLastBlockOnL1 time.Duration +// ConfigProducer : configuration for producer +type ConfigProducer struct { + // SyncChunkSize is the number of blocks to be retrieved in each request + SyncChunkSize uint64 + // 
TtlOfLastBlockOnL1 is the time to wait before asking for a new last block on L1 + TtlOfLastBlockOnL1 time.Duration + // TimeoutForRequestLastBlockOnL1 is the timeout for requesting a new last block on L1 + TimeoutForRequestLastBlockOnL1 time.Duration + // NumOfAllowedRetriesForRequestLastBlockOnL1 is the number of retries for requesting a new last block on L1 + NumOfAllowedRetriesForRequestLastBlockOnL1 int - timeoutForRequestLastBlockOnL1 time.Duration - numOfAllowedRetriesForRequestLastBlockOnL1 int - - //timeout for main loop if no is synchronized yet, this time is a safeguard because is not needed - timeOutMainLoop time.Duration - //how ofter we show a log with statistics, 0 means disabled - timeForShowUpStatisticsLog time.Duration - minTimeBetweenRetriesForRollupInfo time.Duration + //TimeOutMainLoop timeout for main loop if not synchronized yet, this time is a safeguard because is not needed + TimeOutMainLoop time.Duration + //TimeForShowUpStatisticsLog how often we show a log with statistics, 0 means disabled + TimeForShowUpStatisticsLog time.Duration + // MinTimeBetweenRetriesForRollupInfo is the minimum time between retries for rollup info + MinTimeBetweenRetriesForRollupInfo time.Duration } -func (cfg *configProducer) String() string { +func (cfg *ConfigProducer) String() string { return fmt.Sprintf("syncChunkSize:%d ttlOfLastBlockOnL1:%s timeoutForRequestLastBlockOnL1:%s numOfAllowedRetriesForRequestLastBlockOnL1:%d timeOutMainLoop:%s timeForShowUpStatisticsLog:%s", - cfg.syncChunkSize, cfg.ttlOfLastBlockOnL1, cfg.timeoutForRequestLastBlockOnL1, cfg.numOfAllowedRetriesForRequestLastBlockOnL1, cfg.timeOutMainLoop, cfg.timeForShowUpStatisticsLog) + cfg.SyncChunkSize, cfg.TtlOfLastBlockOnL1, cfg.TimeoutForRequestLastBlockOnL1, cfg.NumOfAllowedRetriesForRequestLastBlockOnL1, cfg.TimeOutMainLoop, cfg.TimeForShowUpStatisticsLog) } -func (cfg *configProducer) normalize() { - if cfg.syncChunkSize == 0 { +func (cfg *ConfigProducer) normalize() { + if cfg.SyncChunkSize
== 0 { log.Fatalf("producer:config: SyncChunkSize must be greater than 0") } - if cfg.ttlOfLastBlockOnL1 < minTTLOfLastBlock { - log.Warnf("producer:config: ttlOfLastBlockOnL1 is too low (%s) minimum recomender value %s", cfg.ttlOfLastBlockOnL1, minTTLOfLastBlock) + if cfg.TtlOfLastBlockOnL1 < minTTLOfLastBlock { + log.Warnf("producer:config: ttlOfLastBlockOnL1 is too low (%s) minimum recomender value %s", cfg.TtlOfLastBlockOnL1, minTTLOfLastBlock) } - if cfg.timeoutForRequestLastBlockOnL1 < minTimeoutForRequestLastBlockOnL1 { - log.Warnf("producer:config: timeRequestInitialValueOfLastBlock is too low (%s) minimum recomender value%s", cfg.timeoutForRequestLastBlockOnL1, minTimeoutForRequestLastBlockOnL1) + if cfg.TimeoutForRequestLastBlockOnL1 < minTimeoutForRequestLastBlockOnL1 { + log.Warnf("producer:config: timeRequestInitialValueOfLastBlock is too low (%s) minimum recomender value%s", cfg.TimeoutForRequestLastBlockOnL1, minTimeoutForRequestLastBlockOnL1) } - if cfg.numOfAllowedRetriesForRequestLastBlockOnL1 < minNumOfAllowedRetriesForRequestLastBlockOnL1 { - log.Warnf("producer:config: retriesForRequestnitialValueOfLastBlock is too low (%d) minimum recomender value %d", cfg.numOfAllowedRetriesForRequestLastBlockOnL1, minNumOfAllowedRetriesForRequestLastBlockOnL1) + if cfg.NumOfAllowedRetriesForRequestLastBlockOnL1 < minNumOfAllowedRetriesForRequestLastBlockOnL1 { + log.Warnf("producer:config: retriesForRequestnitialValueOfLastBlock is too low (%d) minimum recomender value %d", cfg.NumOfAllowedRetriesForRequestLastBlockOnL1, minNumOfAllowedRetriesForRequestLastBlockOnL1) } - if cfg.timeOutMainLoop < minTimeOutMainLoop { - log.Warnf("producer:config: timeOutMainLoop is too low (%s) minimum recomender value %s", cfg.timeOutMainLoop, minTimeOutMainLoop) + if cfg.TimeOutMainLoop < minTimeOutMainLoop { + log.Warnf("producer:config: timeOutMainLoop is too low (%s) minimum recomender value %s", cfg.TimeOutMainLoop, minTimeOutMainLoop) } - if 
cfg.minTimeBetweenRetriesForRollupInfo <= 0 { - log.Warnf("producer:config: minTimeBetweenRetriesForRollup is too low (%s)", cfg.minTimeBetweenRetriesForRollupInfo) + if cfg.MinTimeBetweenRetriesForRollupInfo <= 0 { + log.Warnf("producer:config: minTimeBetweenRetriesForRollup is too low (%s)", cfg.MinTimeBetweenRetriesForRollupInfo) } } @@ -162,34 +167,35 @@ type producerCmd struct { param1 uint64 } -type l1RollupInfoProducer struct { +// L1RollupInfoProducer is the object that retrieves data from L1 +type L1RollupInfoProducer struct { mutex sync.Mutex ctxParent context.Context ctxWithCancel contextWithCancel workers workersInterface syncStatus syncStatusInterface - outgoingChannel chan l1SyncMessage + outgoingChannel chan L1SyncMessage timeLastBLockOnL1 time.Time status producerStatusEnum // filter is an object that sort l1DataMessage to be send ordered by block number filterToSendOrdererResultsToConsumer filter statistics l1RollupInfoProducerStatistics - cfg configProducer + cfg ConfigProducer channelCmds chan producerCmd } -func (l *l1RollupInfoProducer) toStringBrief() string { +func (l *L1RollupInfoProducer) toStringBrief() string { l.mutex.Lock() defer l.mutex.Unlock() return l.toStringBriefUnsafe() } -func (l *l1RollupInfoProducer) toStringBriefUnsafe() string { +func (l *L1RollupInfoProducer) toStringBriefUnsafe() string { return fmt.Sprintf("status:%s syncStatus:[%s] workers:[%s] filter:[%s] cfg:[%s]", l.getStatus(), l.syncStatus.String(), l.workers.String(), l.filterToSendOrdererResultsToConsumer.ToStringBrief(), l.cfg.String()) } -// l1DataRetrieverStatistics : create an instance of l1RollupInfoProducer -func newL1DataRetriever(cfg configProducer, ethermans []EthermanInterface, outgoingChannel chan l1SyncMessage) *l1RollupInfoProducer { +// NewL1DataRetriever creates a new object +func NewL1DataRetriever(cfg ConfigProducer, ethermans []L1ParallelEthermanInterface, outgoingChannel chan L1SyncMessage) *L1RollupInfoProducer { if cap(outgoingChannel) < 
len(ethermans) { log.Warnf("producer: outgoingChannel must have a capacity (%d) of at least equal to number of ether clients (%d)", cap(outgoingChannel), len(ethermans)) } @@ -198,9 +204,9 @@ func newL1DataRetriever(cfg configProducer, ethermans []EthermanInterface, outgo // TODO: move this to config file workersConfig := workersConfig{timeoutRollupInfo: time.Duration(math.MaxInt64)} - result := l1RollupInfoProducer{ - syncStatus: newSyncStatus(invalidBlockNumber, cfg.syncChunkSize), - workers: newWorkerDecoratorLimitRetriesByTime(newWorkers(ethermans, workersConfig), cfg.minTimeBetweenRetriesForRollupInfo), + result := L1RollupInfoProducer{ + syncStatus: newSyncStatus(invalidBlockNumber, cfg.SyncChunkSize), + workers: newWorkerDecoratorLimitRetriesByTime(newWorkers(ethermans, workersConfig), cfg.MinTimeBetweenRetriesForRollupInfo), filterToSendOrdererResultsToConsumer: newFilterToSendOrdererResultsToConsumer(invalidBlockNumber), outgoingChannel: outgoingChannel, statistics: newRollupInfoProducerStatistics(invalidBlockNumber, DefaultTimeProvider{}), @@ -211,16 +217,15 @@ func newL1DataRetriever(cfg configProducer, ethermans []EthermanInterface, outgo return &result } -// ResetAndStop: reset the object and stop the current process. Set first block to be retrieved -// This function could be call from outside of main goroutine -func (l *l1RollupInfoProducer) Reset(startingBlockNumber uint64) { +// Reset reset the object and stop the current process. 
Set first block to be retrieved +func (l *L1RollupInfoProducer) Reset(startingBlockNumber uint64) { log.Infof("producer: Reset(%d) queue cmd and discarding all info in channel", startingBlockNumber) l.setStatusReseting() l.emptyChannel() l.channelCmds <- producerCmd{cmd: producerReset, param1: startingBlockNumber} } -func (l *l1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { +func (l *L1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { log.Infof("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief()) l.setStatusReseting() log.Debugf("producer: Reset(%d): stop previous run (state=%s)", startingBlockNumber, l.getStatus().String()) @@ -238,43 +243,47 @@ func (l *l1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { log.Infof("producer: Reset(%d): reset done!", startingBlockNumber) } -func (l *l1RollupInfoProducer) isProducerRunning() bool { +func (l *L1RollupInfoProducer) isProducerRunning() bool { return l.getStatus() != producerNoRunning } -func (l *l1RollupInfoProducer) setStatusReseting() { +func (l *L1RollupInfoProducer) setStatusReseting() { l.mutex.Lock() defer l.mutex.Unlock() l.setStatus(producerReseting) } -func (l *l1RollupInfoProducer) getStatus() producerStatusEnum { +func (l *L1RollupInfoProducer) getStatus() producerStatusEnum { return producerStatusEnum(atomic.LoadInt32((*int32)(&l.status))) } -func (l *l1RollupInfoProducer) setStatus(newStatus producerStatusEnum) { +func (l *L1RollupInfoProducer) setStatus(newStatus producerStatusEnum) { previousStatus := l.getStatus() atomic.StoreInt32((*int32)(&l.status), int32(newStatus)) if previousStatus != newStatus { log.Infof("producer: Status changed from [%s] to [%s]", previousStatus.String(), newStatus.String()) if newStatus == producerSynchronized { log.Infof("producer: send a message to consumer to indicate that we are synchronized") - l.sendPackages([]l1SyncMessage{*newL1SyncMessageControl(eventProducerIsFullySynced)}) + 
l.sendPackages([]L1SyncMessage{*newL1SyncMessageControl(eventProducerIsFullySynced)}) } } } -func (l *l1RollupInfoProducer) Abort() { + +// Abort stop inmediatly the current process +func (l *L1RollupInfoProducer) Abort() { l.emptyChannel() l.ctxWithCancel.cancelCtx() l.ctxWithCancel.createWithCancel(l.ctxParent) } -func (l *l1RollupInfoProducer) Stop() { +// Stop stop the current process sending a stop command to the process queue +// so it stops when finish to process all packages in queue +func (l *L1RollupInfoProducer) Stop() { log.Infof("producer: Stop() queue cmd") l.channelCmds <- producerCmd{cmd: producerStop} } -func (l *l1RollupInfoProducer) stopUnsafe() { +func (l *L1RollupInfoProducer) stopUnsafe() { log.Infof("producer: stop() called st=%s", l.toStringBrief()) if l.isProducerRunning() { @@ -287,18 +296,18 @@ func (l *l1RollupInfoProducer) stopUnsafe() { l.workers.stop() } -func (l *l1RollupInfoProducer) emptyChannel() { +func (l *L1RollupInfoProducer) emptyChannel() { for len(l.outgoingChannel) > 0 { <-l.outgoingChannel } } // verify: test params and status without if not allowModify avoid doing connection or modification of objects -func (l *l1RollupInfoProducer) verify() error { +func (l *L1RollupInfoProducer) verify() error { return l.syncStatus.Verify() } -func (l *l1RollupInfoProducer) initialize(ctx context.Context) error { +func (l *L1RollupInfoProducer) initialize(ctx context.Context) error { log.Debug("producer: initialize") err := l.verify() if err != nil { @@ -316,8 +325,8 @@ func (l *l1RollupInfoProducer) initialize(ctx context.Context) error { return nil } -// Before calling Start you must set lastBlockOnDB calling ResetAndStop -func (l *l1RollupInfoProducer) Start(ctx context.Context) error { +// Start a producer +func (l *L1RollupInfoProducer) Start(ctx context.Context) error { log.Infof("producer: starting L1 sync from:%s", l.syncStatus.String()) err := l.initialize(ctx) if err != nil { @@ -334,7 +343,7 @@ func (l *l1RollupInfoProducer) 
Start(ctx context.Context) error { return nil } -func (l *l1RollupInfoProducer) step(waitDuration *time.Duration) bool { +func (l *L1RollupInfoProducer) step(waitDuration *time.Duration) bool { if atomic.CompareAndSwapInt32((*int32)(&l.status), int32(producerNoRunning), int32(producerIdle)) { // l.getStatus() == producerNoRunning log.Info("producer: step: status is no running, changing to idle %s", l.getStatus().String()) } @@ -412,7 +421,7 @@ func (l *l1RollupInfoProducer) step(waitDuration *time.Duration) bool { log.Infof("producer: producerReseting") } - if l.cfg.timeForShowUpStatisticsLog != 0 && time.Since(l.statistics.lastShowUpTime) > l.cfg.timeForShowUpStatisticsLog { + if l.cfg.TimeForShowUpStatisticsLog != 0 && time.Since(l.statistics.lastShowUpTime) > l.cfg.TimeForShowUpStatisticsLog { log.Infof("producer: Statistics:%s", l.statistics.getStatisticsDebugString()) l.statistics.lastShowUpTime = time.Now() } @@ -422,7 +431,7 @@ func (l *l1RollupInfoProducer) step(waitDuration *time.Duration) bool { } // return if the producer must keep running (false -> stop) -func (l *l1RollupInfoProducer) executeCmd(cmd producerCmd) bool { +func (l *L1RollupInfoProducer) executeCmd(cmd producerCmd) bool { switch cmd.cmd { case producerStop: log.Infof("producer: received a stop, so it stops processing") @@ -436,12 +445,12 @@ func (l *l1RollupInfoProducer) executeCmd(cmd producerCmd) bool { return true } -func (l *l1RollupInfoProducer) ttlOfLastBlockOnL1() time.Duration { - return l.cfg.ttlOfLastBlockOnL1 +func (l *L1RollupInfoProducer) ttlOfLastBlockOnL1() time.Duration { + return l.cfg.TtlOfLastBlockOnL1 } -func (l *l1RollupInfoProducer) getNextTimeout() time.Duration { - timeOutMainLoop := l.cfg.timeOutMainLoop +func (l *L1RollupInfoProducer) getNextTimeout() time.Duration { + timeOutMainLoop := l.cfg.TimeOutMainLoop status := l.getStatus() switch status { case producerIdle: @@ -462,7 +471,7 @@ func (l *l1RollupInfoProducer) getNextTimeout() time.Duration { } // 
OnNewLastBlock is called when a new last block on L1 is received -func (l *l1RollupInfoProducer) onNewLastBlock(lastBlock uint64) onNewLastBlockResponse { +func (l *L1RollupInfoProducer) onNewLastBlock(lastBlock uint64) onNewLastBlockResponse { resp := l.syncStatus.OnNewLastBlockOnL1(lastBlock) l.statistics.updateLastBlockNumber(resp.fullRange.toBlock) l.timeLastBLockOnL1 = time.Now() @@ -472,7 +481,7 @@ func (l *l1RollupInfoProducer) onNewLastBlock(lastBlock uint64) onNewLastBlockRe return resp } -func (l *l1RollupInfoProducer) canISendNewRequestsUnsafe() (bool, string) { +func (l *L1RollupInfoProducer) canISendNewRequestsUnsafe() (bool, string) { queued := l.filterToSendOrdererResultsToConsumer.numItemBlockedInQueue() inChannel := len(l.outgoingChannel) maximum := cap(l.outgoingChannel) @@ -487,7 +496,7 @@ func (l *l1RollupInfoProducer) canISendNewRequestsUnsafe() (bool, string) { // launchWork: launch new workers if possible and returns new channels created // returns the number of workers launched -func (l *l1RollupInfoProducer) launchWork() (int, error) { +func (l *L1RollupInfoProducer) launchWork() (int, error) { launchedWorker := 0 allowNewRequests, allowNewRequestMsg := l.canISendNewRequestsUnsafe() accDebugStr := "[" + allowNewRequestMsg + "] " @@ -537,17 +546,17 @@ func (l *l1RollupInfoProducer) launchWork() (int, error) { return launchedWorker, nil } -func (l *l1RollupInfoProducer) outgoingPackageStatusDebugString() string { +func (l *L1RollupInfoProducer) outgoingPackageStatusDebugString() string { return fmt.Sprintf("outgoint_channel[%d/%d], filter:%s workers:%s", len(l.outgoingChannel), cap(l.outgoingChannel), l.filterToSendOrdererResultsToConsumer.ToStringBrief(), l.workers.String()) } -func (l *l1RollupInfoProducer) renewLastBlockOnL1IfNeeded(reason string) { +func (l *L1RollupInfoProducer) renewLastBlockOnL1IfNeeded(reason string) { elapsed := time.Since(l.timeLastBLockOnL1) ttl := l.ttlOfLastBlockOnL1() oldBlock := l.syncStatus.GetLastBlockOnL1() 
if elapsed > ttl { log.Infof("producer: Need a new value for Last Block On L1, doing the request reason:%s", reason) - result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.timeoutForRequestLastBlockOnL1, l.cfg.numOfAllowedRetriesForRequestLastBlockOnL1) + result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.TimeoutForRequestLastBlockOnL1, l.cfg.NumOfAllowedRetriesForRequestLastBlockOnL1) log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block) if result.generic.err != nil { log.Error(result.generic.err) @@ -557,7 +566,7 @@ func (l *l1RollupInfoProducer) renewLastBlockOnL1IfNeeded(reason string) { } } -func (l *l1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByBlockRange) { +func (l *L1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByBlockRange) { log.Infof("producer: Received responseRollupInfoByBlockRange: %s", result.toStringBrief()) if l.getStatus() == producerReseting { log.Infof("producer: Ignoring result because is in reseting status: %s", result.toStringBrief()) @@ -586,7 +595,7 @@ func (l *l1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB } } -func (l *l1RollupInfoProducer) sendPackages(outgoingPackages []l1SyncMessage) { +func (l *L1RollupInfoProducer) sendPackages(outgoingPackages []L1SyncMessage) { for _, pkg := range outgoingPackages { log.Infof("producer: Sending results [data] to consumer:%s: status_comm:%s", pkg.toStringBrief(), l.outgoingPackageStatusDebugString()) l.outgoingChannel <- pkg diff --git a/synchronizer/l1_rollup_info_producer_statistics.go b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics.go similarity index 99% rename from synchronizer/l1_rollup_info_producer_statistics.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics.go index 90375557f8..92b3506c73 100644 --- 
a/synchronizer/l1_rollup_info_producer_statistics.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "fmt" diff --git a/synchronizer/l1_rollup_info_producer_statistics_test.go b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics_test.go similarity index 96% rename from synchronizer/l1_rollup_info_producer_statistics_test.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics_test.go index 467ad96967..3f633270dd 100644 --- a/synchronizer/l1_rollup_info_producer_statistics_test.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_statistics_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "testing" diff --git a/synchronizer/l1_rollup_info_producer_test.go b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_test.go similarity index 79% rename from synchronizer/l1_rollup_info_producer_test.go rename to synchronizer/l1_parallel_sync/l1_rollup_info_producer_test.go index 5a3e0abbcb..be344d3db7 100644 --- a/synchronizer/l1_rollup_info_producer_test.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_producer_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -97,27 +97,27 @@ func TestGivenNoSetFirstBlockWhenCallStartThenDontReturnError(t *testing.T) { require.NoError(t, err) } -func setup(t *testing.T) (*l1RollupInfoProducer, []*ethermanMock, chan l1SyncMessage) { +func setup(t *testing.T) (*L1RollupInfoProducer, []*L1ParallelEthermanInterfaceMock, chan L1SyncMessage) { sut, ethermansMock, resultChannel := setupNoResetCall(t) sut.Reset(100) return sut, ethermansMock, resultChannel } -func setupNoResetCall(t *testing.T) (*l1RollupInfoProducer, []*ethermanMock, chan l1SyncMessage) { - ethermansMock := []*ethermanMock{newEthermanMock(t), newEthermanMock(t)} - ethermans := []EthermanInterface{ethermansMock[0], ethermansMock[1]} - resultChannel := 
make(chan l1SyncMessage, 100) - cfg := configProducer{ - syncChunkSize: 100, - ttlOfLastBlockOnL1: time.Second, - timeOutMainLoop: time.Second, +func setupNoResetCall(t *testing.T) (*L1RollupInfoProducer, []*L1ParallelEthermanInterfaceMock, chan L1SyncMessage) { + ethermansMock := []*L1ParallelEthermanInterfaceMock{NewL1ParallelEthermanInterfaceMock(t), NewL1ParallelEthermanInterfaceMock(t)} + ethermans := []L1ParallelEthermanInterface{ethermansMock[0], ethermansMock[1]} + resultChannel := make(chan L1SyncMessage, 100) + cfg := ConfigProducer{ + SyncChunkSize: 100, + TtlOfLastBlockOnL1: time.Second, + TimeOutMainLoop: time.Second, } - sut := newL1DataRetriever(cfg, ethermans, resultChannel) + sut := NewL1DataRetriever(cfg, ethermans, resultChannel) return sut, ethermansMock, resultChannel } -func expectedForGettingL1LastBlock(t *testing.T, etherman *ethermanMock, blockNumber int64) { +func expectedForGettingL1LastBlock(t *testing.T, etherman *L1ParallelEthermanInterfaceMock, blockNumber int64) { header := new(ethTypes.Header) header.Number = big.NewInt(blockNumber) etherman. @@ -126,7 +126,7 @@ func expectedForGettingL1LastBlock(t *testing.T, etherman *ethermanMock, blockNu Maybe() } -func expectedRollupInfoCalls(t *testing.T, etherman *ethermanMock, calls int) { +func expectedRollupInfoCalls(t *testing.T, etherman *L1ParallelEthermanInterfaceMock, calls int) { etherman. On("GetRollupInfoByBlockRange", mock.Anything, mock.Anything, mock.Anything). Return(nil, nil, nil). 
diff --git a/synchronizer/l1_sync_orchestration.go b/synchronizer/l1_parallel_sync/l1_sync_orchestration.go similarity index 77% rename from synchronizer/l1_sync_orchestration.go rename to synchronizer/l1_parallel_sync/l1_sync_orchestration.go index 604b4a42a8..34ed79c6db 100644 --- a/synchronizer/l1_sync_orchestration.go +++ b/synchronizer/l1_parallel_sync/l1_sync_orchestration.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -31,7 +31,8 @@ type l1RollupConsumerInterface interface { Reset(startingBlockNumber uint64) } -type l1SyncOrchestration struct { +// L1SyncOrchestration is the object that coordinates the producer and the consumer process. +type L1SyncOrchestration struct { mutex sync.Mutex producer l1RollupProducerInterface consumer l1RollupConsumerInterface @@ -51,8 +52,9 @@ const ( errMissingLastEthBlockSynced = "orchestration: missing last eth block synced" ) -func newL1SyncOrchestration(ctx context.Context, producer l1RollupProducerInterface, consumer l1RollupConsumerInterface) *l1SyncOrchestration { - res := l1SyncOrchestration{ +// NewL1SyncOrchestration create a new L1 sync orchestration object +func NewL1SyncOrchestration(ctx context.Context, producer l1RollupProducerInterface, consumer l1RollupConsumerInterface) *L1SyncOrchestration { + res := L1SyncOrchestration{ producer: producer, consumer: consumer, producerRunning: false, @@ -65,7 +67,8 @@ func newL1SyncOrchestration(ctx context.Context, producer l1RollupProducerInterf return &res } -func (l *l1SyncOrchestration) reset(startingBlockNumber uint64) { +// Reset set a new starting point and cancel current process if any +func (l *L1SyncOrchestration) Reset(startingBlockNumber uint64) { log.Warnf("orchestration: Reset L1 sync process to blockNumber %d", startingBlockNumber) if l.isRunning { log.Infof("orchestration: reset(%d) is going to reset producer", startingBlockNumber) @@ -75,33 +78,38 @@ func (l *l1SyncOrchestration) reset(startingBlockNumber uint64) { 
// If orchestrator is running then producer is going to be started by orchestrate() select function when detects that producer has finished } -func (l *l1SyncOrchestration) start(lastEthBlockSynced *state.Block) (*state.Block, error) { +// Start launch a new process to retrieve and execute data from L1 +func (l *L1SyncOrchestration) Start(lastEthBlockSynced *state.Block) (*state.Block, error) { l.isRunning = true - l.launchProducer(l.ctxWithCancel.ctx, l.chProducer, &l.wg) + l.launchProducer(l.ctxWithCancel.ctx, lastEthBlockSynced, l.chProducer, &l.wg) l.launchConsumer(l.ctxWithCancel.ctx, lastEthBlockSynced, l.chConsumer, &l.wg) return l.orchestrate(l.ctxParent, &l.wg, l.chProducer, l.chConsumer) } -func (l *l1SyncOrchestration) abort() { +// Abort stop inmediatly the current process +func (l *L1SyncOrchestration) Abort() { l.producer.Abort() l.ctxWithCancel.cancel() l.wg.Wait() l.ctxWithCancel.createWithCancel(l.ctxParent) } -func (l *l1SyncOrchestration) isProducerRunning() bool { +// IsProducerRunning return true if producer is running +func (l *L1SyncOrchestration) IsProducerRunning() bool { l.mutex.Lock() defer l.mutex.Unlock() return l.producerRunning } -func (l *l1SyncOrchestration) launchProducer(ctx context.Context, chProducer chan error, wg *sync.WaitGroup) { +func (l *L1SyncOrchestration) launchProducer(ctx context.Context, lastEthBlockSynced *state.Block, chProducer chan error, wg *sync.WaitGroup) { l.mutex.Lock() defer l.mutex.Unlock() if !l.producerRunning { if wg != nil { wg.Add(1) } + log.Infof("orchestration: producer is not running. 
Resetting the state to start from block %v (last on DB)", lastEthBlockSynced.BlockNumber) + l.producer.Reset(lastEthBlockSynced.BlockNumber) // Start producer: L1DataRetriever from L1 l.producerRunning = true @@ -123,7 +131,7 @@ func (l *l1SyncOrchestration) launchProducer(ctx context.Context, chProducer cha } } -func (l *l1SyncOrchestration) launchConsumer(ctx context.Context, lastEthBlockSynced *state.Block, chConsumer chan error, wg *sync.WaitGroup) { +func (l *L1SyncOrchestration) launchConsumer(ctx context.Context, lastEthBlockSynced *state.Block, chConsumer chan error, wg *sync.WaitGroup) { l.mutex.Lock() if l.consumerRunning { l.mutex.Unlock() @@ -148,7 +156,7 @@ func (l *l1SyncOrchestration) launchConsumer(ctx context.Context, lastEthBlockSy }() } -func (l *l1SyncOrchestration) orchestrate(ctx context.Context, wg *sync.WaitGroup, chProducer chan error, chConsumer chan error) (*state.Block, error) { +func (l *L1SyncOrchestration) orchestrate(ctx context.Context, wg *sync.WaitGroup, chProducer chan error, chConsumer chan error) (*state.Block, error) { // Wait a cond_var for known if consumer have finish var err error done := false diff --git a/synchronizer/l1_sync_orchestration_test.go b/synchronizer/l1_parallel_sync/l1_sync_orchestration_test.go similarity index 88% rename from synchronizer/l1_sync_orchestration_test.go rename to synchronizer/l1_parallel_sync/l1_sync_orchestration_test.go index 28f03802ff..36163c28ae 100644 --- a/synchronizer/l1_sync_orchestration_test.go +++ b/synchronizer/l1_parallel_sync/l1_sync_orchestration_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -31,19 +31,19 @@ func TestGivenOrquestrationWhenHappyPathThenReturnsBlockAndNoErrorAndProducerIsR time.Sleep(time.Millisecond * 100) return nil }) - sut.reset(123) - returnedBlock, err := sut.start(&block) + sut.Reset(123) + returnedBlock, err := sut.Start(&block) require.NoError(t, err) require.Equal(t, block, *returnedBlock) 
require.Equal(t, true, sut.producerRunning) require.Equal(t, false, sut.consumerRunning) } -func setupOrchestrationTest(t *testing.T, ctx context.Context) (*l1SyncOrchestration, mocksOrgertration) { +func setupOrchestrationTest(t *testing.T, ctx context.Context) (*L1SyncOrchestration, mocksOrgertration) { producer := newL1RollupProducerInterfaceMock(t) consumer := newL1RollupConsumerInterfaceMock(t) - return newL1SyncOrchestration(ctx, producer, consumer), mocksOrgertration{ + return NewL1SyncOrchestration(ctx, producer, consumer), mocksOrgertration{ producer: producer, consumer: consumer, } diff --git a/synchronizer/l1_syncstatus.go b/synchronizer/l1_parallel_sync/l1_syncstatus.go similarity index 99% rename from synchronizer/l1_syncstatus.go rename to synchronizer/l1_parallel_sync/l1_syncstatus.go index 109de2f7a1..8c85686cd7 100644 --- a/synchronizer/l1_syncstatus.go +++ b/synchronizer/l1_parallel_sync/l1_syncstatus.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "errors" diff --git a/synchronizer/l1_syncstatus_test.go b/synchronizer/l1_parallel_sync/l1_syncstatus_test.go similarity index 99% rename from synchronizer/l1_syncstatus_test.go rename to synchronizer/l1_parallel_sync/l1_syncstatus_test.go index 60cfb13a4c..72c54bf896 100644 --- a/synchronizer/l1_syncstatus_test.go +++ b/synchronizer/l1_parallel_sync/l1_syncstatus_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "testing" diff --git a/synchronizer/l1_worker_etherman.go b/synchronizer/l1_parallel_sync/l1_worker_etherman.go similarity index 98% rename from synchronizer/l1_worker_etherman.go rename to synchronizer/l1_parallel_sync/l1_worker_etherman.go index 7932327448..6345f37ae7 100644 --- a/synchronizer/l1_worker_etherman.go +++ b/synchronizer/l1_parallel_sync/l1_worker_etherman.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -169,7 +169,7 @@ type retrieveL1LastBlockResult struct { type workerEtherman 
struct { mutex sync.Mutex - etherman EthermanInterface + etherman L1ParallelEthermanInterface status ethermanStatusEnum typeOfCurrentRequest typeOfRequest request requestRollupInfoByBlockRange @@ -186,7 +186,7 @@ func (w *workerEtherman) String() string { return fmt.Sprintf("status:%s", w.status.String()) } -func newWorker(etherman EthermanInterface) *workerEtherman { +func newWorker(etherman L1ParallelEthermanInterface) *workerEtherman { return &workerEtherman{etherman: etherman, status: ethermanIdle} } diff --git a/synchronizer/l1_worker_etherman_test.go b/synchronizer/l1_parallel_sync/l1_worker_etherman_test.go similarity index 96% rename from synchronizer/l1_worker_etherman_test.go rename to synchronizer/l1_parallel_sync/l1_worker_etherman_test.go index c946a0b2e8..eaca9f93b3 100644 --- a/synchronizer/l1_worker_etherman_test.go +++ b/synchronizer/l1_parallel_sync/l1_worker_etherman_test.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( context "context" @@ -235,7 +235,7 @@ func TestIfRollupInfoFailPreviousBlockContainBlockRange(t *testing.T) { require.Equal(t, result.result.blockRange, blockRange) } -func expectedCallsForEmptyRollupInfo(mockEtherman *ethermanMock, blockRange blockRange, getRollupError error, ethBlockError error) { +func expectedCallsForEmptyRollupInfo(mockEtherman *L1ParallelEthermanInterfaceMock, blockRange blockRange, getRollupError error, ethBlockError error) { mockEtherman. On("GetRollupInfoByBlockRange", mock.Anything, blockRange.fromBlock, mock.Anything). Return([]etherman.Block{}, map[common.Hash][]etherman.Order{}, getRollupError). 
@@ -249,8 +249,8 @@ func expectedCallsForEmptyRollupInfo(mockEtherman *ethermanMock, blockRange bloc } } -func setupWorkerEthermanTest(t *testing.T) (*workerEtherman, *ethermanMock, chan responseRollupInfoByBlockRange) { - mockEtherman := newEthermanMock(t) +func setupWorkerEthermanTest(t *testing.T) (*workerEtherman, *L1ParallelEthermanInterfaceMock, chan responseRollupInfoByBlockRange) { + mockEtherman := NewL1ParallelEthermanInterfaceMock(t) worker := newWorker(mockEtherman) ch := make(chan responseRollupInfoByBlockRange, 2) return worker, mockEtherman, ch diff --git a/synchronizer/l1_workers.go b/synchronizer/l1_parallel_sync/l1_workers.go similarity index 98% rename from synchronizer/l1_workers.go rename to synchronizer/l1_parallel_sync/l1_workers.go index b7109515c9..a55951434f 100644 --- a/synchronizer/l1_workers.go +++ b/synchronizer/l1_parallel_sync/l1_workers.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" @@ -67,7 +67,7 @@ func (w *workers) String() string { return result } -func newWorkers(ethermans []EthermanInterface, cfg workersConfig) *workers { +func newWorkers(ethermans []L1ParallelEthermanInterface, cfg workersConfig) *workers { result := workers{chIncommingRollupInfo: make(chan responseRollupInfoByBlockRange, len(ethermans)+1), cfg: cfg} if (len(ethermans)) < minimumNumberOfEthermans { diff --git a/synchronizer/l1_workers_decorator_limit_retries_by_time.go b/synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time.go similarity index 98% rename from synchronizer/l1_workers_decorator_limit_retries_by_time.go rename to synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time.go index 1d0180394f..8b6c6937c7 100644 --- a/synchronizer/l1_workers_decorator_limit_retries_by_time.go +++ b/synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "context" diff --git 
a/synchronizer/l1_workers_decorator_limit_retries_by_time_test.go b/synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time_test.go similarity index 95% rename from synchronizer/l1_workers_decorator_limit_retries_by_time_test.go rename to synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time_test.go index c362823611..592d39be13 100644 --- a/synchronizer/l1_workers_decorator_limit_retries_by_time_test.go +++ b/synchronizer/l1_parallel_sync/l1_workers_decorator_limit_retries_by_time_test.go @@ -1,5 +1,4 @@ -// BEGIN: 9c3d4f5g2hj6 -package synchronizer +package l1_parallel_sync import ( "context" @@ -12,7 +11,7 @@ import ( func TestWorkerDecoratorLimitRetriesByTime_asyncRequestRollupInfoByBlockRange(t *testing.T) { // Create a new worker decorator with a minimum time between calls of 1 second - workersMock := newWorkersMock(t) + workersMock := newWorkersInterfaceMock(t) decorator := newWorkerDecoratorLimitRetriesByTime(workersMock, time.Second) // Create a block range to use for testing @@ -32,7 +31,7 @@ func TestWorkerDecoratorLimitRetriesByTime_asyncRequestRollupInfoByBlockRange(t func TestWorkerDecoratorLimitRetriesByTimeIfRealWorkerReturnsAllBusyDoesntCountAsRetry(t *testing.T) { // Create a new worker decorator with a minimum time between calls of 1 second - workersMock := newWorkersMock(t) + workersMock := newWorkersInterfaceMock(t) decorator := newWorkerDecoratorLimitRetriesByTime(workersMock, time.Second) // Create a block range to use for testing diff --git a/synchronizer/l1_parallel_sync/mock_l1_parallel_etherman_interface.go b/synchronizer/l1_parallel_sync/mock_l1_parallel_etherman_interface.go new file mode 100644 index 0000000000..a707851306 --- /dev/null +++ b/synchronizer/l1_parallel_sync/mock_l1_parallel_etherman_interface.go @@ -0,0 +1,218 @@ +// Code generated by mockery v2.32.0. DO NOT EDIT. 
+ +package l1_parallel_sync + +import ( + context "context" + big "math/big" + + common "github.com/ethereum/go-ethereum/common" + + etherman "github.com/0xPolygonHermez/zkevm-node/etherman" + + mock "github.com/stretchr/testify/mock" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// L1ParallelEthermanInterfaceMock is an autogenerated mock type for the L1ParallelEthermanInterface type +type L1ParallelEthermanInterfaceMock struct { + mock.Mock +} + +// EthBlockByNumber provides a mock function with given fields: ctx, blockNumber +func (_m *L1ParallelEthermanInterfaceMock) EthBlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error) { + ret := _m.Called(ctx, blockNumber) + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*types.Block, error)); ok { + return rf(ctx, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *types.Block); ok { + r0 = rf(ctx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetLatestBatchNumber provides a mock function with given fields: +func (_m *L1ParallelEthermanInterfaceMock) GetLatestBatchNumber() (uint64, error) { + ret := _m.Called() + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetLatestVerifiedBatchNum provides a mock function with given fields: +func (_m *L1ParallelEthermanInterfaceMock) GetLatestVerifiedBatchNum() (uint64, error) { + ret := _m.Called() + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func() (uint64, error)); ok { + return 
rf() + } + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetRollupInfoByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock +func (_m *L1ParallelEthermanInterfaceMock) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) { + ret := _m.Called(ctx, fromBlock, toBlock) + + var r0 []etherman.Block + var r1 map[common.Hash][]etherman.Order + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error)); ok { + return rf(ctx, fromBlock, toBlock) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64) []etherman.Block); ok { + r0 = rf(ctx, fromBlock, toBlock) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]etherman.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, *uint64) map[common.Hash][]etherman.Order); ok { + r1 = rf(ctx, fromBlock, toBlock) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[common.Hash][]etherman.Order) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, uint64, *uint64) error); ok { + r2 = rf(ctx, fromBlock, toBlock) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetTrustedSequencerURL provides a mock function with given fields: +func (_m *L1ParallelEthermanInterfaceMock) GetTrustedSequencerURL() (string, error) { + ret := _m.Called() + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func() (string, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HeaderByNumber provides a mock function with 
given fields: ctx, number +func (_m *L1ParallelEthermanInterfaceMock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + ret := _m.Called(ctx, number) + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// VerifyGenBlockNumber provides a mock function with given fields: ctx, genBlockNumber +func (_m *L1ParallelEthermanInterfaceMock) VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error) { + ret := _m.Called(ctx, genBlockNumber) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (bool, error)); ok { + return rf(ctx, genBlockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) bool); ok { + r0 = rf(ctx, genBlockNumber) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, genBlockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewL1ParallelEthermanInterfaceMock creates a new instance of L1ParallelEthermanInterfaceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewL1ParallelEthermanInterfaceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *L1ParallelEthermanInterfaceMock { + mock := &L1ParallelEthermanInterfaceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/synchronizer/mock_l1_rollup_consumer_interface.go b/synchronizer/l1_parallel_sync/mock_l1_rollup_consumer_interface.go similarity index 98% rename from synchronizer/mock_l1_rollup_consumer_interface.go rename to synchronizer/l1_parallel_sync/mock_l1_rollup_consumer_interface.go index 99a4c62cb0..b0fb46f690 100644 --- a/synchronizer/mock_l1_rollup_consumer_interface.go +++ b/synchronizer/l1_parallel_sync/mock_l1_rollup_consumer_interface.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.32.0. DO NOT EDIT. -package synchronizer +package l1_parallel_sync import ( context "context" diff --git a/synchronizer/mock_l1_rollup_producer_interface.go b/synchronizer/l1_parallel_sync/mock_l1_rollup_producer_interface.go similarity index 98% rename from synchronizer/mock_l1_rollup_producer_interface.go rename to synchronizer/l1_parallel_sync/mock_l1_rollup_producer_interface.go index ac24de2ebb..2f92658222 100644 --- a/synchronizer/mock_l1_rollup_producer_interface.go +++ b/synchronizer/l1_parallel_sync/mock_l1_rollup_producer_interface.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.32.0. DO NOT EDIT. -package synchronizer +package l1_parallel_sync import ( context "context" diff --git a/synchronizer/l1_parallel_sync/mock_synchronizer_process_block_range_interface.go b/synchronizer/l1_parallel_sync/mock_synchronizer_process_block_range_interface.go new file mode 100644 index 0000000000..60aa5e1a23 --- /dev/null +++ b/synchronizer/l1_parallel_sync/mock_synchronizer_process_block_range_interface.go @@ -0,0 +1,43 @@ +// Code generated by mockery v2.32.0. DO NOT EDIT. 
+ +package l1_parallel_sync + +import ( + etherman "github.com/0xPolygonHermez/zkevm-node/etherman" + common "github.com/ethereum/go-ethereum/common" + + mock "github.com/stretchr/testify/mock" +) + +// synchronizerProcessBlockRangeInterfaceMock is an autogenerated mock type for the synchronizerProcessBlockRangeInterface type +type synchronizerProcessBlockRangeInterfaceMock struct { + mock.Mock +} + +// ProcessBlockRange provides a mock function with given fields: blocks, order +func (_m *synchronizerProcessBlockRangeInterfaceMock) ProcessBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error { + ret := _m.Called(blocks, order) + + var r0 error + if rf, ok := ret.Get(0).(func([]etherman.Block, map[common.Hash][]etherman.Order) error); ok { + r0 = rf(blocks, order) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// newSynchronizerProcessBlockRangeInterfaceMock creates a new instance of synchronizerProcessBlockRangeInterfaceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newSynchronizerProcessBlockRangeInterfaceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *synchronizerProcessBlockRangeInterfaceMock { + mock := &synchronizerProcessBlockRangeInterfaceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/synchronizer/mock_l1_worker.go b/synchronizer/l1_parallel_sync/mock_worker.go similarity index 98% rename from synchronizer/mock_l1_worker.go rename to synchronizer/l1_parallel_sync/mock_worker.go index b2ee4ea776..a4f13d90e3 100644 --- a/synchronizer/mock_l1_worker.go +++ b/synchronizer/l1_parallel_sync/mock_worker.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.32.0. DO NOT EDIT. 
-package synchronizer +package l1_parallel_sync import ( context "context" diff --git a/synchronizer/mock_workers.go b/synchronizer/l1_parallel_sync/mock_workers_interface.go similarity index 69% rename from synchronizer/mock_workers.go rename to synchronizer/l1_parallel_sync/mock_workers_interface.go index c1a2369539..fead23f695 100644 --- a/synchronizer/mock_workers.go +++ b/synchronizer/l1_parallel_sync/mock_workers_interface.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.32.0. DO NOT EDIT. -package synchronizer +package l1_parallel_sync import ( context "context" @@ -9,13 +9,13 @@ import ( mock "github.com/stretchr/testify/mock" ) -// workersMock is an autogenerated mock type for the workersInterface type -type workersMock struct { +// workersInterfaceMock is an autogenerated mock type for the workersInterface type +type workersInterfaceMock struct { mock.Mock } // String provides a mock function with given fields: -func (_m *workersMock) String() string { +func (_m *workersInterfaceMock) String() string { ret := _m.Called() var r0 string @@ -29,7 +29,7 @@ func (_m *workersMock) String() string { } // asyncRequestRollupInfoByBlockRange provides a mock function with given fields: ctx, request -func (_m *workersMock) asyncRequestRollupInfoByBlockRange(ctx context.Context, request requestRollupInfoByBlockRange) (chan responseRollupInfoByBlockRange, error) { +func (_m *workersInterfaceMock) asyncRequestRollupInfoByBlockRange(ctx context.Context, request requestRollupInfoByBlockRange) (chan responseRollupInfoByBlockRange, error) { ret := _m.Called(ctx, request) var r0 chan responseRollupInfoByBlockRange @@ -55,7 +55,7 @@ func (_m *workersMock) asyncRequestRollupInfoByBlockRange(ctx context.Context, r } // getResponseChannelForRollupInfo provides a mock function with given fields: -func (_m *workersMock) getResponseChannelForRollupInfo() chan responseRollupInfoByBlockRange { +func (_m *workersInterfaceMock) getResponseChannelForRollupInfo() chan 
responseRollupInfoByBlockRange { ret := _m.Called() var r0 chan responseRollupInfoByBlockRange @@ -71,7 +71,7 @@ func (_m *workersMock) getResponseChannelForRollupInfo() chan responseRollupInfo } // howManyRunningWorkers provides a mock function with given fields: -func (_m *workersMock) howManyRunningWorkers() int { +func (_m *workersInterfaceMock) howManyRunningWorkers() int { ret := _m.Called() var r0 int @@ -85,7 +85,7 @@ func (_m *workersMock) howManyRunningWorkers() int { } // initialize provides a mock function with given fields: -func (_m *workersMock) initialize() error { +func (_m *workersInterfaceMock) initialize() error { ret := _m.Called() var r0 error @@ -99,7 +99,7 @@ func (_m *workersMock) initialize() error { } // requestLastBlockWithRetries provides a mock function with given fields: ctx, timeout, maxPermittedRetries -func (_m *workersMock) requestLastBlockWithRetries(ctx context.Context, timeout time.Duration, maxPermittedRetries int) responseL1LastBlock { +func (_m *workersInterfaceMock) requestLastBlockWithRetries(ctx context.Context, timeout time.Duration, maxPermittedRetries int) responseL1LastBlock { ret := _m.Called(ctx, timeout, maxPermittedRetries) var r0 responseL1LastBlock @@ -113,22 +113,22 @@ func (_m *workersMock) requestLastBlockWithRetries(ctx context.Context, timeout } // stop provides a mock function with given fields: -func (_m *workersMock) stop() { +func (_m *workersInterfaceMock) stop() { _m.Called() } // waitFinishAllWorkers provides a mock function with given fields: -func (_m *workersMock) waitFinishAllWorkers() { +func (_m *workersInterfaceMock) waitFinishAllWorkers() { _m.Called() } -// newWorkersMock creates a new instance of workersMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// newWorkersInterfaceMock creates a new instance of workersInterfaceMock. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. -func newWorkersMock(t interface { +func newWorkersInterfaceMock(t interface { mock.TestingT Cleanup(func()) -}) *workersMock { - mock := &workersMock{} +}) *workersInterfaceMock { + mock := &workersInterfaceMock{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/synchronizer/time_provider.go b/synchronizer/l1_parallel_sync/time_provider.go similarity index 94% rename from synchronizer/time_provider.go rename to synchronizer/l1_parallel_sync/time_provider.go index fea32d8e53..bd5b17a44b 100644 --- a/synchronizer/time_provider.go +++ b/synchronizer/l1_parallel_sync/time_provider.go @@ -1,4 +1,4 @@ -package synchronizer +package l1_parallel_sync import ( "time" diff --git a/synchronizer/mock_synchronizer_process_block_range.go b/synchronizer/mock_synchronizer_process_block_range.go deleted file mode 100644 index 5b1a5714b2..0000000000 --- a/synchronizer/mock_synchronizer_process_block_range.go +++ /dev/null @@ -1,43 +0,0 @@ -// Code generated by mockery v2.32.0. DO NOT EDIT. 
- -package synchronizer - -import ( - etherman "github.com/0xPolygonHermez/zkevm-node/etherman" - common "github.com/ethereum/go-ethereum/common" - - mock "github.com/stretchr/testify/mock" -) - -// synchronizerProcessBlockRangeMock is an autogenerated mock type for the synchronizerProcessBlockRangeInterface type -type synchronizerProcessBlockRangeMock struct { - mock.Mock -} - -// processBlockRange provides a mock function with given fields: blocks, order -func (_m *synchronizerProcessBlockRangeMock) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error { - ret := _m.Called(blocks, order) - - var r0 error - if rf, ok := ret.Get(0).(func([]etherman.Block, map[common.Hash][]etherman.Order) error); ok { - r0 = rf(blocks, order) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// newSynchronizerProcessBlockRangeMock creates a new instance of synchronizerProcessBlockRangeMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. 
-func newSynchronizerProcessBlockRangeMock(t interface { - mock.TestingT - Cleanup(func()) -}) *synchronizerProcessBlockRangeMock { - mock := &synchronizerProcessBlockRangeMock{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 500fda2611..c8832cb882 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -28,6 +28,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/state/runtime/executor" "github.com/0xPolygonHermez/zkevm-node/synchronizer/actions" "github.com/0xPolygonHermez/zkevm-node/synchronizer/actions/processor_manager" + "github.com/0xPolygonHermez/zkevm-node/synchronizer/l1_parallel_sync" "github.com/0xPolygonHermez/zkevm-node/synchronizer/l1event_orders" "github.com/0xPolygonHermez/zkevm-node/synchronizer/metrics" "github.com/ethereum/go-ethereum/common" @@ -77,7 +78,7 @@ type ClientSynchronizer struct { proverID string // Previous value returned by state.GetStoredFlushID, is used for decide if write a log or not previousExecutorFlushID uint64 - l1SyncOrchestration *l1SyncOrchestration + l1SyncOrchestration *l1_parallel_sync.L1SyncOrchestration l1EventProcessors *processor_manager.L1EventProcessors } @@ -135,25 +136,30 @@ func NewSynchronizer( var waitDuration = time.Duration(0) -func newL1SyncParallel(ctx context.Context, cfg Config, etherManForL1 []EthermanInterface, sync *ClientSynchronizer, runExternalControl bool) (*l1SyncOrchestration, error) { - chIncommingRollupInfo := make(chan l1SyncMessage, cfg.L1ParallelSynchronization.MaxPendingNoProcessedBlocks) - cfgConsumer := configConsumer{ +func newL1SyncParallel(ctx context.Context, cfg Config, etherManForL1 []EthermanInterface, sync *ClientSynchronizer, runExternalControl bool) (*l1_parallel_sync.L1SyncOrchestration, error) { + chIncommingRollupInfo := make(chan l1_parallel_sync.L1SyncMessage, cfg.L1ParallelSynchronization.MaxPendingNoProcessedBlocks) + 
cfgConsumer := l1_parallel_sync.ConfigConsumer{ ApplyAfterNumRollupReceived: cfg.L1ParallelSynchronization.PerformanceWarning.ApplyAfterNumRollupReceived, AceptableInacctivityTime: cfg.L1ParallelSynchronization.PerformanceWarning.AceptableInacctivityTime.Duration, } - L1DataProcessor := newL1RollupInfoConsumer(cfgConsumer, sync, chIncommingRollupInfo) + L1DataProcessor := l1_parallel_sync.NewL1RollupInfoConsumer(cfgConsumer, sync, chIncommingRollupInfo) - cfgProducer := configProducer{ - syncChunkSize: cfg.SyncChunkSize, - ttlOfLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockPeriod.Duration, - timeoutForRequestLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockTimeout.Duration, - numOfAllowedRetriesForRequestLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockMaxRetries, - timeForShowUpStatisticsLog: cfg.L1ParallelSynchronization.StatisticsPeriod.Duration, - timeOutMainLoop: cfg.L1ParallelSynchronization.TimeOutMainLoop.Duration, - minTimeBetweenRetriesForRollupInfo: cfg.L1ParallelSynchronization.RollupInfoRetriesSpacing.Duration, + cfgProducer := l1_parallel_sync.ConfigProducer{ + SyncChunkSize: cfg.SyncChunkSize, + TtlOfLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockPeriod.Duration, + TimeoutForRequestLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockTimeout.Duration, + NumOfAllowedRetriesForRequestLastBlockOnL1: cfg.L1ParallelSynchronization.RequestLastBlockMaxRetries, + TimeForShowUpStatisticsLog: cfg.L1ParallelSynchronization.StatisticsPeriod.Duration, + TimeOutMainLoop: cfg.L1ParallelSynchronization.TimeOutMainLoop.Duration, + MinTimeBetweenRetriesForRollupInfo: cfg.L1ParallelSynchronization.RollupInfoRetriesSpacing.Duration, } - l1DataRetriever := newL1DataRetriever(cfgProducer, etherManForL1, chIncommingRollupInfo) - l1SyncOrchestration := newL1SyncOrchestration(ctx, l1DataRetriever, L1DataProcessor) + // Convert EthermanInterface to l1_sync_parallel.EthermanInterface + etherManForL1Converted := 
make([]l1_parallel_sync.L1ParallelEthermanInterface, len(etherManForL1)) + for i, etherMan := range etherManForL1 { + etherManForL1Converted[i] = etherMan + } + l1DataRetriever := l1_parallel_sync.NewL1DataRetriever(cfgProducer, etherManForL1Converted, chIncommingRollupInfo) + l1SyncOrchestration := l1_parallel_sync.NewL1SyncOrchestration(ctx, l1DataRetriever, L1DataProcessor) if runExternalControl { log.Infof("Starting external control") externalControl := newExternalControl(l1DataRetriever, l1SyncOrchestration) @@ -385,7 +391,7 @@ func (s *ClientSynchronizer) Sync() error { } else { if s.l1SyncOrchestration != nil { log.Infof("Switching to sequential mode, stopping parallel sync and deleting object") - s.l1SyncOrchestration.abort() + s.l1SyncOrchestration.Abort() s.l1SyncOrchestration = nil } log.Infof("Syncing L1 blocks sequentially lastEthBlockSynced=%d", lastEthBlockSynced.BlockNumber) @@ -401,7 +407,7 @@ func (s *ClientSynchronizer) Sync() error { if s.l1SyncOrchestration != nil { // If have failed execution and get starting point from DB, we must reset parallel sync to this point // producer must start requesting this block - s.l1SyncOrchestration.reset(lastEthBlockSynced.BlockNumber) + s.l1SyncOrchestration.Reset(lastEthBlockSynced.BlockNumber) } if s.ctx.Err() != nil { continue @@ -427,17 +433,13 @@ func (s *ClientSynchronizer) syncBlocksParallel(lastEthBlockSynced *state.Block) err = s.resetState(block.BlockNumber) if err != nil { log.Errorf("error resetting the state to a previous block. Retrying... Err: %v", err) - s.l1SyncOrchestration.reset(lastEthBlockSynced.BlockNumber) + s.l1SyncOrchestration.Reset(lastEthBlockSynced.BlockNumber) return lastEthBlockSynced, fmt.Errorf("error resetting the state to a previous block") } return block, nil } - if !s.l1SyncOrchestration.isProducerRunning() { - log.Infof("producer is not running. 
Resetting the state to start from block %v (last on DB)", lastEthBlockSynced.BlockNumber) - s.l1SyncOrchestration.producer.Reset(lastEthBlockSynced.BlockNumber) - } log.Infof("Starting L1 sync orchestrator in parallel block: %d", lastEthBlockSynced.BlockNumber) - return s.l1SyncOrchestration.start(lastEthBlockSynced) + return s.l1SyncOrchestration.Start(lastEthBlockSynced) } // This function syncs the node from a specific block to the latest @@ -485,7 +487,7 @@ func (s *ClientSynchronizer) syncBlocksSequential(lastEthBlockSynced *state.Bloc return lastEthBlockSynced, err } start = time.Now() - err = s.processBlockRange(blocks, order) + err = s.ProcessBlockRange(blocks, order) metrics.ProcessL1DataTime(time.Since(start)) if err != nil { return lastEthBlockSynced, err @@ -519,7 +521,7 @@ func (s *ClientSynchronizer) syncBlocksSequential(lastEthBlockSynced *state.Bloc ParentHash: fb.ParentHash(), ReceivedAt: time.Unix(int64(fb.Time()), 0), } - err = s.processBlockRange([]etherman.Block{b}, order) + err = s.ProcessBlockRange([]etherman.Block{b}, order) if err != nil { return lastEthBlockSynced, err } @@ -616,7 +618,8 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error { return nil } -func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error { +// ProcessBlockRange process the L1 events and stores the information in the db +func (s *ClientSynchronizer) ProcessBlockRange(blocks []etherman.Block, order map[common.Hash][]etherman.Order) error { // New info has to be included into the db using the state for i := range blocks { // Begin db transaction @@ -747,7 +750,7 @@ func (s *ClientSynchronizer) resetState(blockNumber uint64) error { return err } if s.l1SyncOrchestration != nil { - s.l1SyncOrchestration.reset(blockNumber) + s.l1SyncOrchestration.Reset(blockNumber) } return nil } diff --git a/test/Makefile b/test/Makefile index c317ba8d8a..ff2fd0b243 100644 --- a/test/Makefile +++ 
b/test/Makefile @@ -550,20 +550,25 @@ generate-mocks-sequencer: ## Generates mocks for sequencer , using mockery tool export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=dbManagerInterface --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=DbManagerMock --filename=mock_db_manager.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=etherman --dir=../sequencer --output=../sequencer --outpkg=sequencer --inpackage --structname=EthermanMock --filename=mock_etherman.go +SYNC_L1_PARALLEL_FOLDER="../synchronizer/l1_parallel_sync" +SYNC_L1_PARALLEL_MOCKS_FOLDER="../synchronizer/l1_parallel_sync/mocks" +SYNC_L1_PARALLEL_PARAMS=--inpackage --outpkg=l1_parallel_sync .PHONY: generate-mocks-synchronizer generate-mocks-synchronizer: ## Generates mocks for synchronizer , using mockery tool ## mocks for synchronizer - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthermanInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethermanMock --filename=mock_etherman.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=stateMock --filename=mock_state.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethTxManager --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethTxManagerMock --filename=mock_ethtxmanager.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=poolInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=poolMock --filename=mock_pool.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=zkEVMClientInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=zkEVMClientMock --filename=mock_zkevmclient.go - export "GOROOT=$$(go env 
GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../synchronizer/mocks --structname=DbTxMock --filename=mock_dbtx.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=l1RollupProducerInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=l1RollupProducerInterfaceMock --filename=mock_l1_rollup_producer_interface.go --inpackage - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=l1RollupConsumerInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=l1RollupConsumerInterfaceMock --filename=mock_l1_rollup_consumer_interface.go --inpackage - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=worker --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=workerMock --filename=mock_l1_worker.go --inpackage - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=synchronizerProcessBlockRangeInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=synchronizerProcessBlockRangeMock --filename=mock_synchronizer_process_block_range.go --inpackage - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=workersInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=workersMock --filename=mock_workers.go --inpackage + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthermanInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=ethermanMock --filename=mock_etherman.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=stateInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=stateMock --filename=mock_state.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ethTxManager --dir=../synchronizer 
--output=../synchronizer --outpkg=synchronizer --structname=ethTxManagerMock --filename=mock_ethtxmanager.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=poolInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=poolMock --filename=mock_pool.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=zkEVMClientInterface --dir=../synchronizer --output=../synchronizer --outpkg=synchronizer --structname=zkEVMClientMock --filename=mock_zkevmclient.go + for i in l1RollupProducerInterface l1RollupConsumerInterface worker synchronizerProcessBlockRangeInterface workersInterface L1ParallelEthermanInterface; do \ + camelcase=$$(echo $$i | sed 's/\([a-z0-9]\)\([A-Z]\)/\1_\L\2/g' | tr '[:upper:]' '[:lower:]') ; \ + echo $$camelcase ; \ + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=$$i --dir=../synchronizer/l1_parallel_sync --output=../synchronizer/l1_parallel_sync/mocks --outpkg=l1_parallel_sync --structname=$$i"Mock" --filename=mock_$$camelcase.go --inpackage ; \ + done + + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../synchronizer/mocks --structname=DbTxMock --filename=mock_dbtx.go .PHONY: generate-mocks-etherman generate-mocks-etherman: ## Generates mocks for etherman , using mockery tool