implemented new aggregator proto support (0xPolygonHermez#355)
* implemented new aggregator proto support

* adding eth timestamp

* adapted to the new proto e2e and etherman

* fixed converters.go

* mocks regenerated

* merged with dev

* deleted redundant time multiplier in aggr config.go

* added to config.local.toml IntervalFrequencyToGetProofGenerationStateInSeconds parameter

* merged with dev

* deleted etherman mock

* updated inputs in aggregator

* added IntervalFrequencyToGetProofGenerationStateInSeconds to default.go

* adding mock prover

* fixed GetProof method, fixed aggregator

* updated docker-compose.yml

* fixed aggregator.go

* updated docker-compose.yml

* updated docker-compose.yml

* trying bigger deadline

* trying bigger deadline

* returned deadlines back

* updated docker compose and fixed pr comments

* added docker push for mock prover

* added zk-prover generated files
Mikelle authored Mar 18, 2022
1 parent 4340ae9 commit d4cc28f
Showing 30 changed files with 5,566 additions and 640 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/push-docker-develop.yml
@@ -17,4 +17,11 @@ jobs:
uses: docker/build-push-action@v2
with:
push: true
tags: hermeznetwork/hermez-node-zkevm:develop
tags: hermeznetwork/hermez-node-zkevm:develop
- name: Build and push prover mock
id: docker_build
uses: docker/build-push-action@v2
with:
context: ./proverservice
push: true
tags: hermeznetwork/hez-mock-prover
9 changes: 8 additions & 1 deletion .github/workflows/push-docker.yml
@@ -17,4 +17,11 @@ jobs:
uses: docker/build-push-action@v2
with:
push: true
tags: hermeznetwork/hermez-node-zkevm
tags: hermeznetwork/hermez-node-zkevm
- name: Build and push prover mock
id: docker_build
uses: docker/build-push-action@v2
with:
context: ./proverservice
push: true
tags: hermeznetwork/hez-mock-prover
2 changes: 2 additions & 0 deletions Makefile
@@ -156,6 +156,8 @@ generate-mocks: ## Generates mocks for the tests, using mockery tool
.PHONY: generate-code-from-proto
generate-code-from-proto: ## Generates code from proto files
cd proto/src/proto/mt/v1 && protoc --proto_path=. --go_out=../../../../../state/tree/pb --go-grpc_out=../../../../../state/tree/pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative mt.proto
cd proto/src/proto/zkprover/v1 && protoc --proto_path=. --go_out=../../../../../proverclient --go-grpc_out=../../../../../proverclient --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative zk-prover.proto
cd proto/src/proto/zkprover/v1 && protoc --proto_path=. --go_out=../../../../../proverservice/pb --go-grpc_out=../../../../../proverservice/pb --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative zk-prover.proto

.PHONY: update-external-dependencies
update-external-dependencies: ## Updates external dependencies like images, test vectors or proto files
173 changes: 103 additions & 70 deletions aggregator/aggregator.go
@@ -5,11 +5,9 @@ import (
"encoding/binary"
"fmt"
"math/big"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-core/encoding"
"github.com/hermeznetwork/hermez-core/hex"
"github.com/hermeznetwork/hermez-core/log"
"github.com/hermeznetwork/hermez-core/proverclient"
@@ -27,7 +25,7 @@ type Aggregator struct {

State stateInterface
EtherMan etherman
ZkProverClient proverclient.ZKProverClient
ZkProverClient proverclient.ZKProverServiceClient

ProfitabilityChecker aggregatorTxProfitabilityChecker

@@ -40,7 +38,7 @@ func NewAggregator(
cfg Config,
state stateInterface,
ethMan etherman,
zkProverClient proverclient.ZKProverClient,
zkProverClient proverclient.ZKProverServiceClient,
) (Aggregator, error) {
ctx, cancel := context.WithCancel(context.Background())

@@ -71,16 +69,12 @@ func (a *Aggregator) Start() {
// this is a batches, that were sent to ethereum to consolidate
batchesSent := make(map[uint64]bool)

// define those vars here, bcs it can be used in case <-a.ctx.Done()
var getProofCtx context.Context
var getProofCtxCancel context.CancelFunc
for {
select {
case <-time.After(a.cfg.IntervalToConsolidateState.Duration):
// init connection to the prover
var opts []grpc.CallOption
getProofClient, err := a.ZkProverClient.GenProof(a.ctx, opts...)
if err != nil {
log.Errorf("failed to connect to the prover, err: %v", err)
continue
}

// 1. check, if state is synced
lastConsolidatedBatch, err := a.State.GetLastBatch(a.ctx, false)
@@ -144,92 +138,125 @@ func (a *Aggregator) Start() {
continue
}

rawTxs := batchToConsolidate.RawTxsData
// Split transactions
var pos int64
var txs []string
const (
headerByteLength = 2
sLength = 32
rLength = 32
vLength = 1
)
for pos < int64(len(rawTxs)) {
length := rawTxs[pos : pos+1]
str := hex.EncodeToString(length)
num, err := strconv.ParseInt(str, hex.Base, encoding.BitSize64)
if err != nil {
log.Debug("error parsing header length: ", err)
txs = []string{}
break
}
// First byte is the length and must be ignored
const c0 = 192 // 192 is c0. This value is defined by the rlp protocol
num = num - c0 - 1

fullDataTx := rawTxs[pos : pos+num+rLength+sLength+vLength+headerByteLength]

pos = pos + num + rLength + sLength + vLength + headerByteLength

txs = append(txs, "0x"+hex.EncodeToString(fullDataTx))
}
log.Debug("Txs: ", txs)

// TODO: consider putting chain id to the batch, so we will get rid of additional request to db
seq, err := a.State.GetSequencer(a.ctx, batchToConsolidate.Sequencer)
if err != nil {
log.Warnf("failed to get sequencer from the state, addr: %s, err: %v", seq.Address, err)
continue
}
chainID := uint32(seq.ChainID.Uint64())

rawTxs := hex.EncodeToHex(batchToConsolidate.RawTxsData)
// TODO: change this, once we have dynamic exit root
globalExitRoot := common.HexToHash("0xa116e19a7984f21055d07b606c55628a5ffbf8ae1261c1e9f4e3a61620cf810a")
oldLocalExitRoot := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
newLocalExitRoot := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
fakeKeys := map[string]string{
db := map[string]string{
"0540ae2a259cb9179561cffe6a0a3852a2c1806ad894ed396a2ef16e1f10e9c7": "0000000000000000000000000000000000000000000000056bc75e2d63100000",
"061927dd2a72763869c1d5d9336a42d12a9a2f22809c9cf1feeb2a6d1643d950": "0000000000000000000000000000000000000000000000000000000000000000",
"03ae74d1bbdff41d14f155ec79bb389db716160c1766a49ee9c9707407f80a11": "00000000000000000000000000000000000000000000000ad78ebc5ac6200000",
"18d749d7bcc2bc831229c19256f9e933c08b6acdaff4915be158e34cbbc8a8e1": "0000000000000000000000000000000000000000000000000000000000000000",
}
batchHashData := common.BytesToHash(keccak256.Hash(batchToConsolidate.RawTxsData, globalExitRoot[:]))

batchChainIDByte := make([]byte, 4)
blockTimestampByte := make([]byte, 8)
binary.BigEndian.PutUint32(batchChainIDByte, uint32(batchToConsolidate.ChainID.Uint64()))
binary.BigEndian.PutUint64(blockTimestampByte, uint64(batchToConsolidate.ReceivedAt.Unix()))
batchHashData := common.BytesToHash(keccak256.Hash(
batchToConsolidate.RawTxsData,
globalExitRoot[:],
blockTimestampByte,
batchToConsolidate.Sequencer[:],
batchChainIDByte,
))
oldStateRoot := common.BytesToHash(stateRootConsolidated)
newStateRoot := common.BytesToHash(stateRootToConsolidate)
inputProver := &proverclient.InputProver{
Message: "calculate",
PublicInputs: &proverclient.PublicInputs{
OldStateRoot: oldStateRoot.String(),
OldLocalExitRoot: oldLocalExitRoot.String(),
NewStateRoot: newStateRoot.String(),
NewLocalExitRoot: newLocalExitRoot.String(),
SequencerAddr: batchToConsolidate.Sequencer.String(),
BatchHashData: batchHashData.String(),
ChainId: chainID,
ChainId: uint32(batchToConsolidate.ChainID.Uint64()),
BatchNum: uint32(batchToConsolidate.Number().Uint64()),
BlockNum: uint32(batchToConsolidate.BlockNumber),
EthTimestamp: uint64(batchToConsolidate.ReceivedAt.Unix()),
},
GlobalExitRoot: globalExitRoot.String(),
Txs: txs,
Keys: fakeKeys,
GlobalExitRoot: globalExitRoot.String(),
BatchL2Data: rawTxs,
Db: db,
ContractsBytecode: db,
}
log.Debugf("Data sent to the prover: %+v", inputProver)
err = getProofClient.Send(inputProver)

genProofRequest := proverclient.GenProofRequest{Input: inputProver}

// init connection to the prover
var opts []grpc.CallOption
resGenProof, err := a.ZkProverClient.GenProof(a.ctx, &genProofRequest, opts...)
if err != nil {
log.Warnf("failed to send batch to the prover, batchNumber: %v, err: %v", batchToConsolidate.Number().Uint64(), err)
log.Errorf("failed to connect to the prover to gen proof, err: %v", err)
continue
}
proofState, err := getProofClient.Recv()

log.Debugf("Data sent to the prover: %+v", inputProver)
genProofRes := resGenProof.GetResult()
if genProofRes != proverclient.GenProofResponse_RESULT_GEN_PROOF_OK {
log.Warnf("failed to get result from the prover, batchNumber: %d, err: %v", batchToConsolidate.Number().Uint64())
continue
}
genProofID := resGenProof.GetId()

resGetProof := &proverclient.GetProofResponse{
Result: -1,
}
getProofCtx, getProofCtxCancel = context.WithCancel(a.ctx)
getProofClient, err := a.ZkProverClient.GetProof(getProofCtx)
if err != nil {
log.Warnf("failed to get proof from the prover, batchNumber: %v, err: %v", batchToConsolidate.Number().Uint64(), err)
log.Warnf("failed to init getProofClient, batchNumber: %d, err: %v", batchToConsolidate.Number().Uint64(), err)
continue
}
for resGetProof.Result != proverclient.GetProofResponse_RESULT_GET_PROOF_COMPLETED_OK {
err = getProofClient.Send(&proverclient.GetProofRequest{
Id: genProofID,
})
if err != nil {
log.Warnf("failed to send get proof request to the prover, batchNumber: %d, err: %v", batchToConsolidate.Number().Uint64(), err)
break
}

// Calc inpuntHash
batchChainIDByte := make([]byte, 4)
resGetProof, err = getProofClient.Recv()
if err != nil {
log.Warnf("failed to get proof from the prover, batchNumber: %d, err: %v", batchToConsolidate.Number().Uint64(), err)
break
}

resGetProofState := resGetProof.GetResult()
if resGetProofState == proverclient.GetProofResponse_RESULT_GET_PROOF_ERROR ||
resGetProofState == proverclient.GetProofResponse_RESULT_GET_PROOF_COMPLETED_ERROR {
log.Fatalf("failed to get a proof for batch, batch number %d", batchToConsolidate.Number().Uint64())
}
if resGetProofState == proverclient.GetProofResponse_RESULT_GET_PROOF_INTERNAL_ERROR {
log.Warnf("failed to generate proof for batch, batchNumber: %v, ResGetProofState: %v", batchToConsolidate.Number().Uint64(), resGetProofState)
break
}

if resGetProofState == proverclient.GetProofResponse_RESULT_GET_PROOF_CANCEL {
log.Warnf("proof generation was cancelled, batchNumber: %v", batchToConsolidate.Number().Uint64())
break
}

if resGetProofState == proverclient.GetProofResponse_RESULT_GET_PROOF_PENDING {
// in this case aggregator will wait, to send another request
time.Sleep(a.cfg.IntervalFrequencyToGetProofGenerationStateInSeconds.Duration)
}
}

// getProofCtxCancel call closes the connection stream with the prover. This is the only way to close it by client
getProofCtxCancel()

if resGetProof.GetResult() != proverclient.GetProofResponse_RESULT_GET_PROOF_COMPLETED_OK {
continue
}

// Calc inputHash
batchNumberByte := make([]byte, 4)
binary.BigEndian.PutUint32(batchChainIDByte, inputProver.PublicInputs.ChainId)
blockNumberByte := make([]byte, 4)
binary.BigEndian.PutUint32(batchNumberByte, inputProver.PublicInputs.BatchNum)

binary.BigEndian.PutUint32(blockNumberByte, inputProver.PublicInputs.BlockNum)
hash := keccak256.Hash(
oldStateRoot[:],
oldLocalExitRoot[:],
@@ -239,17 +266,20 @@ func (a *Aggregator) Start() {
batchHashData[:],
batchChainIDByte[:],
batchNumberByte[:],
blockNumberByte[:],
blockTimestampByte[:],
)
frB, _ := new(big.Int).SetString(fr, 10)
inputHashMod := new(big.Int).Mod(new(big.Int).SetBytes(hash), frB)
internalInputHash := inputHashMod.Bytes()

// InputHash must match
internalInputHashS := fmt.Sprintf("0x%064s", hex.EncodeToString(internalInputHash))
if proofState.Proof.PublicInputsExtended.InputHash != internalInputHashS {
log.Error("inputHash received from the prover (", proofState.Proof.PublicInputsExtended.InputHash,
publicInputsExtended := resGetProof.GetPublic()
if resGetProof.GetPublic().InputHash != internalInputHashS {
log.Error("inputHash received from the prover (", publicInputsExtended.InputHash,
") doesn't match with the internal value: ", internalInputHashS)
log.Debug("internalBatchHashData: ", batchHashData, " externalBatchHashData: ", proofState.Proof.PublicInputsExtended.PublicInputs.BatchHashData)
log.Debug("internalBatchHashData: ", batchHashData, " externalBatchHashData: ", publicInputsExtended.PublicInputs.BatchHashData)
log.Debug("inputProver.PublicInputs.OldStateRoot: ", inputProver.PublicInputs.OldStateRoot)
log.Debug("inputProver.PublicInputs.OldLocalExitRoot:", inputProver.PublicInputs.OldLocalExitRoot)
log.Debug("inputProver.PublicInputs.NewStateRoot: ", inputProver.PublicInputs.NewStateRoot)
Expand All @@ -258,11 +288,13 @@ func (a *Aggregator) Start() {
log.Debug("inputProver.PublicInputs.BatchHashData: ", inputProver.PublicInputs.BatchHashData)
log.Debug("inputProver.PublicInputs.ChainId: ", inputProver.PublicInputs.ChainId)
log.Debug("inputProver.PublicInputs.BatchNum: ", inputProver.PublicInputs.BatchNum)
log.Debug("inputProver.PublicInputs.BlockNum: ", inputProver.PublicInputs.BlockNum)
log.Debug("inputProver.PublicInputs.EthTimestamp: ", inputProver.PublicInputs.EthTimestamp)
}

// 4. send proof + txs to the SC
batchNum := new(big.Int).SetUint64(batchToConsolidate.Number().Uint64())
h, err := a.EtherMan.ConsolidateBatch(batchNum, proofState.Proof)
h, err := a.EtherMan.ConsolidateBatch(batchNum, resGetProof)
if err != nil {
log.Warnf("failed to send request to consolidate batch to ethereum, batch number: %d, err: %v",
batchToConsolidate.Number().Uint64(), err)
@@ -272,6 +304,7 @@ func (a *Aggregator) Start() {

log.Infof("Batch %d consolidated: %s", batchToConsolidate.Number().Uint64(), h.Hash())
case <-a.ctx.Done():
getProofCtxCancel()
return
}
}
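
For orientation, the prover interaction introduced in aggregator.go above replaces the old single streaming GenProof call with a request-then-poll pattern: GenProof registers the batch input and returns an ID, and GetProof is then polled with that ID until the proof is ready, sleeping for IntervalFrequencyToGetProofGenerationStateInSeconds while the prover reports PENDING. The sketch below condenses that flow; pollProof is a hypothetical helper, not code from this commit, and it omits the INTERNAL_ERROR/CANCEL branches and logging that the real loop keeps.

// Illustrative condensation of the GenProof/GetProof flow added in aggregator.go.
// pollProof is not part of this commit; it assumes the generated proverclient package.
package sketch

import (
	"context"
	"fmt"
	"time"

	"github.com/hermeznetwork/hermez-core/proverclient"
)

func pollProof(ctx context.Context, client proverclient.ZKProverServiceClient,
	input *proverclient.InputProver, interval time.Duration) (*proverclient.GetProofResponse, error) {
	// 1. Register the proof request; the prover answers with an ID to poll on.
	genRes, err := client.GenProof(ctx, &proverclient.GenProofRequest{Input: input})
	if err != nil {
		return nil, err
	}
	if genRes.GetResult() != proverclient.GenProofResponse_RESULT_GEN_PROOF_OK {
		return nil, fmt.Errorf("prover did not accept the gen proof request: %v", genRes.GetResult())
	}

	// 2. Open the GetProof stream. Cancelling this context is what closes the
	// stream on the client side, mirroring getProofCtxCancel in aggregator.go.
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	stream, err := client.GetProof(streamCtx)
	if err != nil {
		return nil, err
	}

	// 3. Poll until the proof is completed, waiting between attempts while the
	// prover reports PENDING.
	for {
		if err := stream.Send(&proverclient.GetProofRequest{Id: genRes.GetId()}); err != nil {
			return nil, err
		}
		res, err := stream.Recv()
		if err != nil {
			return nil, err
		}
		switch res.GetResult() {
		case proverclient.GetProofResponse_RESULT_GET_PROOF_COMPLETED_OK:
			return res, nil
		case proverclient.GetProofResponse_RESULT_GET_PROOF_PENDING:
			time.Sleep(interval)
		default:
			return nil, fmt.Errorf("proof generation failed or was cancelled: %v", res.GetResult())
		}
	}
}
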
4 changes: 4 additions & 0 deletions aggregator/config.go
@@ -49,6 +49,10 @@ type Config struct {
// trying to consolidate a new state
IntervalToConsolidateState Duration `mapstructure:"IntervalToConsolidateState"`

// IntervalFrequencyToGetProofGenerationStateInSeconds is the time the aggregator waits until
// trying to get proof generation status, in case prover client returns PENDING state
IntervalFrequencyToGetProofGenerationStateInSeconds Duration `mapstructure:"IntervalFrequencyToGetProofGenerationStateInSeconds"`

// TxProfitabilityCheckerType type for checking is it profitable for aggregator to validate batch
// possible values: base/acceptall
TxProfitabilityCheckerType TxProfitabilityCheckerType `mapstructure:"TxProfitabilityCheckerType"`
2 changes: 1 addition & 1 deletion aggregator/interfaces.go
@@ -15,7 +15,7 @@ import (
// etherman contains the methods required to interact with
// ethereum.
type etherman interface {
ConsolidateBatch(batchNum *big.Int, proof *proverclient.Proof) (*types.Transaction, error)
ConsolidateBatch(batchNum *big.Int, proof *proverclient.GetProofResponse) (*types.Transaction, error)
}

// aggregatorTxProfitabilityChecker interface for different profitability
6 changes: 3 additions & 3 deletions cmd/run.go
@@ -147,7 +147,7 @@ func newEtherman(c config.Config) (*etherman.Client, error) {
return etherman, nil
}

func newProverClient(c proverclient.Config) (proverclient.ZKProverClient, *grpc.ClientConn) {
func newProverClient(c proverclient.Config) (proverclient.ZKProverServiceClient, *grpc.ClientConn) {
opts := []grpc.DialOption{
// TODO: once we have user and password for prover server, change this
grpc.WithTransportCredentials(insecure.NewCredentials()),
@@ -157,7 +157,7 @@ func newProverClient(c proverclient.Config) (proverclient.ZKProverClient, *grpc.
log.Fatalf("fail to dial: %v", err)
}

proverClient := proverclient.NewZKProverClient(proverConn)
proverClient := proverclient.NewZKProverServiceClient(proverConn)
return proverClient, proverConn
}

@@ -218,7 +218,7 @@ func createSequencer(c sequencer.Config, etherman *etherman.Client, pool *pool.P
return seq
}

func runAggregator(c aggregator.Config, etherman *etherman.Client, proverclient proverclient.ZKProverClient, state *state.State) {
func runAggregator(c aggregator.Config, etherman *etherman.Client, proverclient proverclient.ZKProverServiceClient, state *state.State) {
agg, err := aggregator.NewAggregator(c, state, etherman, proverclient)
if err != nil {
log.Fatal(err)
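
For completeness, the caller-side wiring in cmd/run.go keeps the same shape after the rename: the prover is dialed once without TLS and the resulting ZKProverServiceClient is handed to the aggregator. A stand-alone sketch of that wiring under the same assumptions (the address argument and the helper name dialProver are illustrative, not taken from this diff):

// Illustrative stand-alone version of newProverClient from cmd/run.go.
package sketch

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/hermeznetwork/hermez-core/proverclient"
)

func dialProver(proverURI string) (proverclient.ZKProverServiceClient, *grpc.ClientConn, error) {
	// Same dial option as cmd/run.go: no transport credentials until the prover
	// server exposes user/password authentication.
	conn, err := grpc.Dial(proverURI, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, nil, err
	}
	return proverclient.NewZKProverServiceClient(conn), conn, nil
}
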
1 change: 1 addition & 0 deletions config/config.debug.toml
@@ -37,6 +37,7 @@ SyncedBlockDif = 1

[Aggregator]
IntervalToConsolidateState = "10s"
IntervalFrequencyToGetProofGenerationStateInSeconds = "5s"
TxProfitabilityCheckerType = "acceptall"
TxProfitabilityMinReward = "1.1"

1 change: 1 addition & 0 deletions config/config.local.toml
@@ -40,6 +40,7 @@ Type = "default"
DefaultGasPriceWei = 1000000000

[Aggregator]
IntervalFrequencyToGetProofGenerationStateInSeconds = "5s"
IntervalToConsolidateState = "10s"
TxProfitabilityCheckerType = "acceptall"
TxProfitabilityMinReward = "1.1"
1 change: 1 addition & 0 deletions config/default.go
@@ -41,6 +41,7 @@ SyncedBlockDif = 1
RewardPercentageToAggregator = 50
[Aggregator]
IntervalFrequencyToGetProofGenerationStateInSeconds = "5s"
IntervalToConsolidateState = "3s"
TxProfitabilityCheckerType = "acceptall"
TxProfitabilityMinReward = "1.1"
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -69,7 +69,7 @@ services:

hez-prover:
container_name: hez-prover
image: hermeznetwork/zk-mock-prover@sha256:4f4e1c3f3006d96401314b3988c3b2b9bdda8a61cc7166e4016245ab2033c3a6
image: hermeznetwork/hez-mock-prover@sha256:5eab47124ac1943fa6380035cf87586bd9dfea9d7e53e606e14f7d671e4bdb7f
ports:
- 50051:50051
environment: