forked from 0xPolygonHermez/zkevm-node

aggregator.go
package aggregator
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/0xPolygonHermez/zkevm-node/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-node/aggregator/pb"
"github.com/0xPolygonHermez/zkevm-node/aggregator/prover"
"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/encoding"
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"google.golang.org/grpc"
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/peer"
)
const (
mockedStateRoot = "0x090bcaf734c4f06c93954a827b45a6e8c67b8e0fd1e0a35a1c5982d6961828f9"
mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e"
ethTxManagerOwner = "aggregator"
monitoredIDFormat = "proof-from-%v-to-%v"
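// e.g. the verification tx for the proof covering batches 11 to 20 is
// monitored under the ID "proof-from-11-to-20"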
)
type finalProofMsg struct {
proverName string
proverID string
recursiveProof *state.Proof
finalProof *pb.FinalProof
}
// Aggregator represents an aggregator
type Aggregator struct {
pb.UnimplementedAggregatorServiceServer
cfg Config
State stateInterface
EthTxManager ethTxManager
Ethman etherman
ProfitabilityChecker aggregatorTxProfitabilityChecker
TimeSendFinalProof time.Time
TimeCleanupLockedProofs types.Duration
StateDBMutex *sync.Mutex
TimeSendFinalProofMutex *sync.RWMutex
finalProof chan finalProofMsg
verifyingProof bool
srv *grpc.Server
ctx context.Context
exit context.CancelFunc
}
// New creates a new aggregator.
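// A typical wiring looks roughly like this (sketch only; cfg, st, ethTxMgr,
// ethMan and ctx are placeholders for values provided by the node setup):
//
//	agg, err := aggregator.New(cfg, st, ethTxMgr, ethMan)
//	if err != nil {
//		return err
//	}
//	go func() { _ = agg.Start(ctx) }()
//	defer agg.Stop()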
func New(
cfg Config,
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
case ProfitabilityBase:
profitabilityChecker = NewTxProfitabilityCheckerBase(stateInterface, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration, cfg.TxProfitabilityMinReward.Int)
case ProfitabilityAcceptAll:
profitabilityChecker = NewTxProfitabilityCheckerAcceptAll(stateInterface, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration)
}
a := Aggregator{
cfg: cfg,
State: stateInterface,
EthTxManager: ethTxManager,
Ethman: etherman,
ProfitabilityChecker: profitabilityChecker,
StateDBMutex: &sync.Mutex{},
TimeSendFinalProofMutex: &sync.RWMutex{},
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
}
return a, nil
}
// Start starts the aggregator
func (a *Aggregator) Start(ctx context.Context) error {
var cancel context.CancelFunc
if ctx == nil {
ctx = context.Background()
}
ctx, cancel = context.WithCancel(ctx)
a.ctx = ctx
a.exit = cancel
metrics.Register()
// process monitored batch verifications before starting
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)
// Delete ungenerated recursive proofs
err := a.State.DeleteUngeneratedProofs(ctx, nil)
if err != nil {
return fmt.Errorf("failed to initialize proofs cache %w", err)
}
address := fmt.Sprintf("%s:%d", a.cfg.Host, a.cfg.Port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
a.srv = grpc.NewServer()
pb.RegisterAggregatorServiceServer(a.srv, a)
healthService := newHealthChecker()
grpchealth.RegisterHealthServer(a.srv, healthService)
go func() {
log.Infof("Server listening on port %d", a.cfg.Port)
if err := a.srv.Serve(lis); err != nil {
a.exit()
log.Fatalf("Failed to serve: %v", err)
}
}()
a.resetVerifyProofTime()
go a.cleanupLockedProofs()
go a.sendFinalProof()
<-ctx.Done()
return ctx.Err()
}
// Stop stops the Aggregator server.
func (a *Aggregator) Stop() {
a.exit()
a.srv.Stop()
}
// Channel implements the bi-directional communication channel between the
// Prover client and the Aggregator server.
func (a *Aggregator) Channel(stream pb.AggregatorService_ChannelServer) error {
metrics.ConnectedProver()
defer metrics.DisconnectedProver()
ctx := stream.Context()
var proverAddr net.Addr
p, ok := peer.FromContext(ctx)
if ok {
proverAddr = p.Addr
}
prover, err := prover.New(stream, proverAddr, a.cfg.ProofStatePollingInterval)
if err != nil {
return err
}
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)
log.Debug("Establishing stream connection with prover")
// Check if prover supports the required Fork ID
if !prover.SupportsForkID(a.cfg.ForkId) {
log.Warnf("Prover does not support required fork ID: %d.", a.cfg.ForkId)
return fmt.Errorf("prover does not support required fork ID: %d", a.cfg.ForkId)
}
for {
select {
case <-a.ctx.Done():
// server disconnected
return a.ctx.Err()
case <-ctx.Done():
// client disconnected
return ctx.Err()
default:
isIdle, err := prover.IsIdle()
if err != nil {
log.Errorf("failed to check if prover is idle: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
if !isIdle {
log.Debug("Prover is not idle")
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
_, err = a.tryBuildFinalProof(ctx, prover, nil)
if err != nil {
log.Errorf("error checking proofs to verify: %v", err)
}
proofGenerated, err := a.tryAggregateProofs(ctx, prover)
if err != nil {
log.Errorf("error trying to aggregate proofs: %v", err)
}
if !proofGenerated {
proofGenerated, err = a.tryGenerateBatchProof(ctx, prover)
if err != nil {
log.Errorf("error trying to generate proof: %v", err)
}
}
if !proofGenerated {
// if no proof was generated (aggregated or batch) wait some time before retry
time.Sleep(a.cfg.RetryTime.Duration)
} // if a proof was generated, retry immediately as there are probably more proofs to process
}
}
}
// This function waits to receive a final proof from a prover. Once it receives
// the proof, it performs these steps in order:
// - send the final proof to L1
// - wait for the synchronizer to catch up
// - clean up the cache of recursive proofs
func (a *Aggregator) sendFinalProof() {
for {
select {
case <-a.ctx.Done():
return
case msg := <-a.finalProof:
ctx := a.ctx
proof := msg.recursiveProof
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
log.Info("Verifying final proof with ethereum smart contract")
a.startProofVerification()
finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
log.Errorf("failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
a.endProofVerification()
continue
}
inputs := ethmanTypes.FinalProofInputs{
FinalProof: msg.finalProof,
NewLocalExitRoot: finalBatch.LocalExitRoot.Bytes(),
NewStateRoot: finalBatch.StateRoot.Bytes(),
}
log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)
// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, nil)
if err != nil {
log := log.WithFields("tx", monitoredTxID)
log.Errorf("failed to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)
a.resetVerifyProofTime()
a.endProofVerification()
}
}
}
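// handleFailureToAddVerifyBatchToBeMonitored releases the proof that was about
// to be verified (by clearing its generating state) and ends the proof
// verification cycle, so that the proof can be picked up again later.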
func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil
err := a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("failed updating proof state (false): %v", err)
}
a.endProofVerification()
}
// buildFinalProof builds and returns the final proof for an aggregated/batch proof.
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*pb.FinalProof, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
"recursiveProofId", *proof.ProofID,
"batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal),
)
log.Info("Generating final proof")
finalProofID, err := prover.FinalProof(proof.Proof, a.cfg.SenderAddress)
if err != nil {
return nil, fmt.Errorf("failed to get final proof id: %w", err)
}
proof.ProofID = finalProofID
log.Infof("Final proof ID for batches [%d-%d]: %s", proof.BatchNumber, proof.BatchNumberFinal, *proof.ProofID)
log = log.WithFields("finalProofId", finalProofID)
finalProof, err := prover.WaitFinalProof(ctx, *proof.ProofID)
if err != nil {
return nil, fmt.Errorf("failed to get final proof from prover: %w", err)
}
log.Info("Final proof generated")
// mock prover sanity check
if string(finalProof.Public.NewStateRoot) == mockedStateRoot && string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot {
// This local exit root and state root come from the mock
// prover, use the one captured by the executor instead
finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}
log.Warnf("NewLocalExitRoot and NewStateRoot look like mock values, using values from the executor instead: LER: %v, SR: %v",
finalBatch.LocalExitRoot.TerminalString(), finalBatch.StateRoot.TerminalString())
finalProof.Public.NewStateRoot = finalBatch.StateRoot.Bytes()
finalProof.Public.NewLocalExitRoot = finalBatch.LocalExitRoot.Bytes()
}
return finalProof, nil
}
// tryBuildFinalProof checks if the provided proof is eligible to be used to
// build the final proof. If no proof is provided it looks for a previously
// generated proof. If the proof is eligible, then the final proof generation
// is triggered.
func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (bool, error) {
proverName := prover.Name()
proverID := prover.ID()
log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)
log.Debug("tryBuildFinalProof start")
var err error
if !a.canVerifyProof() {
log.Debug("Time to verify proof not reached or proof verification in progress")
return false, nil
}
log.Debug("Send final proof time reached")
for !a.isSynced(ctx, nil) {
log.Info("Waiting for synchronizer to sync...")
time.Sleep(a.cfg.RetryTime.Duration)
continue
}
var lastVerifiedBatchNum uint64
lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
if err != nil && !errors.Is(err, state.ErrNotFound) {
return false, fmt.Errorf("failed to get last verified batch, %w", err)
}
if lastVerifiedBatch != nil {
lastVerifiedBatchNum = lastVerifiedBatch.BatchNumber
}
if proof == nil {
// we don't have a proof generating at the moment, check if we
// have a proof ready to verify
proof, err = a.getAndLockProofReadyToVerify(ctx, prover, lastVerifiedBatchNum)
if errors.Is(err, state.ErrNotFound) {
// nothing to verify, swallow the error
log.Debug("No proof ready to verify")
return false, nil
}
if err != nil {
return false, err
}
defer func() {
if err != nil {
// Set the generating state to false for the proof ("unlock" it)
proof.GeneratingSince = nil
err2 := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err2 != nil {
log.Errorf("failed to unlock proof: %v", err2)
}
}
}()
} else {
// we do have a proof generating at the moment, check if it is
// eligible to be verified
eligible, err := a.validateEligibleFinalProof(ctx, proof, lastVerifiedBatchNum)
if err != nil {
return false, fmt.Errorf("failed to validate eligible final proof, %w", err)
}
if !eligible {
return false, nil
}
}
log = log.WithFields(
"proofId", *proof.ProofID,
"batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal),
)
// at this point we have an eligible proof, build the final one using it
finalProof, err := a.buildFinalProof(ctx, prover, proof)
if err != nil {
return false, fmt.Errorf("failed to build final proof, %w", err)
}
msg := finalProofMsg{
proverName: proverName,
proverID: proverID,
recursiveProof: proof,
finalProof: finalProof,
}
select {
case <-a.ctx.Done():
return false, a.ctx.Err()
case a.finalProof <- msg:
}
log.Debug("tryBuildFinalProof end")
return true, nil
}
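// validateEligibleFinalProof checks whether the given recursive proof can be
// used to build the final proof: it must start at the batch right after the
// last verified one (or at least cover it) and it must contain only complete
// sequences. Proofs that end before the next batch to verify are deleted.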
func (a *Aggregator) validateEligibleFinalProof(ctx context.Context, proof *state.Proof, lastVerifiedBatchNum uint64) (bool, error) {
batchNumberToVerify := lastVerifiedBatchNum + 1
if proof.BatchNumber != batchNumberToVerify {
if proof.BatchNumber < batchNumberToVerify && proof.BatchNumberFinal >= batchNumberToVerify {
// We have a proof that contains some batches below the last verified batch; it can still be eligible as the final proof
log.Warnf("Proof %d-%d contains some batches lower than last batch verified %d. Check anyway if it is eligible", proof.BatchNumber, proof.BatchNumberFinal, lastVerifiedBatchNum)
} else if proof.BatchNumberFinal < batchNumberToVerify {
// The whole proof is below the next batch to verify (all of its batches are already verified); we need to delete this proof
log.Warnf("Proof %d-%d lower than next batch to verify %d. Deleting it", proof.BatchNumber, proof.BatchNumberFinal, batchNumberToVerify)
err := a.State.DeleteGeneratedProofs(ctx, proof.BatchNumber, proof.BatchNumberFinal, nil)
if err != nil {
return false, fmt.Errorf("Failed to delete discarded proof, err: %w", err)
}
return false, nil
} else {
log.Debugf("Proof batch number %d does not follow the last verified batch number %d", proof.BatchNumber, lastVerifiedBatchNum)
return false, nil
}
}
bComplete, err := a.State.CheckProofContainsCompleteSequences(ctx, proof, nil)
if err != nil {
return false, fmt.Errorf("failed to check if proof contains complete sequences, %w", err)
}
if !bComplete {
log.Infof("Recursive proof %d-%d not eligible to be verified: not containing complete sequences", proof.BatchNumber, proof.BatchNumberFinal)
return false, nil
}
return true, nil
}
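// getAndLockProofReadyToVerify fetches a proof ready to be verified from the
// state and marks it as generating so that no other prover can pick it up.
// Access to the state DB is serialized through StateDBMutex.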
func (a *Aggregator) getAndLockProofReadyToVerify(ctx context.Context, prover proverInterface, lastVerifiedBatchNum uint64) (*state.Proof, error) {
a.StateDBMutex.Lock()
defer a.StateDBMutex.Unlock()
// Get proof ready to be verified
proofToVerify, err := a.State.GetProofReadyToVerify(ctx, lastVerifiedBatchNum, nil)
if err != nil {
return nil, err
}
now := time.Now().Round(time.Microsecond)
proofToVerify.GeneratingSince = &now
err = a.State.UpdateGeneratedProof(ctx, proofToVerify, nil)
if err != nil {
return nil, err
}
return proofToVerify, nil
}
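// unlockProofsToAggregate releases the two proofs reserved for aggregation by
// clearing their generating state in a single state transaction.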
func (a *Aggregator) unlockProofsToAggregate(ctx context.Context, proof1 *state.Proof, proof2 *state.Proof) error {
// Release proofs from generating state in a single transaction
dbTx, err := a.State.BeginStateTransaction(ctx)
if err != nil {
log.Warnf("Failed to begin transaction to release proof aggregation state, err: %v", err)
return err
}
proof1.GeneratingSince = nil
err = a.State.UpdateGeneratedProof(ctx, proof1, dbTx)
if err == nil {
proof2.GeneratingSince = nil
err = a.State.UpdateGeneratedProof(ctx, proof2, dbTx)
}
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return err
}
return fmt.Errorf("failed to release proof aggregation state: %w", err)
}
err = dbTx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to release proof aggregation state %w", err)
}
return nil
}
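// getAndLockProofsToAggregate fetches the next pair of proofs that can be
// aggregated and marks both as generating in a single state transaction, so
// that no other prover can pick them up.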
func (a *Aggregator) getAndLockProofsToAggregate(ctx context.Context, prover proverInterface) (*state.Proof, *state.Proof, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)
a.StateDBMutex.Lock()
defer a.StateDBMutex.Unlock()
proof1, proof2, err := a.State.GetProofsToAggregate(ctx, nil)
if err != nil {
return nil, nil, err
}
// Set proofs in generating state in a single transaction
dbTx, err := a.State.BeginStateTransaction(ctx)
if err != nil {
log.Errorf("Failed to begin transaction to set proof aggregation state, err: %v", err)
return nil, nil, err
}
now := time.Now().Round(time.Microsecond)
proof1.GeneratingSince = &now
err = a.State.UpdateGeneratedProof(ctx, proof1, dbTx)
if err == nil {
proof2.GeneratingSince = &now
err = a.State.UpdateGeneratedProof(ctx, proof2, dbTx)
}
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state %w", err)
log.Error(err.Error())
return nil, nil, err
}
return nil, nil, fmt.Errorf("failed to set proof aggregation state %w", err)
}
err = dbTx.Commit(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to set proof aggregation state %w", err)
}
return proof1, proof2, nil
}
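// tryAggregateProofs tries to aggregate two previously generated proofs into a
// single recursive proof. On success it replaces the two input proofs with the
// new one in the state and then checks whether the resulting proof can already
// be used to build the final proof. It returns true if a proof was generated.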
func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterface) (bool, error) {
proverName := prover.Name()
proverID := prover.ID()
log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)
log.Debug("tryAggregateProofs start")
proof1, proof2, err0 := a.getAndLockProofsToAggregate(ctx, prover)
if errors.Is(err0, state.ErrNotFound) {
// nothing to aggregate, swallow the error
log.Debug("Nothing to aggregate")
return false, nil
}
if err0 != nil {
return false, err0
}
var (
aggrProofID *string
err error
)
defer func() {
if err != nil {
err2 := a.unlockProofsToAggregate(a.ctx, proof1, proof2)
if err2 != nil {
log.Errorf("Failed to release aggregated proofs, err: %v", err2)
}
}
log.Debug("tryAggregateProofs end")
}()
log.Infof("Aggregating proofs: %d-%d and %d-%d", proof1.BatchNumber, proof1.BatchNumberFinal, proof2.BatchNumber, proof2.BatchNumberFinal)
log = log.WithFields("batches", fmt.Sprintf("%d-%d", proof1.BatchNumber, proof2.BatchNumberFinal))
inputProver := map[string]interface{}{
"recursive_proof_1": proof1.Proof,
"recursive_proof_2": proof2.Proof,
}
b, err := json.Marshal(inputProver)
if err != nil {
return false, fmt.Errorf("failed to serialize input prover, %w", err)
}
proof := &state.Proof{
BatchNumber: proof1.BatchNumber,
BatchNumberFinal: proof2.BatchNumberFinal,
Prover: &proverName,
ProverID: &proverID,
InputProver: string(b),
}
aggrProofID, err = prover.AggregatedProof(proof1.Proof, proof2.Proof)
if err != nil {
return false, fmt.Errorf("failed to get aggregated proof id, %w", err)
}
proof.ProofID = aggrProofID
log.Infof("Proof ID for aggregated proof: %v", *proof.ProofID)
log = log.WithFields("proofId", *proof.ProofID)
recursiveProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get aggregated proof from prover, %w", err)
}
log.Info("Aggregated proof generated")
proof.Proof = recursiveProof
// update the state by removing the 2 aggregated proofs and storing the
// newly generated recursive proof
dbTx, err := a.State.BeginStateTransaction(ctx)
if err != nil {
return false, fmt.Errorf("failed to begin transaction to update proof aggregation state: %w", err)
}
err = a.State.DeleteGeneratedProofs(ctx, proof1.BatchNumber, proof2.BatchNumberFinal, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return false, err
}
return false, fmt.Errorf("failed to delete previously aggregated proofs: %w", err)
}
now := time.Now().Round(time.Microsecond)
proof.GeneratingSince = &now
err = a.State.AddGeneratedProof(ctx, proof, dbTx)
if err != nil {
if err := dbTx.Rollback(ctx); err != nil {
err := fmt.Errorf("failed to rollback proof aggregation state: %w", err)
log.Error(err.Error())
return false, err
}
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
}
err = dbTx.Commit(ctx)
if err != nil {
return false, fmt.Errorf("failed to store the recursive proof: %w", err)
}
// NOTE(pg): the defer func is useless from now on, use a different variable
// name for errors (or shadow err in inner scopes) to not trigger it.
// state is up to date, check if we can send the final proof using the
// one just crafted.
finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the aggregated proof
log.Errorf("failed trying to check if recursive proof can be verified: %v", finalProofErr)
}
// NOTE(pg): prover is done, use a.ctx from now on
if !finalProofBuilt {
proof.GeneratingSince = nil
// final proof has not been generated, update the recursive proof
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
return false, err
}
}
return true, nil
}
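// getAndLockBatchToProve fetches the next virtual batch pending to be proven,
// checks that proving it is profitable and stores a placeholder proof in
// generating state so that no other prover works on the same batch.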
func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, *state.Proof, error) {
proverID := prover.ID()
proverName := prover.Name()
log := log.WithFields(
"prover", proverName,
"proverId", proverID,
"proverAddr", prover.Addr(),
)
a.StateDBMutex.Lock()
defer a.StateDBMutex.Unlock()
lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
if err != nil {
return nil, nil, err
}
// Get virtual batch pending to generate proof
batchToVerify, err := a.State.GetVirtualBatchToProve(ctx, lastVerifiedBatch.BatchNumber, nil)
if err != nil {
return nil, nil, err
}
log.Infof("Found virtual batch %d pending to generate proof", batchToVerify.BatchNumber)
log = log.WithFields("batch", batchToVerify.BatchNumber)
log.Info("Checking profitability to aggregate batch")
// pass matic collateral as zero here, because the aggregator fee is not yet defined in the smart contract
isProfitable, err := a.ProfitabilityChecker.IsProfitable(ctx, big.NewInt(0))
if err != nil {
log.Errorf("Failed to check aggregator profitability, err: %v", err)
return nil, nil, err
}
if !isProfitable {
log.Infof("Batch is not profitable, matic collateral %d", big.NewInt(0))
return nil, nil, fmt.Errorf("batch %d is not profitable to prove", batchToVerify.BatchNumber)
}
now := time.Now().Round(time.Microsecond)
proof := &state.Proof{
BatchNumber: batchToVerify.BatchNumber,
BatchNumberFinal: batchToVerify.BatchNumber,
Prover: &proverName,
ProverID: &proverID,
GeneratingSince: &now,
}
// Prevent other provers from processing the same batch
err = a.State.AddGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("Failed to add batch proof, err: %v", err)
return nil, nil, err
}
return batchToVerify, proof, nil
}
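// tryGenerateBatchProof tries to generate a proof for the next virtual batch
// pending to be proven. On success it stores the resulting recursive proof and
// then checks whether it can already be used to build the final proof. It
// returns true if a proof was generated.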
func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInterface) (bool, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
"proverAddr", prover.Addr(),
)
log.Debug("tryGenerateBatchProof start")
batchToProve, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
if errors.Is(err0, state.ErrNotFound) {
// nothing to prove, swallow the error
log.Debug("Nothing to generate proof")
return false, nil
}
if err0 != nil {
return false, err0
}
var (
genProofID *string
err error
)
log = log.WithFields("batch", batchToProve.BatchNumber)
defer func() {
if err != nil {
err2 := a.State.DeleteGeneratedProofs(a.ctx, proof.BatchNumber, proof.BatchNumberFinal, nil)
if err2 != nil {
log.Errorf("Failed to delete proof in progress, err: %v", err2)
}
}
log.Debug("tryGenerateBatchProof end")
}()
log.Info("Generating proof from batch")
log.Infof("Sending zki + batch to the prover, batchNumber [%d]", batchToProve.BatchNumber)
inputProver, err := a.buildInputProver(ctx, batchToProve)
if err != nil {
return false, fmt.Errorf("failed to build input prover, %w", err)
}
b, err := json.Marshal(inputProver)
if err != nil {
return false, fmt.Errorf("failed to serialize input prover, %w", err)
}
proof.InputProver = string(b)
log.Infof("Sending a batch to the prover. OldStateRoot [%#x], OldBatchNum [%d]",
inputProver.PublicInputs.OldStateRoot, inputProver.PublicInputs.OldBatchNum)
genProofID, err = prover.BatchProof(inputProver)
if err != nil {
return false, fmt.Errorf("failed to get batch proof id %w", err)
}
proof.ProofID = genProofID
log.Infof("Proof ID %v", *proof.ProofID)
log = log.WithFields("proofId", *proof.ProofID)
resGetProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
return false, fmt.Errorf("failed to get proof from prover %w", err)
}
log.Info("Batch proof generated")
proof.Proof = resGetProof
// NOTE(pg): the defer func is useless from now on, use a different variable
// name for errors (or shadow err in inner scopes) to not trigger it.
finalProofBuilt, finalProofErr := a.tryBuildFinalProof(ctx, prover, proof)
if finalProofErr != nil {
// just log the error and continue to handle the generated proof
log.Errorf("error trying to build final proof %v", finalProofErr)
}
// NOTE(pg): prover is done, use a.ctx from now on
if !finalProofBuilt {
proof.GeneratingSince = nil
// final proof has not been generated, update the batch proof
err := a.State.UpdateGeneratedProof(a.ctx, proof, nil)
if err != nil {
log.Errorf("Failed to store batch proof result, err %v", err)
return false, err
}
}
return true, nil
}
// canVerifyProof returns true if we have reached the timeout to verify a proof
// and no other prover is verifying a proof (verifyingProof = false).
func (a *Aggregator) canVerifyProof() bool {
a.TimeSendFinalProofMutex.RLock()
defer a.TimeSendFinalProofMutex.RUnlock()
return a.TimeSendFinalProof.Before(time.Now()) && !a.verifyingProof
}
// startProofVerification sets verifyingProof to true to indicate that a proof verification is in progress
func (a *Aggregator) startProofVerification() {
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.verifyingProof = true
}
// endProofVerification sets verifyingProof to false to indicate that no proof verification is in progress
func (a *Aggregator) endProofVerification() {
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.verifyingProof = false
}
// resetVerifyProofTime updates the timeout to verify a proof.
func (a *Aggregator) resetVerifyProofTime() {
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.TimeSendFinalProof = time.Now().Add(a.cfg.VerifyProofInterval.Duration)
}
// isSynced checks if the state is synchronized with L1. If a batch number is
// provided, it makes sure that the state is synced with that batch.
func (a *Aggregator) isSynced(ctx context.Context, batchNum *uint64) bool {
// get latest verified batch as seen by the synchronizer
lastVerifiedBatch, err := a.State.GetLastVerifiedBatch(ctx, nil)
if err == state.ErrNotFound {
return false
}
if err != nil {
log.Warnf("Failed to get last verified batch: %v", err)
return false
}
if lastVerifiedBatch == nil {
return false
}
if batchNum != nil && lastVerifiedBatch.BatchNumber < *batchNum {
log.Infof("Waiting for the state to be synced, lastVerifiedBatchNum: %d, waiting for batch: %d", lastVerifiedBatch.BatchNumber, *batchNum)
return false
}
// latest verified batch in L1
lastVerifiedEthBatchNum, err := a.Ethman.GetLatestVerifiedBatchNum()
if err != nil {
log.Warnf("Failed to get last eth batch, err: %v", err)
return false
}
// check if L2 is synced with L1
if lastVerifiedBatch.BatchNumber < lastVerifiedEthBatchNum {
log.Infof("Waiting for the state to be synced, lastVerifiedBatchNum: %d, lastVerifiedEthBatchNum: %d, waiting for batch",
lastVerifiedBatch.BatchNumber, lastVerifiedEthBatchNum)
return false
}
return true
}
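// buildInputProver builds the prover input for the given batch, combining the
// public inputs derived from the previous batch (old state root, old acc input
// hash, old batch number) with the data of the batch to verify.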
func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.Batch) (*pb.InputProver, error) {
previousBatch, err := a.State.GetBatchByNumber(ctx, batchToVerify.BatchNumber-1, nil)
if err != nil && err != state.ErrStateNotSynchronized {
return nil, fmt.Errorf("failed to get previous batch, err: %v", err)
}
inputProver := &pb.InputProver{
PublicInputs: &pb.PublicInputs{
OldStateRoot: previousBatch.StateRoot.Bytes(),
OldAccInputHash: previousBatch.AccInputHash.Bytes(),
OldBatchNum: previousBatch.BatchNumber,
ChainId: a.cfg.ChainID,
ForkId: a.cfg.ForkId,
BatchL2Data: batchToVerify.BatchL2Data,
GlobalExitRoot: batchToVerify.GlobalExitRoot.Bytes(),
EthTimestamp: uint64(batchToVerify.Timestamp.Unix()),
SequencerAddr: batchToVerify.Coinbase.String(),
AggregatorAddr: a.cfg.SenderAddress,
},
Db: map[string]string{},
ContractsBytecode: map[string]string{},
}
return inputProver, nil
}
// healthChecker will provide an implementation of the HealthCheck interface.
type healthChecker struct{}
// newHealthChecker returns a health checker according to standard package
// grpc.health.v1.
func newHealthChecker() *healthChecker {
return &healthChecker{}
}
// HealthCheck interface implementation.
// Check returns the current status of the server for unary gRPC health requests;
// for now, if the server is up and able to respond, we always return SERVING.
func (hc *healthChecker) Check(ctx context.Context, req *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
log.Info("Serving the Check request for health check")
return &grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
}, nil
}
// Watch returns the current status of the server for stream gRPC health requests;
// for now, if the server is up and able to respond, we always return SERVING.
func (hc *healthChecker) Watch(req *grpchealth.HealthCheckRequest, server grpchealth.Health_WatchServer) error {
log.Info("Serving the Watch request for health check")
return server.Send(&grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
})
}
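// handleMonitoredTxResult processes the result of a monitored batch
// verification tx reported by the eth tx manager (see the callbacks passed to
// ProcessPendingMonitoredTxs above).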
func (a *Aggregator) handleMonitoredTxResult(result ethtxmanager.MonitoredTxResult) {