Skip to content

Commit

Permalink
seqv2: broadcast component (0xPolygonHermez#788)
Browse files Browse the repository at this point in the history
* Added methods equired in state for broadcast queries

* updated to use batch from state

* added broadcast server and client config

* run broadcast service component

* add component

* added e2e test

* Added readme about CI groups

* rename component broadcast -> broadcast-trusted-state

* updated to latest changes in statev2

* add max msg size for executor client
  • Loading branch information
fgimenez authored Jun 23, 2022
1 parent cbca44f commit 8157cf0
Show file tree
Hide file tree
Showing 18 changed files with 400 additions and 66 deletions.
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ DOCKERCOMPOSEAPPSEQ := hez-core-sequencer
DOCKERCOMPOSEAPPAGG := hez-core-agg
DOCKERCOMPOSEAPPRPC := hez-core-rpc
DOCKERCOMPOSEAPPSYNC := hez-core-sync
DOCKERCOMPOSEAPPBROADCAST := hez-core-broadcast
DOCKERCOMPOSEDB := hez-postgres
DOCKERCOMPOSENETWORK := hez-network
DOCKERCOMPOSEPROVER := hez-prover
Expand All @@ -15,6 +16,7 @@ RUNCORESEQ := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEAPPSEQ)
RUNCOREAGG := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEAPPAGG)
RUNCORERPC := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEAPPRPC)
RUNCORESYNC := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEAPPSYNC)
RUNCOREBROADCAST := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEAPPBROADCAST)

RUNNETWORK := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSENETWORK)
RUNPROVER := $(DOCKERCOMPOSE) up -d $(DOCKERCOMPOSEPROVER)
Expand All @@ -28,6 +30,7 @@ STOPCORESEQ := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEAPPSEQ) && $(DOCKERCOMPOSE)
STOPCOREAGG := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEAPPAGG) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSEAPPAGG)
STOPCORERPC := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEAPPRPC) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSEAPPRPC)
STOPCORESYNC := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEAPPSYNC) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSEAPPSYNC)
STOPCOREBROADCAST := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEAPPBROADCAST) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSEAPPBROADCAST)

STOPNETWORK := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSENETWORK) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSENETWORK)
STOPPROVER := $(DOCKERCOMPOSE) stop $(DOCKERCOMPOSEPROVER) && $(DOCKERCOMPOSE) rm -f $(DOCKERCOMPOSEPROVER)
Expand Down Expand Up @@ -184,6 +187,14 @@ run: compile-scs ## Runs all the services
$(RUNCORESYNC)
$(RUNEXPLORER)

.PHONY: run-broadcast
run-broadcast: ## Runs the broadcast service
$(RUNCOREBROADCAST)

.PHONY: stop-broadcast
stop-broadcast: ## Stops the broadcast service
$(STOPCOREBROADCAST)

.PHONY: init-network
init-network: ## Initializes the network
go run ./scripts/init_network/main.go .
Expand Down
30 changes: 30 additions & 0 deletions ci/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# CI groups

In order to reduce the total time we spend in CI executions we are running the
end to end tests, lint and unit tests in parallel, so that the time of one
execution is roughly equal to the time spent on the longest parallel execution.

We have 3 different github actions workflows:
* `lint`, just runs the linter
* `test-full-non-e2e`, runs all non-e2e tests (unit, integration and functional)
* `test-e2e`, which uses a matrix strategy to run the e2e tests in 3 groups.

The e2e CI groups are defined in the `./ci/e2e-group{1,3}` directories. In each
directory we have symlinks that point to the actual e2e test to be executed (these
tests are defined under `./test/e2e`). The goal of these symlinks is keeping the
same code organization we have now while being able to run the costly e2e tests
in parallel on CI.

So, if for instance we have the following e2e tests defined:
* `./test/e2e/testA_test.go`
* `./test/e2e/testB_test.go`
* `./test/e2e/testC_test.go`
* `./test/e2e/testD_test.go`
and we want to run tests A and B in group1, test C in group 2 and test D in group 3
we would need to create these symlinks:
```
./ci/e2e-group1/testA_test.go -> ./test/e2e/testA_test.go
./ci/e2e-group1/testB_test.go -> ./test/e2e/testB_test.go
./ci/e2e-group2/testC_test.go -> ./test/e2e/testC_test.go
./ci/e2e-group3/testD_test.go -> ./test/e2e/testD_test.go
```
1 change: 1 addition & 0 deletions ci/e2e-group2/broadcast_test.go
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
SEQUENCER = "sequencer"
RPC = "rpc"
SYNCHRONIZER = "synchronizer"
BROADCAST = "broadcast-trusted-state"
)

func main() {
Expand Down
35 changes: 35 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ import (
"github.com/hermeznetwork/hermez-core/proverclient"
proverclientpb "github.com/hermeznetwork/hermez-core/proverclient/pb"
"github.com/hermeznetwork/hermez-core/sequencer"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast/pb"
"github.com/hermeznetwork/hermez-core/state"
"github.com/hermeznetwork/hermez-core/state/tree"
"github.com/hermeznetwork/hermez-core/statev2"
"github.com/hermeznetwork/hermez-core/statev2/runtime/executor"
executorclientpb "github.com/hermeznetwork/hermez-core/statev2/runtime/executor/pb"
"github.com/hermeznetwork/hermez-core/synchronizer"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -134,6 +139,12 @@ func start(ctx *cli.Context) error {
npool = pool.NewPool(poolDb, st, stateCfg.L2GlobalExitRootManagerAddr)
gpe = createGasPriceEstimator(c.GasPriceEstimator, st, npool)
go runSynchronizer(c.NetworkConfig, etherman, st, c.Synchronizer, gpe)
case BROADCAST:
log.Info("Running broadcast service")
stateDb := statev2.NewPostgresStorage(sqlDB)
executorClient, _ := newExecutorClient(c.Executor)
st := statev2.NewState(statev2.Config{}, stateDb, &executorClient)
go runBroadcastServer(c.BroadcastServer, st)
}
}

Expand Down Expand Up @@ -181,6 +192,21 @@ func newProverClient(c proverclient.Config) (proverclientpb.ZKProverServiceClien
return proverClient, proverConn
}

func newExecutorClient(c executor.Config) (executorclientpb.ExecutorServiceClient, *grpc.ClientConn) {
const maxMsgSize = 100000000
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
}
executorConn, err := grpc.Dial(c.URI, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}

executorClient := executorclientpb.NewExecutorServiceClient(executorConn)
return executorClient, executorConn
}

func runSynchronizer(networkConfig config.NetworkConfig, etherman *etherman.Client, st *state.State, cfg synchronizer.Config, gpe gasPriceEstimator) {
genesisBlock, err := etherman.EtherClient.BlockByNumber(context.Background(), big.NewInt(0).SetUint64(networkConfig.GenBlockNumber))
if err != nil {
Expand Down Expand Up @@ -231,6 +257,15 @@ func runAggregator(c aggregator.Config, etherman *etherman.Client, proverclient
agg.Start()
}

func runBroadcastServer(c broadcast.ServerConfig, st *statev2.State) {
s := grpc.NewServer()

broadcastSrv := broadcast.NewServer(&c, st)
pb.RegisterBroadcastServiceServer(s, broadcastSrv)

broadcastSrv.Start()
}

// gasPriceEstimator interface for gas price estimator.
type gasPriceEstimator interface {
GetAvgGasPrice(ctx context.Context) (*big.Int, error)
Expand Down
7 changes: 7 additions & 0 deletions config/config.debug.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,10 @@ URI = "127.0.0.1:50060"

[Executor]
URI = "51.210.116.237:50071"

[BroadcastServer]
Host = "0.0.0.0"
Port = 61090

[BroadcastClient]
URI = "127.0.0.1:61090"
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hermeznetwork/hermez-core/proverclient"
"github.com/hermeznetwork/hermez-core/sequencer"
"github.com/hermeznetwork/hermez-core/sequencerv2"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast"
"github.com/hermeznetwork/hermez-core/state/tree"
"github.com/hermeznetwork/hermez-core/statev2/runtime/executor"
"github.com/hermeznetwork/hermez-core/synchronizer"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Config struct {
MTServer tree.ServerConfig
MTClient tree.ClientConfig
Executor executor.Config
BroadcastServer broadcast.ServerConfig
BroadcastClient broadcast.ClientConfig
}

// Load loads the configuration
Expand Down
7 changes: 7 additions & 0 deletions config/config.local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,10 @@ URI = "127.0.0.1:50060"

[Executor]
URI = "51.210.116.237:50071"

[BroadcastServer]
Host = "0.0.0.0"
Port = 61090

[BroadcastClient]
URI = "127.0.0.1:61090"
12 changes: 12 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ func Test_Defaults(t *testing.T) {
path: "RPC.SequencerAddress",
expectedValue: "0x617b3a3528F9cDd6630fd3301B9c8911F7Bf063D",
},
{
path: "BroadcastServer.Host",
expectedValue: "0.0.0.0",
},
{
path: "BroadcastServer.Port",
expectedValue: 61090,
},
{
path: "BroadcastClient.URI",
expectedValue: "127.0.0.1:61090",
},
}

ctx := cli.NewContext(cli.NewApp(), flag.NewFlagSet("", flag.PanicOnError), nil)
Expand Down
7 changes: 7 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,11 @@ URI = "127.0.0.1:50060"
[Executor]
URI = "51.210.116.237:50071"
[BroadcastServer]
Host = "0.0.0.0"
Port = 61090
[BroadcastClient]
URI = "127.0.0.1:61090"
`
25 changes: 21 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
command:
- "/bin/sh"
- "-c"
- "/app/hezcore run --network local --cfg /app/config.toml --components sequencer"
- "/app/hezcore run --network local --cfg /app/config.toml --components sequencer"

hez-core-rpc:
container_name: hez-core-rpc
Expand All @@ -36,7 +36,7 @@ services:
- "/bin/sh"
- "-c"
- "/app/hezcore run --network local --cfg /app/config.toml --components rpc"

hez-core-agg:
container_name: hez-core-agg
image: hezcore
Expand All @@ -53,7 +53,7 @@ services:
- "/bin/sh"
- "-c"
- "/app/hezcore run --network local --cfg /app/config.toml --components aggregator"

hez-core-sync:
container_name: hez-core-sync
image: hezcore
Expand All @@ -70,7 +70,24 @@ services:
- "/bin/sh"
- "-c"
- "/app/hezcore run --network local --cfg /app/config.toml --components synchronizer"


hez-core-broadcast:
container_name: hez-core-broadcast
image: hezcore
environment:
- HERMEZCORE_DATABASE_USER=test_user
- HERMEZCORE_DATABASE_PASSWORD=test_password
- HERMEZCORE_DATABASE_NAME=test_db
- HERMEZCORE_DATABASE_HOST=hez-postgres
ports:
- 61090:61090
volumes:
- ./config/config.local.toml:/app/config.toml
command:
- "/bin/sh"
- "-c"
- "/app/hezcore run --network local --cfg /app/config.toml --components broadcast-trusted-state"

hez-postgres:
container_name: hez-postgres
image: postgres
Expand Down
15 changes: 3 additions & 12 deletions sequencerv2/broadcast/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,17 @@ package broadcast

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-core/statev2"
"github.com/jackc/pgx/v4"
)

// Consumer interfaces required by the package.

type stateInterface interface {
GetLastBatch(ctx context.Context, tx pgx.Tx) (*Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*Batch, error)
GetLastBatch(ctx context.Context, tx pgx.Tx) (*statev2.Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (*statev2.Batch, error)
GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, tx pgx.Tx) (encoded []string, err error)
}

// This should be moved into the state package

// Batch represents a Batch
type Batch struct {
BatchNumber uint64
GlobalExitRoot common.Hash
RawTxsData []byte
Timestamp time.Time
}
16 changes: 9 additions & 7 deletions sequencerv2/broadcast/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (

"github.com/hermeznetwork/hermez-core/log"
"github.com/hermeznetwork/hermez-core/sequencerv2/broadcast/pb"
"github.com/hermeznetwork/hermez-core/statev2"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)

// Server provides the functionality of the MerkleTree service.
// Server provides the functionality of the Broadcast service.
type Server struct {
cfg *ServerConfig

Expand All @@ -20,7 +21,7 @@ type Server struct {
state stateInterface
}

// NewServer is the MT server constructor.
// NewServer is the Broadcast server constructor.
func NewServer(cfg *ServerConfig, state stateInterface) *Server {
return &Server{
cfg: cfg,
Expand All @@ -47,6 +48,7 @@ func (s *Server) Start() {
healthService := newHealthChecker()
grpc_health_v1.RegisterHealthServer(s.srv, healthService)

log.Infof("Server listening in %q", address)
if err := s.srv.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
Expand Down Expand Up @@ -75,8 +77,8 @@ func (s *Server) GetLastBatch(ctx context.Context, empty *pb.Empty) (*pb.GetBatc
return s.genericGetBatch(ctx, batch)
}

func (s *Server) genericGetBatch(ctx context.Context, batch *Batch) (*pb.GetBatchResponse, error) {
txs, err := s.state.GetEncodedTransactionsByBatchNumber(ctx, batch.BatchNumber, nil)
func (s *Server) genericGetBatch(ctx context.Context, batch *statev2.Batch) (*pb.GetBatchResponse, error) {
txs, err := s.state.GetEncodedTransactionsByBatchNumber(ctx, batch.BatchNum, nil)
if err != nil {
return nil, err
}
Expand All @@ -88,9 +90,9 @@ func (s *Server) genericGetBatch(ctx context.Context, batch *Batch) (*pb.GetBatc
}

return &pb.GetBatchResponse{
BatchNumber: batch.BatchNumber,
GlobalExitRoot: batch.GlobalExitRoot.String(),
Timestamp: uint64(batch.Timestamp.Unix()),
BatchNumber: batch.BatchNum,
GlobalExitRoot: batch.GlobalExitRootNum.String(),
Timestamp: uint64(batch.EthTimestamp.Unix()),
Transactions: transactions,
}, nil
}
Expand Down
Loading

0 comments on commit 8157cf0

Please sign in to comment.