Skip to content

Commit

Permalink
Broadcast URI (0xPolygonHermez#1722)
Browse files Browse the repository at this point in the history
* Broadcast URI

* fix synchronizer test

* fix

* check if reorg in sequencer
  • Loading branch information
ARR552 authored Mar 7, 2023
1 parent ba50430 commit 4738fac
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 29 deletions.
11 changes: 6 additions & 5 deletions sequencer/broadcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
)

// NewClient creates a grpc client to communicates with the Broadcast server
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc) {
func NewClient(ctx context.Context, serverAddress string) (pb.BroadcastServiceClient, *grpc.ClientConn, context.CancelFunc, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
const maxWaitSeconds = 120
ctx, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
ctx2, cancel := context.WithTimeout(ctx, maxWaitSeconds*time.Second)
log.Infof("connecting to broadcast service: %v", serverAddress)
conn, err := grpc.DialContext(ctx, serverAddress, opts...)
conn, err := grpc.DialContext(ctx2, serverAddress, opts...)
if err != nil {
log.Fatalf("failed to connect to broadcast service: %v", err)
log.Errorf("failed to connect to broadcast service: %v", err)
return nil, nil, cancel, err
}
client := pb.NewBroadcastServiceClient(conn)
log.Info("connected to broadcast service")

return client, conn, cancel
return client, conn, cancel, nil
}
34 changes: 22 additions & 12 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func newDBManager(ctx context.Context, txPool txPool, state dbManagerStateInterf
// Start stars the dbManager routines
func (d *dbManager) Start() {
go d.loadFromPool()
go func() {
for {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)
d.checkIfReorg()
}
}()
go d.storeProcessedTxAndDeleteFromPool()
}

Expand Down Expand Up @@ -99,24 +106,26 @@ func (d *dbManager) CreateFirstBatch(ctx context.Context, sequencerAddress commo
return processingCtx
}

// checkIfReorg checks if a reorg has happened
func (d *dbManager) checkIfReorg() {
numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
}
}

// loadFromPool keeps loading transactions from the pool
func (d *dbManager) loadFromPool() {
for {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)

numberOfReorgs, err := d.state.CountReorgs(d.ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

if numberOfReorgs != d.numberOfReorgs {
log.Warnf("New L2 reorg detected")
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
continue
}

poolTransactions, err := d.txPool.GetNonWIPPendingTxs(d.ctx, false, 0)
if err != nil && err != pgpoolstorage.ErrNotFound {
log.Errorf("load tx from pool: %v", err)
Expand Down Expand Up @@ -172,6 +181,7 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
// TODO: Finish the retry mechanism and error handling
for {
txToStore := <-d.txsStore.Ch
d.checkIfReorg()
log.Debugf("Storing tx %v", txToStore.txResponse.TxHash)
dbTx, err := d.BeginStateTransaction(d.ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (s *State) sendBatchRequestToExecutor(ctx context.Context, processBatchRequ
if caller != DiscardCallerLabel {
metrics.ExecutorProcessingTime(string(caller), elapsed)
}
log.Infof("It took %v for the executor to process the request", elapsed)
log.Infof("Batch: %d took %v to be processed by the executor ", processBatchRequest.OldBatchNum+1, elapsed)

return res, err
}
Expand Down
36 changes: 28 additions & 8 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientSynchronizer struct {
state stateInterface
pool poolInterface
ethTxManager ethTxManager
broadcastURI string
ctx context.Context
cancelCtx context.CancelFunc
genesis state.Genesis
Expand All @@ -52,6 +53,19 @@ func NewSynchronizer(
cfg Config) (Synchronizer, error) {
ctx, cancel := context.WithCancel(context.Background())

var broadcastURI string
if !isTrustedSequencer {
var err error
log.Debug("Getting broadcast URI")
broadcastURI, err = getBroadcastURI(ethMan)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
cancel()
return nil, err
}
log.Debug("broadcastURI ", broadcastURI)
}

return &ClientSynchronizer{
isTrustedSequencer: isTrustedSequencer,
state: st,
Expand All @@ -60,6 +74,7 @@ func NewSynchronizer(
ctx: ctx,
cancelCtx: cancel,
ethTxManager: ethTxManager,
broadcastURI: broadcastURI,
genesis: genesis,
cfg: cfg,
}, nil
Expand Down Expand Up @@ -157,7 +172,6 @@ func (s *ClientSynchronizer) Sync() error {
log.Warn("error syncing trusted state. Error: ", err)
continue
}
log.Info("Trusted state fully synchronized")
waitDuration = s.cfg.SyncInterval.Duration
}
}
Expand Down Expand Up @@ -265,25 +279,25 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
return nil
}

log.Debug("Getting broadcast URI")
broadcastURI, err := s.getBroadcastURI()
broadcastClient, _, cancel, err := broadcast.NewClient(s.ctx, s.broadcastURI)
if err != nil {
log.Errorf("error getting broadcast URI. Error: %v", err)
log.Warn("error connecting to the broadcast. Error: ", err)
cancel()
return err
}
log.Debug("broadcastURI ", broadcastURI)
broadcastClient, _, _ := broadcast.NewClient(s.ctx, broadcastURI)

log.Info("Getting trusted state info")
lastTrustedStateBatch, err := broadcastClient.GetLastBatch(s.ctx, &emptypb.Empty{})
if err != nil {
log.Warn("error syncing trusted state. Error: ", err)
cancel()
return err
}

log.Debug("lastTrustedStateBatch.BatchNumber ", lastTrustedStateBatch.BatchNumber)
log.Debug("latestSyncedBatch ", latestSyncedBatch)
if lastTrustedStateBatch.BatchNumber < latestSyncedBatch {
cancel()
return nil
}

Expand All @@ -292,17 +306,20 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {
batchToSync, err := broadcastClient.GetBatch(s.ctx, &pb.GetBatchRequest{BatchNumber: batchNumberToSync})
if err != nil {
log.Warnf("failed to get batch %v from trusted state via broadcast. Error: %v", batchNumberToSync, err)
cancel()
return err
}

dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

if err := s.processTrustedBatch(batchToSync, dbTx); err != nil {
log.Errorf("error processing trusted batch %v: %v", batchNumberToSync, err)
cancel()
err := dbTx.Rollback(s.ctx)
if err != nil {
log.Errorf("error rolling back db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
Expand All @@ -313,19 +330,22 @@ func (s *ClientSynchronizer) syncTrustedState(latestSyncedBatch uint64) error {

if err := dbTx.Commit(s.ctx); err != nil {
log.Errorf("error committing db transaction to sync trusted batch %v: %v", batchNumberToSync, err)
cancel()
return err
}

batchNumberToSync++
}

cancel()
log.Info("Trusted state fully synchronized")
return nil
}

// gets the broadcast URI from trusted sequencer JSON RPC server
func (s *ClientSynchronizer) getBroadcastURI() (string, error) {
func getBroadcastURI(etherMan ethermanInterface) (string, error) {
log.Debug("getting trusted sequencer URL from smc")
trustedSequencerURL, err := s.etherMan.GetTrustedSequencerURL()
trustedSequencerURL, err := etherMan.GetTrustedSequencerURL()
if err != nil {
return "", err
}
Expand Down
28 changes: 26 additions & 2 deletions synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package synchronizer

import (
context "context"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
Expand All @@ -28,6 +31,11 @@ type mocks struct {
}

func TestTrustedStateReorg(t *testing.T) {
data := `{"jsonrpc":"2.0","id":1,"result":"zkevm-broadcast:61090"}`
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, data)
}))
defer svr.Close()
type testCase struct {
Name string
getTrustedBatch func(*mocks, context.Context, etherman.SequencedBatch) *state.Batch
Expand All @@ -41,7 +49,13 @@ func TestTrustedStateReorg(t *testing.T) {
SyncChunkSize: 10,
GenBlockNumber: uint64(123456),
}
sync, err := NewSynchronizer(true, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)

m.Etherman.
On("GetTrustedSequencerURL").
Return(svr.URL, nil).
Once()

sync, err := NewSynchronizer(false, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
require.NoError(t, err)

// state preparation
Expand Down Expand Up @@ -337,6 +351,11 @@ func TestTrustedStateReorg(t *testing.T) {
}

func TestForcedBatch(t *testing.T) {
data := `{"jsonrpc":"2.0","id":1,"result":"zkevm-broadcast:61090"}`
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, data)
}))
defer svr.Close()
genesis := state.Genesis{}
cfg := Config{
SyncInterval: cfgTypes.Duration{Duration: 1 * time.Second},
Expand All @@ -351,7 +370,12 @@ func TestForcedBatch(t *testing.T) {
DbTx: newDbTxMock(t),
}

sync, err := NewSynchronizer(true, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
m.Etherman.
On("GetTrustedSequencerURL").
Return(svr.URL, nil).
Once()

sync, err := NewSynchronizer(false, m.Etherman, m.State, m.Pool, m.EthTxManager, genesis, cfg)
require.NoError(t, err)

// state preparation
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestBroadcast(t *testing.T) {

require.NoError(t, populateDB(ctx, st))

client, conn, cancel := broadcast.NewClient(ctx, serverAddress)
client, conn, cancel, err := broadcast.NewClient(ctx, serverAddress)
require.NoError(t, err)
defer func() {
cancel()
require.NoError(t, conn.Close())
Expand Down

0 comments on commit 4738fac

Please sign in to comment.