Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Revert "add wait txs methods to state (0xPolygonHermez#1426)"

This reverts commit b37e38b.

* Revert "Hotfix/sync2 (0xPolygonHermez#1424)"

This reverts commit ef1d6c1.

---------

Co-authored-by: Toni Ramírez <[email protected]>
Co-authored-by: Arnau Bennassar <[email protected]>
  • Loading branch information
3 people authored May 25, 2023
1 parent b37e38b commit c26f089
Show file tree
Hide file tree
Showing 56 changed files with 1,327 additions and 750 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
/test/contracts/bin/**/*.abi

**/.DS_Store
.vscode
.idea/
.vscode
127 changes: 39 additions & 88 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,65 +33,41 @@ type Aggregator struct {
// NewAggregator creates a new aggregator
func NewAggregator(
cfg Config,
stateInterface stateInterface,
state stateInterface,
ethTxManager ethTxManager,
etherman etherman,
grpcClientConns []*grpc.ClientConn,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
case ProfitabilityBase:
profitabilityChecker = NewTxProfitabilityCheckerBase(stateInterface, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration, cfg.TxProfitabilityMinReward.Int)
profitabilityChecker = NewTxProfitabilityCheckerBase(state, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration, cfg.TxProfitabilityMinReward.Int)
case ProfitabilityAcceptAll:
profitabilityChecker = NewTxProfitabilityCheckerAcceptAll(stateInterface, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration)
profitabilityChecker = NewTxProfitabilityCheckerAcceptAll(state, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration)
}

proverClients := make([]proverClientInterface, 0, len(cfg.ProverURIs))
ctx := context.Background()

a := Aggregator{
cfg: cfg,

State: stateInterface,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClients: proverClients,
ProfitabilityChecker: profitabilityChecker,
}

for _, proverURI := range cfg.ProverURIs {
proverClient := prover.NewClient(proverURI, cfg.IntervalFrequencyToGetProofGenerationState)
proverClients = append(proverClients, proverClient)
grpcClientConns = append(grpcClientConns, proverClient.Prover.Conn)
log.Infof("Connected to prover %v", proverURI)
}

// Check if prover is already working in a proof generation
proof, err := stateInterface.GetWIPProofByProver(ctx, proverURI, nil)
if err != nil && err != state.ErrNotFound {
log.Errorf("Error while getting WIP proof for prover %v", proverURI)
continue
}
a := Aggregator{
cfg: cfg,

if proof != nil {
log.Infof("Resuming WIP proof generation for batchNumber %v in prover %v", proof.BatchNumber, *proof.Prover)
go func() {
a.resumeWIPProofGeneration(ctx, proof, proverClient)
}()
}
State: state,
EthTxManager: ethTxManager,
Ethman: etherman,
ProverClients: proverClients,
ProfitabilityChecker: profitabilityChecker,
}

a.ProverClients = proverClients

return a, nil
}

func (a *Aggregator) resumeWIPProofGeneration(ctx context.Context, proof *state.Proof, prover proverClientInterface) {
err := a.getAndStoreProof(ctx, proof, prover)
if err != nil {
log.Warnf("Could not resume WIP Proof Generation for prover %v and batchNumber %v", *proof.Prover, proof.BatchNumber)
}
}

// Start starts the aggregator
func (a *Aggregator) Start(ctx context.Context) {
// define those vars here, bcs it can be used in case <-a.ctx.Done()
Expand All @@ -100,13 +76,19 @@ func (a *Aggregator) Start(ctx context.Context) {
defer tickerVerifyBatch.Stop()
defer tickerSendVerifiedBatch.Stop()

// Delete proofs that where being generated during last reboot
err := a.State.DeleteUngeneratedProofs(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Warn("error deleting work in progress proofs from state")
}

for i := 0; i < len(a.ProverClients); i++ {
go func() {
for {
a.tryVerifyBatch(ctx, tickerVerifyBatch)
}
}()
time.Sleep(10 * time.Second) //nolint:gomnd
time.Sleep(time.Second)
}

go func() {
Expand Down Expand Up @@ -146,15 +128,10 @@ func (a *Aggregator) tryToSendVerifiedBatch(ctx context.Context, ticker *time.Ti
return
}

if proof != nil && proof.Proof != nil {
if proof != nil {
log.Infof("sending verified proof to the ethereum smart contract, batchNumber %d", batchNumberToVerify)
err := a.EthTxManager.VerifyBatch(batchNumberToVerify, proof.Proof)
if err != nil {
log.Errorf("proof for the batch was NOT sent, batchNumber: %v, error: %w", batchNumberToVerify, err)
} else {
log.Infof("proof for the batch was sent, batchNumber: %v", batchNumberToVerify)
}

a.EthTxManager.VerifyBatch(batchNumberToVerify, proof)
log.Infof("proof for the batch was sent, batchNumber: %v", batchNumberToVerify)
/*
err := a.State.DeleteGeneratedProof(ctx, batchNumberToVerify, nil)
if err != nil {
Expand All @@ -171,6 +148,7 @@ func (a *Aggregator) tryToSendVerifiedBatch(ctx context.Context, ticker *time.Ti

func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
log.Info("checking if network is synced")

for !a.isSynced(ctx) {
log.Infof("waiting for synchronizer to sync...")
waitTick(ctx, ticker)
Expand Down Expand Up @@ -213,6 +191,7 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
// Look for a free prover
for _, prover = range a.ProverClients {
if prover.IsIdle(ctx) {
log.Infof("Prover %s is going to be used for batchNumber: %d", prover.GetURI(), batchToVerify.BatchNumber)
idleProverFound = true
break
}
Expand All @@ -224,88 +203,60 @@ func (a *Aggregator) tryVerifyBatch(ctx context.Context, ticker *time.Ticker) {
return
}

proverURI := prover.GetURI()
proof := &state.Proof{BatchNumber: batchToVerify.BatchNumber, Prover: &proverURI, InputProver: inputProver}

// Avoid other thread to process the same batch
err = a.State.AddGeneratedProof(ctx, proof, nil)
err = a.State.AddGeneratedProof(ctx, batchToVerify.BatchNumber, nil, nil)
if err != nil {
log.Warnf("failed to create proof generation mark, err: %v", err)
log.Warnf("failed to store proof generation mark, err: %v", err)
waitTick(ctx, ticker)
return
}

log.Infof("Prover %s is going to be used for batchNumber: %d", prover.GetURI(), batchToVerify.BatchNumber)

genProofID, err := prover.GetGenProofID(ctx, inputProver)
if err != nil {
log.Warnf("failed to get gen proof id, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, proof.BatchNumber, nil)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
}
waitTick(ctx, ticker)
return
}

proof.ProofID = &genProofID
log.Infof("Proof ID for batchNumber %d: %v", batchToVerify.BatchNumber, genProofID)

// Avoid other thread to process the same batch
err = a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Warnf("failed to update proof generation mark, err: %v", err)
waitTick(ctx, ticker)
return
}

log.Infof("Proof ID for batchNumber %d: %v", proof.BatchNumber, *proof.ProofID)

err = a.getAndStoreProof(ctx, proof, prover)
if err != nil {
waitTick(ctx, ticker)
return
}
}

func (a *Aggregator) getAndStoreProof(ctx context.Context, proof *state.Proof, prover proverClientInterface) error {
resGetProof, err := prover.GetResGetProof(ctx, *proof.ProofID, proof.BatchNumber)
resGetProof, err := prover.GetResGetProof(ctx, genProofID, batchToVerify.BatchNumber)
if err != nil {
log.Warnf("failed to get proof from prover, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, proof.BatchNumber, nil)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
return err2
}
return err
waitTick(ctx, ticker)
return
}

proof.Proof = resGetProof

a.compareInputHashes(proof.InputProver, proof.Proof)
a.compareInputHashes(inputProver, resGetProof)

// Handle local exit root in the case of the mock prover
if proof.Proof.Public.PublicInputs.NewLocalExitRoot == "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e" {
if resGetProof.Public.PublicInputs.NewLocalExitRoot == "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e" {
// This local exit root comes from the mock, use the one captured by the executor instead
log.Warnf(
"NewLocalExitRoot looks like a mock value, using value from executor instead: %v",
proof.InputProver.PublicInputs.NewLocalExitRoot,
inputProver.PublicInputs.NewLocalExitRoot,
)
resGetProof.Public.PublicInputs.NewLocalExitRoot = proof.InputProver.PublicInputs.NewLocalExitRoot
resGetProof.Public.PublicInputs.NewLocalExitRoot = inputProver.PublicInputs.NewLocalExitRoot
}

// Store proof
err = a.State.UpdateGeneratedProof(ctx, proof, nil)
err = a.State.UpdateGeneratedProof(ctx, batchToVerify.BatchNumber, resGetProof, nil)
if err != nil {
log.Warnf("failed to store generated proof, err: %v", err)
err2 := a.State.DeleteGeneratedProof(ctx, proof.BatchNumber, nil)
err2 := a.State.DeleteGeneratedProof(ctx, batchToVerify.BatchNumber, nil)
if err2 != nil {
log.Errorf("failed to delete proof generation mark after error, err: %v", err2)
return err2
}
return err
waitTick(ctx, ticker)
return
}

return nil
}

func (a *Aggregator) isSynced(ctx context.Context) bool {
Expand Down
4 changes: 2 additions & 2 deletions aggregator/aggregator_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ func TestAggregatorFlow(t *testing.T) {
proverClient.On("GetURI", mock.Anything).Return("mockProver:MockPort")
st.On("GetLastVerifiedBatch", mock.Anything, nil).Return(verifiedBatch, nil)
st.On("GetGeneratedProofByBatchNumber", mock.Anything, verifiedBatch.BatchNumber+1, nil).Return(nil, state.ErrNotFound)
st.On("AddGeneratedProof", mock.Anything, mock.Anything, nil).Return(nil)
st.On("UpdateGeneratedProof", mock.Anything, mock.Anything, nil).Return(nil)
st.On("AddGeneratedProof", mock.Anything, mock.Anything, mock.Anything, nil).Return(nil)
st.On("UpdateGeneratedProof", mock.Anything, mock.Anything, mock.Anything, nil).Return(nil)
etherman.On("GetLatestVerifiedBatchNum").Return(uint64(1), nil)
etherman.On("GetPublicAddress").Return(aggrAddress)

Expand Down
9 changes: 4 additions & 5 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// ethTxManager contains the methods required to send txs to
// ethereum.
type ethTxManager interface {
VerifyBatch(batchNum uint64, proof *pb.GetProofResponse) error
VerifyBatch(batchNum uint64, proof *pb.GetProofResponse)
}

// etherman contains the methods required to interact with ethereum
Expand Down Expand Up @@ -43,10 +43,9 @@ type stateInterface interface {
GetLastVerifiedBatch(ctx context.Context, dbTx pgx.Tx) (*state.VerifiedBatch, error)
GetVirtualBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
AddGeneratedProof(ctx context.Context, proof *state.Proof, dbTx pgx.Tx) error
UpdateGeneratedProof(ctx context.Context, proof *state.Proof, dbTx pgx.Tx) error
GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Proof, error)
AddGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error
UpdateGeneratedProof(ctx context.Context, batchNumber uint64, proof *pb.GetProofResponse, dbTx pgx.Tx) error
GetGeneratedProofByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*pb.GetProofResponse, error)
DeleteGeneratedProof(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
DeleteUngeneratedProofs(ctx context.Context, dbTx pgx.Tx) error
GetWIPProofByProver(ctx context.Context, prover string, dbTx pgx.Tx) (*state.Proof, error)
}
13 changes: 2 additions & 11 deletions aggregator/mocks/mock_ethtxmanager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 17 additions & 38 deletions aggregator/mocks/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c26f089

Please sign in to comment.