diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ba2440b620..ee1810d11e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -46,12 +46,12 @@ jobs: sed -i -e "s/image: zkevm-node/image: hermeznetwork\/zkevm-node:$GIT_TAG_NAME/g" testnet/docker-compose.yml zip -r testnet.zip testnet # MAINNET - mkdir -p mainnet/config/environments/testnet + mkdir -p mainnet/config/environments/mainnet mkdir -p mainnet/db/scripts - cp config/environments/mainnet/* mainnet/config/environments/testnet + cp config/environments/mainnet/* mainnet/config/environments/mainnet cp docker-compose.yml mainnet cp db/scripts/init_prover_db.sql mainnet/db/scripts - mv mainnet/config/environments/testnet/example.env mainnet + mv mainnet/config/environments/mainnet/example.env mainnet sed -i -e "s/image: zkevm-node/image: hermeznetwork\/zkevm-node:$GIT_TAG_NAME/g" mainnet/docker-compose.yml zip -r mainnet.zip mainnet @@ -61,4 +61,4 @@ jobs: files: 'testnet.zip;mainnet.zip' repo-token: ${{ secrets.TOKEN_RELEASE }} release-tag: ${{ steps.tagName.outputs.tag }} - \ No newline at end of file + diff --git a/Dockerfile b/Dockerfile index 8719b468b5..69829d3151 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN cd /src && make build # CONTAINER FOR RUNNING BINARY FROM alpine:3.18.0 COPY --from=build /src/dist/zkevm-node /app/zkevm-node -COPY --from=build /src/config/environments/testnet/testnet.node.config.toml /app/example.config.toml +COPY --from=build /src/config/environments/testnet/node.config.toml /app/example.config.toml RUN apk update && apk add postgresql15-client EXPOSE 8123 CMD ["/bin/sh", "-c", "/app/zkevm-node run"] diff --git a/README.md b/README.md index e23883dd41..158d990caa 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,6 @@ It's recommended to use `make` for building, and testing the code, ... Run `make ## Contribute -Before opening a pull request, please read this [guide](CONTRIBUTING.md) +Before opening a pull request, please read this [guide](CONTRIBUTING.md). 
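The hunks above fix a copy/paste bug in the mainnet packaging step of release.yml (mainnet files were previously written into a `testnet`-named directory) and drop the environment prefix from the per-environment config filenames (`testnet.node.config.toml` → `node.config.toml`, and so on). Renames like this are easy to miss in shell packaging scripts, so a small path guard run in CI could catch regressions. The sketch below is illustrative only, not part of this PR; the path lists are taken from the renames in this diff:

```go
// Hypothetical CI guard: assert the renamed config files exist and the
// old names are gone, so packaging scripts cannot reference stale paths.
package main

import (
	"fmt"
	"os"
)

func main() {
	wanted := []string{
		"config/environments/mainnet/node.config.toml",
		"config/environments/mainnet/prover.config.json",
		"config/environments/testnet/node.config.toml",
		"config/environments/testnet/prover.config.json",
	}
	stale := []string{
		"config/environments/mainnet/public.node.config.toml",
		"config/environments/mainnet/public.prover.config.json",
		"config/environments/testnet/testnet.node.config.toml",
		"config/environments/testnet/testnet.prover.config.json",
	}
	ok := true
	for _, p := range wanted {
		if _, err := os.Stat(p); err != nil {
			fmt.Printf("missing expected config: %s (%v)\n", p, err)
			ok = false
		}
	}
	for _, p := range stale {
		if _, err := os.Stat(p); err == nil {
			fmt.Printf("stale config still present: %s\n", p)
			ok = false
		}
	}
	if !ok {
		os.Exit(1)
	}
}
```

Run from the repository root, for example as a step before the zip packaging in release.yml.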
diff --git a/config/config.go b/config/config.go index 98f28fc48d..9660faac75 100644 --- a/config/config.go +++ b/config/config.go @@ -61,8 +61,8 @@ Config represents the configuration of the entire Hermez Node The file is [TOML format] You could find some examples: - `config/environments/local/local.node.config.toml`: running a permisionless node - - `config/environments/mainnet/public.node.config.toml` - - `config/environments/public/public.node.config.toml` + - `config/environments/mainnet/node.config.toml` + - `config/environments/public/node.config.toml` - `test/config/test.node.config.toml`: configuration for a trusted node used in CI [TOML format]: https://en.wikipedia.org/wiki/TOML diff --git a/config/environments/mainnet/public.node.config.toml b/config/environments/mainnet/node.config.toml similarity index 100% rename from config/environments/mainnet/public.node.config.toml rename to config/environments/mainnet/node.config.toml diff --git a/config/environments/mainnet/public.prover.config.json b/config/environments/mainnet/prover.config.json similarity index 98% rename from config/environments/mainnet/public.prover.config.json rename to config/environments/mainnet/prover.config.json index 8fcc181715..0aec07e541 100644 --- a/config/environments/mainnet/public.prover.config.json +++ b/config/environments/mainnet/prover.config.json @@ -112,5 +112,5 @@ "maxHashDBThreads": 8, "ECRecoverPrecalc": true, "ECRecoverPrecalcNThreads": 16, - "dbMultiWriteSinglePosition": true + "dbMultiWriteSinglePosition": false } diff --git a/config/environments/testnet/testnet.node.config.toml b/config/environments/testnet/node.config.toml similarity index 100% rename from config/environments/testnet/testnet.node.config.toml rename to config/environments/testnet/node.config.toml diff --git a/config/environments/testnet/testnet.prover.config.json b/config/environments/testnet/prover.config.json similarity index 98% rename from config/environments/testnet/testnet.prover.config.json rename to config/environments/testnet/prover.config.json index 8fcc181715..0aec07e541 100644 --- a/config/environments/testnet/testnet.prover.config.json +++ b/config/environments/testnet/prover.config.json @@ -112,5 +112,5 @@ "maxHashDBThreads": 8, "ECRecoverPrecalc": true, "ECRecoverPrecalcNThreads": 16, - "dbMultiWriteSinglePosition": true + "dbMultiWriteSinglePosition": false } diff --git a/config/gen_json_schema_test.go b/config/gen_json_schema_test.go index f43724c084..a5f2632f66 100644 --- a/config/gen_json_schema_test.go +++ b/config/gen_json_schema_test.go @@ -25,8 +25,8 @@ The file is [TOML format](https://en.wikipedia.org/wiki/TOML#) You could find some examples: - `config/environments/local/local.node.config.toml`: running a permisionless node -- `config/environments/mainnet/public.node.config.toml` -- `config/environments/public/public.node.config.toml` +- `config/environments/mainnet/node.config.toml` +- `config/environments/public/node.config.toml` - `test/config/test.node.config.toml`: configuration for a trusted node used in CI */ type MyTestConfig struct { diff --git a/docker-compose.yml b/docker-compose.yml index 27a64e0780..0755564b29 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,7 +26,7 @@ services: environment: - ZKEVM_NODE_ETHERMAN_URL=${ZKEVM_NODE_ETHERMAN_URL} volumes: - - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/testnet.node.config.toml:/app/config.toml + - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/node.config.toml:/app/config.toml command: - 
"/bin/sh" - "-c" @@ -50,7 +50,7 @@ services: environment: - ZKEVM_NODE_ETHERMAN_URL=${ZKEVM_NODE_ETHERMAN_URL} volumes: - - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/testnet.node.config.toml:/app/config.toml + - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/node.config.toml:/app/config.toml command: - "/bin/sh" - "-c" @@ -115,6 +115,6 @@ services: - 50061:50061 # MT - 50071:50071 # Executor volumes: - - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/testnet.prover.config.json:/usr/src/app/config.json + - ${ZKEVM_ADVANCED_CONFIG_DIR:-./config/environments/testnet}/prover.config.json:/usr/src/app/config.json command: > zkProver -c /usr/src/app/config.json diff --git a/docs/configuration.md b/docs/configuration.md index f38df7dfb1..a61bcac3c8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -9,8 +9,8 @@ To configure a node you need 3 files: This file is a [TOML](https://en.wikipedia.org/wiki/TOML#) formatted file. You could find some examples here: - `config/environments/local/local.node.config.toml`: running a permisionless node - - `config/environments/mainnet/public.node.config.toml` - - `config/environments/public/public.node.config.toml` + - `config/environments/mainnet/node.config.toml` + - `config/environments/public/node.config.toml` - `test/config/test.node.config.toml`: configuration for a trusted node used in CI For details about the contents you can read specifications [here](config-file/node-config-doc.md) @@ -38,5 +38,5 @@ For details about the contents you can read specifications [here](config-file/cu Please check [prover repository](https://github.com/0xPolygonHermez/zkevm-prover) for further information Examples: - - `config/environments/mainnet/public.prover.config.json` - - `config/environments/testnet/testnet.prover.config.json` + - `config/environments/mainnet/prover.config.json` + - `config/environments/testnet/prover.config.json` diff --git a/docs/modes.md b/docs/modes.md index 64396de18b..3d264912c6 100644 --- a/docs/modes.md +++ b/docs/modes.md @@ -19,7 +19,7 @@ By default the config files found in the repository will spin up the Node in JSO This will syncronize with the Trusted Sequencer (run by Polygon). -Use the default [testnet config file](https://github.com/0xPolygonHermez/zkevm-node/blob/develop/config/environments/testnet/testnet.node.config.toml), and make sure the following values are set to: +Use the default [testnet config file](https://github.com/0xPolygonHermez/zkevm-node/blob/develop/config/environments/testnet/node.config.toml), and make sure the following values are set to: ```toml [RPC] @@ -78,7 +78,7 @@ Machine 1: #### Machine 1 -Use default [prover config](https://github.com/0xPolygonHermez/zkevm-node/blob/develop/config/environments/testnet/testnet.prover.config.json) but change the following values (`runProverServer` set to true, rest false): +Use default [prover config](https://github.com/0xPolygonHermez/zkevm-node/blob/develop/config/environments/testnet/prover.config.json) but change the following values (`runProverServer` set to true, rest false): For *only* Prover Config (`only-prover-config.json`): diff --git a/docs/production-setup.md b/docs/production-setup.md index 4e71cd3c81..1d66ee8e56 100644 --- a/docs/production-setup.md +++ b/docs/production-setup.md @@ -87,8 +87,8 @@ In the basic setup, there are Postgres being instanciated as Docker containers. - Run dedicated instances for Postgres. 
To achieve this you will need to: - Remove the Postgres services (`zkevm-pool-db` and `zkevm-state-db`) from the `docker-compose.yml` - Instantiate Postgres elsewhere (note that you will have to create credentials and run some queries to make this work, following the config files and docker-compose should give a clear idea of what to do) - - Update the `testnet.node.config.toml` to use the correct URI for both DBs - - Update `testnet.prover.config.json` to use the correct URI for the state DB + - Update the `node.config.toml` to use the correct URI for both DBs + - Update `prover.config.json` to use the correct URI for the state DB - Use a setup of Postgres that allows to have separated endpoints for read / write replicas ### JSON RPC diff --git a/event/event.go b/event/event.go index 5da961eb14..f05e65d634 100644 --- a/event/event.go +++ b/event/event.go @@ -36,8 +36,8 @@ const ( EventID_FinalizerRestart EventID = "FINALIZER RESTART" // EventID_FinalizerBreakEvenGasPriceBigDifference is triggered when the finalizer recalculates the break even gas price and detects a big difference EventID_FinalizerBreakEvenGasPriceBigDifference EventID = "FINALIZER BREAK EVEN GAS PRICE BIG DIFFERENCE" - // EventID_SynchonizerRestart is triggered when the Synchonizer restarts - EventID_SynchonizerRestart EventID = "SYNCHRONIZER RESTART" + // EventID_SynchronizerRestart is triggered when the Synchronizer restarts + EventID_SynchronizerRestart EventID = "SYNCHRONIZER RESTART" // Source_Node is the source of the event Source_Node Source = "node" diff --git a/go.mod b/go.mod index c5f3fcb246..7030449b8e 100644 --- a/go.mod +++ b/go.mod @@ -127,10 +127,12 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect + golang.org/x/mod v0.9.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/term v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.1.0 // indirect + golang.org/x/tools v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect diff --git a/go.sum b/go.sum index 2189fd5796..37a9bf79f9 100644 --- a/go.sum +++ b/go.sum @@ -825,6 +825,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1080,6 +1082,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.7.0 
h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 52311e3bf5..6e39945075 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -60,14 +60,15 @@ type finalizer struct { maxBreakEvenGasPriceDeviationPercentage *big.Int defaultMinGasPriceAllowed uint64 // Processed txs - pendingTransactionsToStore chan transactionToStore - pendingTransactionsToStoreWG *sync.WaitGroup - pendingTransactionsToStoreMux *sync.RWMutex - storedFlushID uint64 - storedFlushIDCond *sync.Cond - proverID string - lastPendingFlushID uint64 - pendingFlushIDCond *sync.Cond + pendingTxsToStore chan transactionToStore + pendingTxsToStoreWG *sync.WaitGroup + pendingTxsToStoreMux *sync.RWMutex + pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker + storedFlushID uint64 + storedFlushIDCond *sync.Cond + proverID string + lastPendingFlushID uint64 + pendingFlushIDCond *sync.Cond } type transactionToStore struct { @@ -113,6 +114,8 @@ func newFinalizer( closingSignalCh ClosingSignalCh, batchConstraints batchConstraints, eventLog *event.EventLog, + pendingTxsToStoreMux *sync.RWMutex, + pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker, ) *finalizer { return &finalizer{ cfg: cfg, @@ -139,9 +142,10 @@ func newFinalizer( // event log eventLog: eventLog, maxBreakEvenGasPriceDeviationPercentage: new(big.Int).SetUint64(effectiveGasPriceCfg.MaxBreakEvenGasPriceDeviationPercentage), - pendingTransactionsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), - pendingTransactionsToStoreWG: new(sync.WaitGroup), - pendingTransactionsToStoreMux: &sync.RWMutex{}, + pendingTxsToStore: make(chan transactionToStore, batchConstraints.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), + pendingTxsToStoreWG: new(sync.WaitGroup), + pendingTxsToStoreMux: pendingTxsToStoreMux, + pendingTxsPerAddressTrackers: pendingTxsPerAddressTrackers, storedFlushID: 0, // Mutex is unlocked when the condition is broadcasted storedFlushIDCond: sync.NewCond(&sync.Mutex{}), @@ -395,7 +399,7 @@ func (f *finalizer) checkProverIDAndUpdateStoredFlushID(storedFlushID uint64, pr func (f *finalizer) storePendingTransactions(ctx context.Context) { for { select { - case tx, ok := <-f.pendingTransactionsToStore: + case tx, ok := <-f.pendingTxsToStore: if !ok { // Channel is closed return @@ -415,10 +419,19 @@ func (f *finalizer) storePendingTransactions(ctx context.Context) { // Now f.storedFlushID >= tx.flushId, you can store tx f.storeProcessedTx(ctx, tx) - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreMux.Lock() + f.pendingTxsToStoreWG.Done() + f.pendingTxsPerAddressTrackers[tx.txTracker.From].wg.Done() + f.pendingTxsPerAddressTrackers[tx.txTracker.From].count-- + // Needed to avoid memory leaks + if f.pendingTxsPerAddressTrackers[tx.txTracker.From].count == 0 { + delete(f.pendingTxsPerAddressTrackers, tx.txTracker.From) + } + f.pendingTxsToStoreMux.Unlock() + case <-ctx.Done(): // The context was cancelled from outside, Wait for all goroutines to finish, cleanup and exit - 
f.pendingTransactionsToStoreWG.Wait() + f.pendingTxsToStoreWG.Wait() return default: time.Sleep(100 * time.Millisecond) //nolint:gomnd @@ -433,7 +446,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) { // Wait until all processed transactions are saved startWait := time.Now() - f.pendingTransactionsToStoreWG.Wait() + f.pendingTxsToStoreWG.Wait() endWait := time.Now() log.Info("waiting for pending transactions to be stored took: ", endWait.Sub(startWait).String()) @@ -672,18 +685,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx flushId: result.FlushID, } - f.pendingTransactionsToStoreMux.Lock() - f.pendingTransactionsToStoreWG.Add(1) + f.pendingTxsToStoreMux.Lock() + f.pendingTxsToStoreWG.Add(1) if result.FlushID > f.lastPendingFlushID { f.lastPendingFlushID = result.FlushID f.pendingFlushIDCond.Broadcast() } - f.pendingTransactionsToStoreMux.Unlock() + f.pendingTxsToStoreMux.Unlock() select { - case f.pendingTransactionsToStore <- processedTransaction: + case f.pendingTxsToStore <- processedTransaction: case <-ctx.Done(): // If context is cancelled before we can send to the channel, we must decrement the WaitGroup count - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } f.batch.countOfTxs++ @@ -721,26 +734,35 @@ func (f *finalizer) handleForcedTxsProcessResp(ctx context.Context, request stat flushId: result.FlushID, } - f.pendingTransactionsToStoreMux.Lock() - f.pendingTransactionsToStoreWG.Add(1) + f.pendingTxsToStoreMux.Lock() + f.pendingTxsToStoreWG.Add(1) if result.FlushID > f.lastPendingFlushID { f.lastPendingFlushID = result.FlushID f.pendingFlushIDCond.Broadcast() } - f.pendingTransactionsToStoreMux.Unlock() + f.pendingTxsToStoreMux.Unlock() oldStateRoot = txResp.StateRoot select { - case f.pendingTransactionsToStore <- processedTransaction: + case f.pendingTxsToStore <- processedTransaction: case <-ctx.Done(): // If context is cancelled before we can send to the channel, we must decrement the WaitGroup count - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } } } // storeProcessedTx stores the processed transaction in the database. 
func (f *finalizer) storeProcessedTx(ctx context.Context, txToStore transactionToStore) { + f.pendingTxsToStoreMux.Lock() + if _, ok := f.pendingTxsPerAddressTrackers[txToStore.txTracker.From]; !ok { + f.pendingTxsPerAddressTrackers[txToStore.txTracker.From] = new(pendingTxPerAddressTracker) + f.pendingTxsPerAddressTrackers[txToStore.txTracker.From].wg = &sync.WaitGroup{} + } + f.pendingTxsPerAddressTrackers[txToStore.txTracker.From].wg.Add(1) + f.pendingTxsPerAddressTrackers[txToStore.txTracker.From].count++ + f.pendingTxsToStoreMux.Unlock() + if txToStore.response != nil { log.Infof("storeProcessedTx: storing processed txToStore: %s", txToStore.response.TxHash.String()) } else { diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index 74751f453c..173bbf180e 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -118,7 +118,9 @@ func TestNewFinalizer(t *testing.T) { dbManagerMock.On("GetLastSentFlushID", context.Background()).Return(uint64(0), nil) // arrange and act - f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog) + pendingTxsToStoreMux := new(sync.RWMutex) + pendingTxsPerAddressTrackers := make(map[common.Address]*pendingTxPerAddressTracker) + f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog, pendingTxsToStoreMux, pendingTxsPerAddressTrackers) // assert assert.NotNil(t, f) @@ -268,14 +270,14 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { storedTxs := make([]transactionToStore, 0) - f.pendingTransactionsToStore = make(chan transactionToStore) + f.pendingTxsToStore = make(chan transactionToStore) if tc.expectedStoredTx.batchResponse != nil { done = make(chan bool) // init a new done channel go func() { - for tx := range f.pendingTransactionsToStore { + for tx := range f.pendingTxsToStore { storedTxs = append(storedTxs, tx) - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } done <- true // signal that the goroutine is done }() @@ -311,9 +313,9 @@ func TestFinalizer_handleProcessTransactionResponse(t *testing.T) { } if tc.expectedStoredTx.batchResponse != nil { - close(f.pendingTransactionsToStore) // close the channel - <-done // wait for the goroutine to finish - f.pendingTransactionsToStoreWG.Wait() + close(f.pendingTxsToStore) // close the channel + <-done // wait for the goroutine to finish + f.pendingTxsToStoreWG.Wait() require.Len(t, storedTxs, 1) actualTx := storedTxs[0] assertEqualTransactionToStore(t, tc.expectedStoredTx, actualTx) @@ -893,13 +895,13 @@ func TestFinalizer_processForcedBatches(t *testing.T) { var newStateRoot common.Hash stateRoot := oldHash storedTxs := make([]transactionToStore, 0) - f.pendingTransactionsToStore = make(chan transactionToStore) + f.pendingTxsToStore = make(chan transactionToStore) if tc.expectedStoredTx != nil && len(tc.expectedStoredTx) > 0 { done = make(chan bool) // init a new done channel go func() { - for tx := range f.pendingTransactionsToStore { + for tx := range f.pendingTxsToStore { storedTxs = append(storedTxs, tx) - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } done <- true // signal that the goroutine is done }() @@ -957,9 +959,9 @@ func TestFinalizer_processForcedBatches(t *testing.T) { assert.EqualError(t, err, tc.expectedErr.Error()) } else { if tc.expectedStoredTx != nil && 
len(tc.expectedStoredTx) > 0 { - close(f.pendingTransactionsToStore) // ensure the channel is closed - <-done // wait for the goroutine to finish - f.pendingTransactionsToStoreWG.Wait() + close(f.pendingTxsToStore) // ensure the channel is closed + <-done // wait for the goroutine to finish + f.pendingTxsToStoreWG.Wait() for i := range tc.expectedStoredTx { require.Equal(t, tc.expectedStoredTx[i], storedTxs[i]) } @@ -1494,13 +1496,13 @@ func Test_processTransaction(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { storedTxs := make([]transactionToStore, 0) - f.pendingTransactionsToStore = make(chan transactionToStore, 1) + f.pendingTxsToStore = make(chan transactionToStore, 1) if tc.expectedStoredTx.batchResponse != nil { done = make(chan bool) // init a new done channel go func() { - for tx := range f.pendingTransactionsToStore { + for tx := range f.pendingTxsToStore { storedTxs = append(storedTxs, tx) - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } done <- true // signal that the goroutine is done }() @@ -1523,9 +1525,9 @@ func Test_processTransaction(t *testing.T) { errWg, err := f.processTransaction(tc.ctx, tc.tx) if tc.expectedStoredTx.batchResponse != nil { - close(f.pendingTransactionsToStore) // ensure the channel is closed - <-done // wait for the goroutine to finish - f.pendingTransactionsToStoreWG.Wait() + close(f.pendingTxsToStore) // ensure the channel is closed + <-done // wait for the goroutine to finish + f.pendingTxsToStoreWG.Wait() require.Equal(t, tc.expectedStoredTx, storedTxs[0]) } if tc.expectedErr != nil { @@ -1679,19 +1681,19 @@ func Test_handleForcedTxsProcessResp(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { storedTxs := make([]transactionToStore, 0) - f.pendingTransactionsToStore = make(chan transactionToStore) + f.pendingTxsToStore = make(chan transactionToStore) // Mock storeProcessedTx to store txs into the storedTxs slice go func() { - for tx := range f.pendingTransactionsToStore { + for tx := range f.pendingTxsToStore { storedTxs = append(storedTxs, tx) - f.pendingTransactionsToStoreWG.Done() + f.pendingTxsToStoreWG.Done() } }() f.handleForcedTxsProcessResp(ctx, tc.request, tc.result, tc.oldStateRoot) - f.pendingTransactionsToStoreWG.Wait() + f.pendingTxsToStoreWG.Wait() require.Nil(t, err) require.Equal(t, len(tc.expectedStoredTxs), len(storedTxs)) for i := 0; i < len(tc.expectedStoredTxs); i++ { @@ -1733,6 +1735,9 @@ func TestFinalizer_storeProcessedTx(t *testing.T) { response: &state.ProcessTransactionResponse{ TxHash: txHash, }, + txTracker: &TxTracker{ + From: senderAddr, + }, isForcedBatch: false, }, }, @@ -1755,6 +1760,9 @@ func TestFinalizer_storeProcessedTx(t *testing.T) { TxHash: txHash2, }, isForcedBatch: true, + txTracker: &TxTracker{ + From: senderAddr, + }, }, }, } @@ -2445,9 +2453,10 @@ func setupFinalizer(withWipBatch bool) *finalizer { handlingL2Reorg: false, eventLog: eventLog, maxBreakEvenGasPriceDeviationPercentage: big.NewInt(10), - pendingTransactionsToStore: make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), - pendingTransactionsToStoreWG: new(sync.WaitGroup), - pendingTransactionsToStoreMux: new(sync.RWMutex), + pendingTxsToStore: make(chan transactionToStore, bc.MaxTxsPerBatch*pendingTxsBufferSizeMultiplier), + pendingTxsToStoreWG: new(sync.WaitGroup), + pendingTxsToStoreMux: new(sync.RWMutex), + pendingTxsPerAddressTrackers: make(map[common.Address]*pendingTxPerAddressTracker), storedFlushID: 0, 
storedFlushIDCond: sync.NewCond(new(sync.Mutex)), proverID: "", diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index ef41799b03..49f285eb47 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/0xPolygonHermez/zkevm-node/event" @@ -67,6 +68,12 @@ type ClosingSignalCh struct { L2ReorgCh chan L2ReorgEvent } +// pendingTxPerAddressTracker is a struct that tracks the number of pending transactions per address +type pendingTxPerAddressTracker struct { + wg *sync.WaitGroup + count uint +} + // New init sequencer func New(cfg Config, txPool txPool, state stateInterface, etherman etherman, manager ethTxManager, eventLog *event.EventLog) (*Sequencer, error) { addr, err := etherman.TrustedSequencer() @@ -127,12 +134,14 @@ func (s *Sequencer) Start(ctx context.Context) { if err != nil { log.Fatalf("failed to mark WIP txs as pending, err: %v", err) } + pendingTxsToStoreMux := new(sync.RWMutex) + pendingTxTrackerPerAddress := make(map[common.Address]*pendingTxPerAddressTracker) - worker := NewWorker(s.cfg.Worker, s.state, batchConstraints, batchResourceWeights) + worker := NewWorker(s.cfg.Worker, s.state, batchConstraints, batchResourceWeights, pendingTxsToStoreMux, pendingTxTrackerPerAddress) dbManager := newDBManager(ctx, s.cfg.DBManager, s.pool, s.state, worker, closingSignalCh, batchConstraints) go dbManager.Start() - finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog) + finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog, pendingTxsToStoreMux, pendingTxTrackerPerAddress) currBatch, processingReq := s.bootstrap(ctx, dbManager, finalizer) go finalizer.Start(ctx, currBatch, processingReq) diff --git a/sequencer/worker.go b/sequencer/worker.go index cba6938f7c..d0a3e44874 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -16,24 +16,35 @@ import ( // Worker represents the worker component of the sequencer type Worker struct { - cfg WorkerCfg - pool map[string]*addrQueue - efficiencyList *efficiencyList - workerMutex sync.Mutex - state stateInterface - batchConstraints batchConstraintsFloat64 - batchResourceWeights batchResourceWeights + cfg WorkerCfg + pool map[string]*addrQueue + efficiencyList *efficiencyList + workerMutex sync.Mutex + state stateInterface + batchConstraints batchConstraintsFloat64 + batchResourceWeights batchResourceWeights + pendingTxsToStoreMux *sync.RWMutex + pendingTxsPerAddressTrackers map[common.Address]*pendingTxPerAddressTracker } // NewWorker creates an init a worker -func NewWorker(cfg WorkerCfg, state stateInterface, constraints batchConstraints, weights batchResourceWeights) *Worker { +func NewWorker( + cfg WorkerCfg, + state stateInterface, + constraints batchConstraints, + weights batchResourceWeights, + pendingTxsToStoreMux *sync.RWMutex, + pendingTxTrackersPerAddress map[common.Address]*pendingTxPerAddressTracker, +) *Worker { w := Worker{ - cfg: cfg, - pool: make(map[string]*addrQueue), - efficiencyList: newEfficiencyList(), - state: state, - batchConstraints: convertBatchConstraintsToFloat64(constraints), - batchResourceWeights: weights, + cfg: cfg, + pool: make(map[string]*addrQueue), + efficiencyList: newEfficiencyList(), + state: state, + batchConstraints: convertBatchConstraintsToFloat64(constraints), + batchResourceWeights: 
weights, + pendingTxsToStoreMux: pendingTxsToStoreMux, + pendingTxsPerAddressTrackers: pendingTxTrackersPerAddress, } return &w @@ -55,6 +66,14 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T // Unlock the worker to let execute other worker functions while creating the new AddrQueue w.workerMutex.Unlock() + // Wait until all pending transactions are stored, so we can ensure getting the correct nonce and balance of the new AddrQueue + w.pendingTxsToStoreMux.RLock() + pendingTxsTracker, ok := w.pendingTxsPerAddressTrackers[tx.From] + w.pendingTxsToStoreMux.RUnlock() + if ok && pendingTxsTracker.wg != nil { + pendingTxsTracker.wg.Wait() + } + root, err := w.state.GetLastStateRoot(ctx, nil) if err != nil { dropReason = fmt.Errorf("AddTx GetLastStateRoot error: %v", err) diff --git a/sequencer/worker_test.go b/sequencer/worker_test.go index 51787f188c..1d254984f5 100644 --- a/sequencer/worker_test.go +++ b/sequencer/worker_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "sync" "testing" "github.com/0xPolygonHermez/zkevm-node/state" @@ -307,6 +308,9 @@ func TestWorkerGetBestTx(t *testing.T) { } func initWorker(stateMock *StateMock, rcMax batchConstraints, rcWeigth batchResourceWeights) *Worker { - worker := NewWorker(workerCfg, stateMock, rcMax, rcWeigth) + pendingTxsToStoreMux := new(sync.RWMutex) + pendingTxsPerAddressTrackers := make(map[common.Address]*pendingTxPerAddressTracker) + worker := NewWorker(workerCfg, stateMock, rcMax, rcWeigth, pendingTxsToStoreMux, pendingTxsPerAddressTrackers) + return worker } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 7695799027..eb2d5d59e9 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -234,7 +234,7 @@ func (s *ClientSynchronizer) Sync() error { log.Warn("error setting latest batch info into db. Error: ", err) continue } - + log.Infof("latestSequencedBatchNumber: %d, latestSyncedBatch: %d, lastVerifiedBatchNumber: %d", latestSequencedBatchNumber, latestSyncedBatch, lastVerifiedBatchNumber) // Sync trusted state if latestSyncedBatch >= latestSequencedBatchNumber { startTrusted := time.Now() @@ -1251,8 +1251,22 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx // Find txs to be processed and included in the trusted state if *s.trustedState.lastStateRoot == batches[1].StateRoot { + // Delete txs that were stored before restart. We need to reprocess all txs because the intermediary stateRoot is only stored in memory + err := s.state.ResetTrustedState(s.ctx, uint64(trustedBatch.Number)-1, dbTx) + if err != nil { + log.Error("error resetting trusted state. Error: ", err) + return nil, nil, err + } // All txs need to be processed request.Transactions = trustedBatchL2Data + // Reopen batch + err = s.openBatch(trustedBatch, dbTx) + if err != nil { + log.Error("error opening batch. 
Error: ", err) + return nil, nil, err + } + request.GlobalExitRoot = trustedBatch.GlobalExitRoot + request.Transactions = trustedBatchL2Data } else { // Only new txs need to be processed storedTxs, syncedTxs, _, syncedEfficiencyPercentages, err := s.decodeTxs(trustedBatchL2Data, batches) @@ -1285,8 +1299,36 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx } log.Debugf("closing batch %v", trustedBatch.Number) if err := s.state.CloseBatch(s.ctx, receipt, dbTx); err != nil { - log.Errorf("error closing batch %d", trustedBatch.Number) - return nil, nil, err + // This is a workaround to avoid closing a batch that was already closed + if err.Error() != state.ErrBatchAlreadyClosed.Error() { + log.Errorf("error closing batch %d", trustedBatch.Number) + return nil, nil, err + } else { + log.Warnf("CASE 02: the batch [%d] was already closed", trustedBatch.Number) + log.Info("batches[0].BatchNumber: ", batches[0].BatchNumber) + log.Info("batches[0].AccInputHash: ", batches[0].AccInputHash) + log.Info("batches[0].StateRoot: ", batches[0].StateRoot) + log.Info("batches[0].LocalExitRoot: ", batches[0].LocalExitRoot) + log.Info("batches[0].GlobalExitRoot: ", batches[0].GlobalExitRoot) + log.Info("batches[0].Coinbase: ", batches[0].Coinbase) + log.Info("batches[0].ForcedBatchNum: ", batches[0].ForcedBatchNum) + log.Info("####################################") + log.Info("batches[1].BatchNumber: ", batches[1].BatchNumber) + log.Info("batches[1].AccInputHash: ", batches[1].AccInputHash) + log.Info("batches[1].StateRoot: ", batches[1].StateRoot) + log.Info("batches[1].LocalExitRoot: ", batches[1].LocalExitRoot) + log.Info("batches[1].GlobalExitRoot: ", batches[1].GlobalExitRoot) + log.Info("batches[1].Coinbase: ", batches[1].Coinbase) + log.Info("batches[1].ForcedBatchNum: ", batches[1].ForcedBatchNum) + log.Info("###############################") + log.Info("trustedBatch.BatchNumber: ", trustedBatch.Number) + log.Info("trustedBatch.AccInputHash: ", trustedBatch.AccInputHash) + log.Info("trustedBatch.StateRoot: ", trustedBatch.StateRoot) + log.Info("trustedBatch.LocalExitRoot: ", trustedBatch.LocalExitRoot) + log.Info("trustedBatch.GlobalExitRoot: ", trustedBatch.GlobalExitRoot) + log.Info("trustedBatch.Coinbase: ", trustedBatch.Coinbase) + log.Info("trustedBatch.ForcedBatchNum: ", trustedBatch.ForcedBatchNumber) + } } batches[0].AccInputHash = trustedBatch.AccInputHash batches[0].StateRoot = trustedBatch.StateRoot @@ -1332,10 +1374,23 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx BatchL2Data: trustedBatchL2Data, AccInputHash: trustedBatch.AccInputHash, } + log.Debugf("closing batch %v", trustedBatch.Number) if err := s.state.CloseBatch(s.ctx, receipt, dbTx); err != nil { - log.Errorf("error closing batch %d", trustedBatch.Number) - return nil, nil, err + // This is a workarround to avoid closing a batch that was already closed + if err.Error() != state.ErrBatchAlreadyClosed.Error() { + log.Errorf("error closing batch %d", trustedBatch.Number) + return nil, nil, err + } else { + log.Warnf("CASE 01: batch [%d] was already closed", trustedBatch.Number) + } + } + log.Info("Batch closed right after processing some tx") + if batches[0] != nil { + log.Debug("Updating batches[0] values...") + batches[0].AccInputHash = trustedBatch.AccInputHash + batches[0].StateRoot = trustedBatch.StateRoot + batches[0].LocalExitRoot = trustedBatch.LocalExitRoot } } @@ -1448,13 +1503,13 @@ func checkIfSynced(batches []*state.Batch, trustedBatch *types.Batch) 
bool { matchCoinbase && matchTimestamp && matchL2Data { return true } - log.Info("matchNumber", matchNumber) - log.Info("matchGER", matchGER) - log.Info("matchLER", matchLER) - log.Info("matchSR", matchSR) - log.Info("matchCoinbase", matchCoinbase) - log.Info("matchTimestamp", matchTimestamp) - log.Info("matchL2Data", matchL2Data) + log.Infof("matchNumber %v %d %d", matchNumber, batches[0].BatchNumber, uint64(trustedBatch.Number)) + log.Infof("matchGER %v %s %s", matchGER, batches[0].GlobalExitRoot.String(), trustedBatch.GlobalExitRoot.String()) + log.Infof("matchLER %v %s %s", matchLER, batches[0].LocalExitRoot.String(), trustedBatch.LocalExitRoot.String()) + log.Infof("matchSR %v %s %s", matchSR, batches[0].StateRoot.String(), trustedBatch.StateRoot.String()) + log.Infof("matchCoinbase %v %s %s", matchCoinbase, batches[0].Coinbase.String(), trustedBatch.Coinbase.String()) + log.Infof("matchTimestamp %v %d %d", matchTimestamp, uint64(batches[0].Timestamp.Unix()), uint64(trustedBatch.Timestamp)) + log.Infof("matchL2Data %v", matchL2Data) return false } @@ -1503,8 +1558,8 @@ func (s *ClientSynchronizer) updateAndCheckProverID(proverID string) { Source: event.Source_Node, Component: event.Component_Synchronizer, Level: event.Level_Critical, - EventID: event.EventID_SynchonizerRestart, - Description: fmt.Sprintf("proverID changed from %s to %s, restarting Synchonizer ", s.proverID, proverID), + EventID: event.EventID_SynchronizerRestart, + Description: fmt.Sprintf("proverID changed from %s to %s, restarting Synchronizer ", s.proverID, proverID), } err := s.eventLog.LogEvent(context.Background(), event) @@ -1536,7 +1591,7 @@ func (s *ClientSynchronizer) checkFlushID(dbTx pgx.Tx) error { s.updateAndCheckProverID(proverID) log.Debugf("storedFlushID (executor reported): %d, latestFlushID (pending): %d", storedFlushID, s.latestFlushID) if storedFlushID < s.latestFlushID { - log.Infof("Synchornized BLOCKED!: Wating for the flushID to be stored. FlushID to be stored: %d. Latest flushID stored: %d", s.latestFlushID, storedFlushID) + log.Infof("Synchronizer BLOCKED!: Waiting for the flushID to be stored. FlushID to be stored: %d. Latest flushID stored: %d", s.latestFlushID, storedFlushID) iteration := 0 start := time.Now() for storedFlushID < s.latestFlushID { @@ -1550,7 +1605,7 @@ func (s *ClientSynchronizer) checkFlushID(dbTx pgx.Tx) error { } iteration++ } - log.Infof("Synchornizer resumed, flushID stored: %d", s.latestFlushID) + log.Infof("Synchronizer resumed, flushID stored: %d", s.latestFlushID) } log.Infof("Pending Flushid fullfiled: %d, executor have write %d", s.latestFlushID, storedFlushID) s.latestFlushIDIsFulfilled = true diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index d8199eea79..f93c6f2874 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -686,6 +686,23 @@ func expectedCallsForsyncTrustedState(t *testing.T, m *mocks, sync *ClientSynchr Return(&stateBatchInPermissionLess, nil). Once() } + + m.State. + On("ResetTrustedState", sync.ctx, batchNumber-1, m.DbTx). + Return(nil). + Once() + + processCtx := state.ProcessingContext{ + BatchNumber: uint64(batchInTrustedNode.Number), + Coinbase: common.HexToAddress(batchInTrustedNode.Coinbase.String()), + Timestamp: time.Unix(int64(batchInTrustedNode.Timestamp), 0), + GlobalExitRoot: batchInTrustedNode.GlobalExitRoot, + } + m.State. + On("OpenBatch", sync.ctx, processCtx, m.DbTx). + Return(nil). + Once() + m.State. 
On("UpdateBatchL2Data", sync.ctx, batchNumber, stateBatchInTrustedNode.BatchL2Data, mock.Anything). Return(nil). diff --git a/test/config/test.permissionless.prover.config.json b/test/config/test.permissionless.prover.config.json index 24d4fbc67f..0f879c9c17 100644 --- a/test/config/test.permissionless.prover.config.json +++ b/test/config/test.permissionless.prover.config.json @@ -84,6 +84,6 @@ "ECRecoverPrecalc": true, "ECRecoverPrecalcNThreads": 16, - "dbMultiWriteSinglePosition": true + "dbMultiWriteSinglePosition": false } diff --git a/test/config/test.prover.config.json b/test/config/test.prover.config.json index 79d369b535..86f511f0c2 100644 --- a/test/config/test.prover.config.json +++ b/test/config/test.prover.config.json @@ -86,6 +86,6 @@ "ECRecoverPrecalc": true, "ECRecoverPrecalcNThreads": 16, - "dbMultiWriteSinglePosition": true + "dbMultiWriteSinglePosition": false }