forked from okx/xlayer-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pgstatestorage.go
2395 lines (2058 loc) · 81.3 KB
/
pgstatestorage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package state
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"time"
"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
// Constants shared by the storage methods in this file.
const (
	// maxTopics is not referenced in the visible part of the file.
	// NOTE(review): presumably the cap on topics in a log filter — confirm at its use site.
	maxTopics = 4

	// getLastBatchNumberSQL selects the highest batch_num stored in state.batch.
	getLastBatchNumberSQL = "SELECT batch_num FROM state.batch ORDER BY batch_num DESC LIMIT 1"
	// getLastBlockNumSQL selects the highest block_num stored in state.block.
	getLastBlockNumSQL = "SELECT block_num FROM state.block ORDER BY block_num DESC LIMIT 1"
	// getBlockTimeByNumSQL selects received_at of the state.block row with the given block_num.
	getBlockTimeByNumSQL = "SELECT received_at FROM state.block WHERE block_num = $1"
)
// PostgresStorage implements the Storage interface
// on top of a pgx connection pool. The pool is embedded, so its methods are
// promoted and the storage itself can act as the querier when a method is
// called without a database transaction (see getExecQuerier).
type PostgresStorage struct {
	*pgxpool.Pool
}
// NewPostgresStorage returns a PostgresStorage that wraps the given
// connection pool.
func NewPostgresStorage(db *pgxpool.Pool) *PostgresStorage {
	storage := PostgresStorage{Pool: db}
	return &storage
}
// getExecQuerier picks where a statement runs: the storage's own pool when
// no transaction is supplied, otherwise the supplied transaction.
func (p *PostgresStorage) getExecQuerier(dbTx pgx.Tx) execQuerier {
	if dbTx == nil {
		return p
	}
	return dbTx
}
// Reset deletes every row of state.block whose block_num is greater than
// blockNumber, using dbTx when it is not nil.
func (p *PostgresStorage) Reset(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) error {
	const resetSQL = "DELETE FROM state.block WHERE block_num > $1"

	_, err := p.getExecQuerier(dbTx).Exec(ctx, resetSQL, blockNumber)
	return err
}
// ResetForkID rolls the state back so that batches from batchNumber onwards
// can be reprocessed under a new fork ID. The steps run in this order and the
// first failure aborts the rest:
//  1. delete the L1 blocks starting at the lowest block referenced by a
//     virtual batch with batch_num >= batchNumber;
//  2. reset the trusted state to batchNumber-1;
//  3. record a TrustedReorg describing the new fork ID and version;
//  4. delete the proofs that start at, or span, batchNumber.
func (p *PostgresStorage) ResetForkID(ctx context.Context, batchNumber, forkID uint64, version string, dbTx pgx.Tx) error {
	const resetVirtualStateSQL = "delete from state.block where block_num >=(select min(block_num) from state.virtual_batch where batch_num >= $1)"
	const deleteProofsSQL = "delete from state.proof where batch_num >= $1 or (batch_num <= $1 and batch_num_final >= $1)"

	querier := p.getExecQuerier(dbTx)

	if _, err := querier.Exec(ctx, resetVirtualStateSQL, batchNumber); err != nil {
		return err
	}

	// NOTE(review): batchNumber-1 wraps around when batchNumber is 0 — confirm callers never pass 0.
	if err := p.ResetTrustedState(ctx, batchNumber-1, dbTx); err != nil {
		return err
	}

	reorg := &TrustedReorg{
		BatchNumber: batchNumber,
		Reason:      fmt.Sprintf("New ForkID: %d. Version: %s", forkID, version),
	}
	if err := p.AddTrustedReorg(ctx, reorg, dbTx); err != nil {
		return err
	}

	// Delete proofs for higher batches
	_, err := querier.Exec(ctx, deleteProofsSQL, batchNumber)
	return err
}
// ResetTrustedState deletes every row of state.batch whose batch_num is
// greater than batchNumber, using dbTx when it is not nil.
func (p *PostgresStorage) ResetTrustedState(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error {
	const resetTrustedStateSQL = "DELETE FROM state.batch WHERE batch_num > $1"

	_, err := p.getExecQuerier(dbTx).Exec(ctx, resetTrustedStateSQL, batchNumber)
	return err
}
// AddBlock inserts the given block into state.block. Both hashes are stored
// in their string form.
func (p *PostgresStorage) AddBlock(ctx context.Context, block *Block, dbTx pgx.Tx) error {
	const addBlockSQL = "INSERT INTO state.block (block_num, block_hash, parent_hash, received_at) VALUES ($1, $2, $3, $4)"

	querier := p.getExecQuerier(dbTx)
	args := []interface{}{
		block.BlockNumber,
		block.BlockHash.String(),
		block.ParentHash.String(),
		block.ReceivedAt,
	}
	_, err := querier.Exec(ctx, addBlockSQL, args...)
	return err
}
// GetTxsOlderThanNL1Blocks returns the hashes of the transactions that can be
// deleted from the tx pool: those whose L2 block belongs to a batch that was
// virtualized at least nL1Blocks L1 blocks before the last synced L1 block.
//
// It returns ErrNotFound when there is no L1 block or no virtual batch at or
// below the computed block number, and an error when nL1Blocks is greater
// than or equal to the last L1 block number.
func (p *PostgresStorage) GetTxsOlderThanNL1Blocks(ctx context.Context, nL1Blocks uint64, dbTx pgx.Tx) ([]common.Hash, error) {
	const getBatchNumByBlockNumFromVirtualBatch = "SELECT batch_num FROM state.virtual_batch WHERE block_num <= $1 ORDER BY batch_num DESC LIMIT 1"
	const getTxsHashesBeforeBatchNum = "SELECT hash FROM state.transaction JOIN state.l2block ON state.transaction.l2_block_num = state.l2block.block_num AND state.l2block.batch_num <= $1"

	var batchNum, blockNum uint64

	e := p.getExecQuerier(dbTx)

	err := e.QueryRow(ctx, getLastBlockNumSQL).Scan(&blockNum)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrNotFound
	} else if err != nil {
		return nil, err
	}

	// blockNum is unsigned: the range check has to happen before subtracting,
	// otherwise a too-large nL1Blocks wraps around instead of being rejected.
	if nL1Blocks >= blockNum {
		return nil, errors.New("blockNumDiff is too big, there are no txs to delete")
	}
	blockNum -= nL1Blocks

	err = e.QueryRow(ctx, getBatchNumByBlockNumFromVirtualBatch, blockNum).Scan(&batchNum)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrNotFound
	} else if err != nil {
		return nil, err
	}

	rows, err := e.Query(ctx, getTxsHashesBeforeBatchNum, batchNum)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrNotFound
	} else if err != nil {
		return nil, err
	}
	// Release the connection even when a scan fails midway.
	defer rows.Close()

	hashes := make([]common.Hash, 0)
	for rows.Next() {
		var hash string
		if err := rows.Scan(&hash); err != nil {
			return nil, err
		}
		hashes = append(hashes, common.HexToHash(hash))
	}
	// Next returns false on read errors too; surface them instead of
	// returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return hashes, nil
}
// GetLastBlock returns the L1 block with the highest block_num in
// state.block, or ErrStateNotSynchronized when the table is empty.
func (p *PostgresStorage) GetLastBlock(ctx context.Context, dbTx pgx.Tx) (*Block, error) {
	const getLastBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1"

	var last Block
	var hashHex, parentHex string

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getLastBlockSQL)
	err := row.Scan(&last.BlockNumber, &hashHex, &parentHex, &last.ReceivedAt)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrStateNotSynchronized
	}

	// Any other scan error is handed back together with the block as scanned so far.
	last.BlockHash = common.HexToHash(hashHex)
	last.ParentHash = common.HexToHash(parentHex)
	return &last, err
}
// GetPreviousBlock returns the L1 block found offset rows behind the latest
// one when state.block is ordered by descending block_num, or ErrNotFound
// when no such row exists.
func (p *PostgresStorage) GetPreviousBlock(ctx context.Context, offset uint64, dbTx pgx.Tx) (*Block, error) {
	const getPreviousBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1 OFFSET $1"

	var previous Block
	var hashHex, parentHex string

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getPreviousBlockSQL, offset)
	err := row.Scan(&previous.BlockNumber, &hashHex, &parentHex, &previous.ReceivedAt)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrNotFound
	}

	// Any other scan error is handed back together with the block as scanned so far.
	previous.BlockHash = common.HexToHash(hashHex)
	previous.ParentHash = common.HexToHash(parentHex)
	return &previous, err
}
// AddGlobalExitRoot inserts the given exit root into state.exit_root.
func (p *PostgresStorage) AddGlobalExitRoot(ctx context.Context, exitRoot *GlobalExitRoot, dbTx pgx.Tx) error {
	const addGlobalExitRootSQL = "INSERT INTO state.exit_root (block_num, timestamp, mainnet_exit_root, rollup_exit_root, global_exit_root) VALUES ($1, $2, $3, $4, $5)"

	querier := p.getExecQuerier(dbTx)
	args := []interface{}{
		exitRoot.BlockNumber,
		exitRoot.Timestamp,
		exitRoot.MainnetExitRoot,
		exitRoot.RollupExitRoot,
		exitRoot.GlobalExitRoot,
	}
	_, err := querier.Exec(ctx, addGlobalExitRootSQL, args...)
	return err
}
// GetLatestGlobalExitRoot returns the exit root with the highest id among
// those whose block_num is not greater than maxBlockNumber, together with the
// received_at time of the L1 block it belongs to. ErrNotFound is returned
// when either the exit root or its block is missing.
func (p *PostgresStorage) GetLatestGlobalExitRoot(ctx context.Context, maxBlockNumber uint64, dbTx pgx.Tx) (GlobalExitRoot, time.Time, error) {
	const getLatestExitRootSQL = "SELECT block_num, mainnet_exit_root, rollup_exit_root, global_exit_root FROM state.exit_root WHERE block_num <= $1 ORDER BY id DESC LIMIT 1"

	querier := p.getExecQuerier(dbTx)

	var latest GlobalExitRoot
	err := querier.QueryRow(ctx, getLatestExitRootSQL, maxBlockNumber).
		Scan(&latest.BlockNumber, &latest.MainnetExitRoot, &latest.RollupExitRoot, &latest.GlobalExitRoot)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return GlobalExitRoot{}, time.Time{}, ErrNotFound
	case err != nil:
		return GlobalExitRoot{}, time.Time{}, err
	}

	var blockTime time.Time
	err = querier.QueryRow(ctx, getBlockTimeByNumSQL, latest.BlockNumber).Scan(&blockTime)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return GlobalExitRoot{}, time.Time{}, ErrNotFound
	case err != nil:
		return GlobalExitRoot{}, time.Time{}, err
	}

	return latest, blockTime, nil
}
// GetNumberOfBlocksSinceLastGERUpdate returns the difference between the last
// synced L1 block number and the block number of the exit root with the
// highest id. ErrNotFound is returned when either table is empty.
//
// Both reads go through the same querier so that, when dbTx is provided, they
// observe the same transaction.
func (p *PostgresStorage) GetNumberOfBlocksSinceLastGERUpdate(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
	var (
		lastBlockNum         uint64
		lastExitRootBlockNum uint64
		err                  error
	)
	const getLatestExitRootBlockNumSQL = "SELECT block_num FROM state.exit_root ORDER BY id DESC LIMIT 1"

	e := p.getExecQuerier(dbTx)

	err = e.QueryRow(ctx, getLastBlockNumSQL).Scan(&lastBlockNum)
	if errors.Is(err, pgx.ErrNoRows) {
		return 0, ErrNotFound
	} else if err != nil {
		return 0, err
	}

	// Use e, not the pool directly: going through p here would bypass dbTx.
	err = e.QueryRow(ctx, getLatestExitRootBlockNumSQL).Scan(&lastExitRootBlockNum)
	if errors.Is(err, pgx.ErrNoRows) {
		return 0, ErrNotFound
	} else if err != nil {
		return 0, err
	}

	return lastBlockNum - lastExitRootBlockNum, nil
}
// GetBlockNumAndMainnetExitRootByGER looks up the state.exit_root row whose
// global_exit_root equals ger and returns its block number and mainnet exit
// root, or ErrNotFound when there is no such row.
func (p *PostgresStorage) GetBlockNumAndMainnetExitRootByGER(ctx context.Context, ger common.Hash, dbTx pgx.Tx) (uint64, common.Hash, error) {
	const getMainnetExitRoot = "SELECT block_num, mainnet_exit_root FROM state.exit_root WHERE global_exit_root = $1"

	var number uint64
	var mainnetRoot common.Hash

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getMainnetExitRoot, ger.Bytes())
	err := row.Scan(&number, &mainnetRoot)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return 0, common.Hash{}, ErrNotFound
	case err != nil:
		return 0, common.Hash{}, err
	}
	return number, mainnetRoot, nil
}
// GetTimeForLatestBatchVirtualization returns the timestamp of the latest
// virtual batch.
func (p *PostgresStorage) GetTimeForLatestBatchVirtualization(ctx context.Context, dbTx pgx.Tx) (time.Time, error) {
var (
blockNum uint64
timestamp time.Time
)
const getLastVirtualBatchBlockNumSQL = "SELECT block_num FROM state.virtual_batch ORDER BY batch_num DESC LIMIT 1"
e := p.getExecQuerier(dbTx)
err := e.QueryRow(ctx, getLastVirtualBatchBlockNumSQL).Scan(&blockNum)
if errors.Is(err, pgx.ErrNoRows) {
return time.Time{}, ErrNotFound
} else if err != nil {
return time.Time{}, err
}
err = p.QueryRow(ctx, getBlockTimeByNumSQL, blockNum).Scan(×tamp)
if errors.Is(err, pgx.ErrNoRows) {
return time.Time{}, ErrNotFound
} else if err != nil {
return time.Time{}, err
}
return timestamp, nil
}
// AddForcedBatch inserts the given forced batch into state.forced_batch. The
// global exit root and sequencer are stored in their string form and the raw
// transactions data is stored hex-encoded.
//
// The statement runs on tx when it is not nil and on the pool otherwise, like
// the other storage methods; calling tx.Exec directly would panic on a nil tx.
func (p *PostgresStorage) AddForcedBatch(ctx context.Context, forcedBatch *ForcedBatch, tx pgx.Tx) error {
	const addForcedBatchSQL = "INSERT INTO state.forced_batch (forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num) VALUES ($1, $2, $3, $4, $5, $6)"

	e := p.getExecQuerier(tx)
	_, err := e.Exec(ctx, addForcedBatchSQL, forcedBatch.ForcedBatchNumber, forcedBatch.GlobalExitRoot.String(), forcedBatch.ForcedAt, hex.EncodeToString(forcedBatch.RawTxsData), forcedBatch.Sequencer.String(), forcedBatch.BlockNumber)
	return err
}
// GetForcedBatch loads the forced batch with the given number from
// state.forced_batch, hex-decoding its raw transactions data. It returns
// ErrNotFound when there is no such row.
func (p *PostgresStorage) GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*ForcedBatch, error) {
	const getForcedBatchSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num = $1"

	var result ForcedBatch
	var gerHex, rawTxsHex, sequencerHex string

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getForcedBatchSQL, forcedBatchNumber)
	err := row.Scan(&result.ForcedBatchNumber, &gerHex, &result.ForcedAt, &rawTxsHex, &sequencerHex, &result.BlockNumber)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	if result.RawTxsData, err = hex.DecodeString(rawTxsHex); err != nil {
		return nil, err
	}
	result.Sequencer = common.HexToAddress(sequencerHex)
	result.GlobalExitRoot = common.HexToHash(gerHex)
	return &result, nil
}
// GetForcedBatchesSince returns, in ascending forced_batch_num order, the
// forced batches whose number is greater than forcedBatchNumber and whose
// block_num is not greater than maxBlockNumber. The result is an empty,
// non-nil slice when nothing matches.
func (p *PostgresStorage) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber, maxBlockNumber uint64, dbTx pgx.Tx) ([]*ForcedBatch, error) {
	const getForcedBatchesSQL = "SELECT forced_batch_num, global_exit_root, timestamp, raw_txs_data, coinbase, block_num FROM state.forced_batch WHERE forced_batch_num > $1 AND block_num <= $2 ORDER BY forced_batch_num ASC"

	q := p.getExecQuerier(dbTx)
	rows, err := q.Query(ctx, getForcedBatchesSQL, forcedBatchNumber, maxBlockNumber)
	if errors.Is(err, pgx.ErrNoRows) {
		return []*ForcedBatch{}, nil
	} else if err != nil {
		return nil, err
	}
	defer rows.Close()

	forcesBatches := make([]*ForcedBatch, 0)
	for rows.Next() {
		forcedBatch, err := scanForcedBatch(rows)
		if err != nil {
			return nil, err
		}
		forcesBatches = append(forcesBatches, &forcedBatch)
	}
	// Next returns false on read errors too; surface them instead of
	// returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return forcesBatches, nil
}
// AddVerifiedBatch inserts the given verified batch into
// state.verified_batch. The tx hash, aggregator and state root are stored in
// their string form.
func (p *PostgresStorage) AddVerifiedBatch(ctx context.Context, verifiedBatch *VerifiedBatch, dbTx pgx.Tx) error {
	const addVerifiedBatchSQL = "INSERT INTO state.verified_batch (block_num, batch_num, tx_hash, aggregator, state_root, is_trusted) VALUES ($1, $2, $3, $4, $5, $6)"

	querier := p.getExecQuerier(dbTx)
	args := []interface{}{
		verifiedBatch.BlockNumber,
		verifiedBatch.BatchNumber,
		verifiedBatch.TxHash.String(),
		verifiedBatch.Aggregator.String(),
		verifiedBatch.StateRoot.String(),
		verifiedBatch.IsTrusted,
	}
	_, err := querier.Exec(ctx, addVerifiedBatchSQL, args...)
	return err
}
// GetVerifiedBatch loads the verified batch with the given batch number from
// state.verified_batch, or returns ErrNotFound when there is no such row.
func (p *PostgresStorage) GetVerifiedBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*VerifiedBatch, error) {
	const getVerifiedBatchSQL = `
SELECT block_num, batch_num, tx_hash, aggregator, state_root, is_trusted
FROM state.verified_batch
WHERE batch_num = $1`

	var result VerifiedBatch
	var txHashHex, aggregatorHex, stateRootHex string

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getVerifiedBatchSQL, batchNumber)
	err := row.Scan(&result.BlockNumber, &result.BatchNumber, &txHashHex, &aggregatorHex, &stateRootHex, &result.IsTrusted)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	result.Aggregator = common.HexToAddress(aggregatorHex)
	result.TxHash = common.HexToHash(txHashHex)
	result.StateRoot = common.HexToHash(stateRootHex)
	return &result, nil
}
// GetLastNBatches returns up to numBatches rows of state.batch, ordered by
// descending batch_num. The result is an empty, non-nil slice when the table
// is empty.
func (p *PostgresStorage) GetLastNBatches(ctx context.Context, numBatches uint, dbTx pgx.Tx) ([]*Batch, error) {
	const getLastNBatchesSQL = "SELECT batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num from state.batch ORDER BY batch_num DESC LIMIT $1"

	e := p.getExecQuerier(dbTx)
	rows, err := e.Query(ctx, getLastNBatchesSQL, numBatches)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, ErrStateNotSynchronized
	} else if err != nil {
		return nil, err
	}
	defer rows.Close()

	batches := make([]*Batch, 0)
	for rows.Next() {
		batch, err := scanBatch(rows)
		if err != nil {
			return nil, err
		}
		batches = append(batches, &batch)
	}
	// Next returns false on read errors too; surface them instead of
	// returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return batches, nil
}
// GetLastNBatchesByL2BlockNumber returns the last numBatches batches along with the l2 block state root by l2BlockNumber
// if the l2BlockNumber parameter is nil, it means we want to get the most recent last N batches
//
// The returned state root is taken from the first row (highest batch number
// first) that provides one: the state root of the batch's highest L2 block
// when the batch has L2 blocks, otherwise the batch's own non-empty state
// root. If no row provides one, the zero hash is returned.
//
// ErrStateNotSynchronized is returned when the query yields no batches.
func (p *PostgresStorage) GetLastNBatchesByL2BlockNumber(ctx context.Context, l2BlockNumber *uint64, numBatches uint, dbTx pgx.Tx) ([]*Batch, common.Hash, error) {
	const getLastNBatchesByBlockNumberSQL = `
SELECT b.batch_num,
b.global_exit_root,
b.local_exit_root,
b.acc_input_hash,
b.state_root,
b.timestamp,
b.coinbase,
b.raw_txs_data,
/* gets the state root of the l2 block with the highest number associated to the batch in the row */
(SELECT l2b1.header->>'stateRoot'
FROM state.l2block l2b1
WHERE l2b1.block_num = (SELECT MAX(l2b2.block_num)
FROM state.l2block l2b2
WHERE l2b2.batch_num = b.batch_num)) as l2_block_state_root
FROM state.batch b
/* if there is a value for the parameter $1 (l2 block number), filter the batches with batch number
* smaller or equal than the batch associated to the l2 block number */
WHERE ($1::int8 IS NOT NULL AND b.batch_num <= (SELECT MAX(l2b.batch_num)
FROM state.l2block l2b
WHERE l2b.block_num = $1))
/* OR if $1 is null, this means we want to get the most updated information from state, so it considers all the batches.
* this is generally used by estimate gas, process unsigned transactions and it is required by claim transactions to add
* the open batch to the result and get the most updated globalExitRoot synced from L1 and stored in the current open batch when
* there was not transactions yet to create a l2 block with it */
OR $1 IS NULL
ORDER BY b.batch_num DESC
LIMIT $2;`

	e := p.getExecQuerier(dbTx)
	rows, err := e.Query(ctx, getLastNBatchesByBlockNumberSQL, l2BlockNumber, numBatches)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, common.Hash{}, ErrStateNotSynchronized
	} else if err != nil {
		return nil, common.Hash{}, err
	}
	defer rows.Close()

	batches := make([]*Batch, 0)
	emptyHash := common.Hash{}
	var l2BlockStateRoot *common.Hash
	for rows.Next() {
		batch, rowL2BlockStateRoot, err := scanBatchWithL2BlockStateRoot(rows)
		if err != nil {
			return nil, common.Hash{}, err
		}
		batches = append(batches, &batch)

		if l2BlockStateRoot == nil && rowL2BlockStateRoot != nil {
			l2BlockStateRoot = rowL2BlockStateRoot
		}
		// if there is no corresponding l2_block, it will use the latest batch state_root
		// it is related to https://github.com/0xPolygonHermez/zkevm-node/issues/1299
		if l2BlockStateRoot == nil && batch.StateRoot != emptyHash {
			l2BlockStateRoot = &batch.StateRoot
		}
	}
	// Next returns false on read errors too; surface them instead of
	// returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, common.Hash{}, err
	}

	// Query never reports "no rows" through its error, so an empty result has
	// to be detected here. Without these guards the dereference below panics.
	if len(batches) == 0 {
		return nil, common.Hash{}, ErrStateNotSynchronized
	}
	if l2BlockStateRoot == nil {
		return batches, common.Hash{}, nil
	}

	return batches, *l2BlockStateRoot, nil
}
// GetLastBatchNumber returns the highest batch_num stored in state.batch, or
// ErrStateNotSynchronized when the table is empty.
func (p *PostgresStorage) GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
	var last uint64

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getLastBatchNumberSQL)
	if err := row.Scan(&last); err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return 0, ErrStateNotSynchronized
		}
		return last, err
	}
	return last, nil
}
// GetLastBatchTime gets last trusted batch time
func (p *PostgresStorage) GetLastBatchTime(ctx context.Context, dbTx pgx.Tx) (time.Time, error) {
var timestamp time.Time
const getLastBatchTimeSQL = "SELECT timestamp FROM state.batch ORDER BY batch_num DESC LIMIT 1"
e := p.getExecQuerier(dbTx)
err := e.QueryRow(ctx, getLastBatchTimeSQL).Scan(×tamp)
if errors.Is(err, pgx.ErrNoRows) {
return time.Time{}, ErrStateNotSynchronized
} else if err != nil {
return time.Time{}, err
}
return timestamp, nil
}
// GetLastVirtualBatchNum returns the highest batch_num stored in
// state.virtual_batch; the query coalesces an empty table to 0.
func (p *PostgresStorage) GetLastVirtualBatchNum(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
	const getLastVirtualBatchNumSQL = "SELECT COALESCE(MAX(batch_num), 0) FROM state.virtual_batch"

	var last uint64
	row := p.getExecQuerier(dbTx).QueryRow(ctx, getLastVirtualBatchNumSQL)
	err := row.Scan(&last)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return 0, ErrNotFound
	case err != nil:
		return 0, err
	}
	return last, nil
}
// GetLatestVirtualBatchTimestamp gets last virtual batch timestamp
func (p *PostgresStorage) GetLatestVirtualBatchTimestamp(ctx context.Context, dbTx pgx.Tx) (time.Time, error) {
const getLastVirtualBatchTimestampSQL = `SELECT COALESCE(MAX(block.received_at), NOW()) FROM state.virtual_batch INNER JOIN state.block ON state.block.block_num = virtual_batch.block_num`
var timestamp time.Time
e := p.getExecQuerier(dbTx)
err := e.QueryRow(ctx, getLastVirtualBatchTimestampSQL).Scan(×tamp)
if errors.Is(err, pgx.ErrNoRows) {
return time.Unix(0, 0), ErrNotFound
} else if err != nil {
return time.Unix(0, 0), err
}
return timestamp, nil
}
// SetLastBatchInfoSeenOnEthereum stores, in state.sync_info, the last batch
// number seen on ethereum and the last batch number consolidated there, so
// that components can tell whether the state is synchronized. The update has
// no WHERE clause and therefore applies to every row of the table.
func (p *PostgresStorage) SetLastBatchInfoSeenOnEthereum(ctx context.Context, lastBatchNumberSeen, lastBatchNumberVerified uint64, dbTx pgx.Tx) error {
	const updateSyncInfoSQL = `
UPDATE state.sync_info
SET last_batch_num_seen = $1
, last_batch_num_consolidated = $2`

	querier := p.getExecQuerier(dbTx)
	if _, err := querier.Exec(ctx, updateSyncInfoSQL, lastBatchNumberSeen, lastBatchNumberVerified); err != nil {
		return err
	}
	return nil
}
// SetInitSyncBatch stores, in state.sync_info, the batch number at which the
// synchronization started.
func (p *PostgresStorage) SetInitSyncBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error {
	const updateInitBatchSQL = "UPDATE state.sync_info SET init_sync_batch = $1"

	querier := p.getExecQuerier(dbTx)
	if _, err := querier.Exec(ctx, updateInitBatchSQL, batchNumber); err != nil {
		return err
	}
	return nil
}
// GetBatchByNumber loads the state.batch row with the given batch number, or
// returns ErrStateNotSynchronized when there is no such row.
func (p *PostgresStorage) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*Batch, error) {
	const getBatchByNumberSQL = `
SELECT batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num
FROM state.batch
WHERE batch_num = $1`

	found, err := scanBatch(p.getExecQuerier(dbTx).QueryRow(ctx, getBatchByNumberSQL, batchNumber))
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrStateNotSynchronized
	case err != nil:
		return nil, err
	}
	return &found, nil
}
// GetBatchByTxHash loads the batch that contains the L2 block of the
// transaction with the given hash, or returns ErrStateNotSynchronized when
// the lookup yields no row.
func (p *PostgresStorage) GetBatchByTxHash(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (*Batch, error) {
	const getBatchByTxHashSQL = `
SELECT b.batch_num, b.global_exit_root, b.local_exit_root, b.acc_input_hash, b.state_root, b.timestamp, b.coinbase, b.raw_txs_data, b.forced_batch_num
FROM state.transaction t, state.batch b, state.l2block l
WHERE t.hash = $1 AND l.block_num = t.l2_block_num AND b.batch_num = l.batch_num`

	found, err := scanBatch(p.getExecQuerier(dbTx).QueryRow(ctx, getBatchByTxHashSQL, transactionHash.String()))
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrStateNotSynchronized
	case err != nil:
		return nil, err
	}
	return &found, nil
}
// GetBatchByL2BlockNumber loads the batch that the L2 block with the given
// number belongs to, or returns ErrStateNotSynchronized when the lookup
// yields no row.
func (p *PostgresStorage) GetBatchByL2BlockNumber(ctx context.Context, l2BlockNumber uint64, dbTx pgx.Tx) (*Batch, error) {
	const getBatchByL2BlockNumberSQL = `
SELECT bt.batch_num, bt.global_exit_root, bt.local_exit_root, bt.acc_input_hash, bt.state_root, bt.timestamp, bt.coinbase, bt.raw_txs_data, bt.forced_batch_num
FROM state.batch bt
INNER JOIN state.l2block bl
ON bt.batch_num = bl.batch_num
WHERE bl.block_num = $1
LIMIT 1;`

	found, err := scanBatch(p.getExecQuerier(dbTx).QueryRow(ctx, getBatchByL2BlockNumberSQL, l2BlockNumber))
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrStateNotSynchronized
	case err != nil:
		return nil, err
	}
	return &found, nil
}
// GetVirtualBatchByNumber loads the state.batch row with the given number,
// but only if a row with the same batch_num exists in state.virtual_batch.
// It returns ErrNotFound otherwise.
func (p *PostgresStorage) GetVirtualBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*Batch, error) {
	const getVirtualBatchByNumberSQL = `
SELECT
batch_num,
global_exit_root,
local_exit_root,
acc_input_hash,
state_root,
timestamp,
coinbase,
raw_txs_data,
forced_batch_num
FROM
state.batch
WHERE
batch_num = $1 AND
EXISTS (SELECT batch_num FROM state.virtual_batch WHERE batch_num = $1)
`

	found, err := scanBatch(p.getExecQuerier(dbTx).QueryRow(ctx, getVirtualBatchByNumberSQL, batchNumber))
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}
	return &found, nil
}
// IsBatchVirtualized reports whether state.virtual_batch has a row for the
// given batch number. A "no rows" scan error is treated as not virtualized.
func (p *PostgresStorage) IsBatchVirtualized(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) {
	const existsSQL = `SELECT EXISTS (SELECT 1 FROM state.virtual_batch WHERE batch_num = $1)`

	var found bool
	err := p.getExecQuerier(dbTx).QueryRow(ctx, existsSQL, batchNumber).Scan(&found)
	if err == nil || errors.Is(err, pgx.ErrNoRows) {
		return found, nil
	}
	return found, err
}
// IsBatchConsolidated reports whether state.verified_batch has a row for the
// given batch number. A "no rows" scan error is treated as not consolidated.
func (p *PostgresStorage) IsBatchConsolidated(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) {
	const existsSQL = `SELECT EXISTS (SELECT 1 FROM state.verified_batch WHERE batch_num = $1)`

	var found bool
	err := p.getExecQuerier(dbTx).QueryRow(ctx, existsSQL, batchNumber).Scan(&found)
	if err == nil || errors.Is(err, pgx.ErrNoRows) {
		return found, nil
	}
	return found, err
}
// IsSequencingTXSynced reports whether state.virtual_batch has a row whose
// tx_hash equals the given transaction hash. A "no rows" scan error is
// treated as not synced.
func (p *PostgresStorage) IsSequencingTXSynced(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (bool, error) {
	const existsSQL = `SELECT EXISTS (SELECT 1 FROM state.virtual_batch WHERE tx_hash = $1)`

	var found bool
	err := p.getExecQuerier(dbTx).QueryRow(ctx, existsSQL, transactionHash.String()).Scan(&found)
	if err == nil || errors.Is(err, pgx.ErrNoRows) {
		return found, nil
	}
	return found, err
}
// GetProcessingContext loads the batch number, global exit root, timestamp,
// coinbase and forced batch number of the given batch from state.batch. It
// returns ErrStateNotSynchronized when there is no such batch.
func (p *PostgresStorage) GetProcessingContext(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*ProcessingContext, error) {
	const getProcessingContextSQL = "SELECT batch_num, global_exit_root, timestamp, coinbase, forced_batch_num from state.batch WHERE batch_num = $1"

	var result ProcessingContext
	var gerHex, coinbaseHex string

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getProcessingContextSQL, batchNumber)
	err := row.Scan(&result.BatchNumber, &gerHex, &result.Timestamp, &coinbaseHex, &result.ForcedBatchNum)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrStateNotSynchronized
	case err != nil:
		return nil, err
	}

	result.GlobalExitRoot = common.HexToHash(gerHex)
	result.Coinbase = common.HexToAddress(coinbaseHex)
	return &result, nil
}
// scanBatch reads one batch from a row that carries, in this order:
// batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root,
// timestamp, coinbase, raw_txs_data and forced_batch_num.
// The local exit root, acc input hash and state root may be NULL, in which
// case the corresponding field keeps its zero value.
func scanBatch(row pgx.Row) (Batch, error) {
	var result Batch
	var gerHex, coinbaseHex string
	var lerHex, aihHex, stateRootHex *string // nullable columns

	err := row.Scan(
		&result.BatchNumber,
		&gerHex,
		&lerHex,
		&aihHex,
		&stateRootHex,
		&result.Timestamp,
		&coinbaseHex,
		&result.BatchL2Data,
		&result.ForcedBatchNum,
	)
	if err != nil {
		return result, err
	}

	result.GlobalExitRoot = common.HexToHash(gerHex)
	result.Coinbase = common.HexToAddress(coinbaseHex)
	if lerHex != nil {
		result.LocalExitRoot = common.HexToHash(*lerHex)
	}
	if stateRootHex != nil {
		result.StateRoot = common.HexToHash(*stateRootHex)
	}
	if aihHex != nil {
		result.AccInputHash = common.HexToHash(*aihHex)
	}
	return result, nil
}
// scanBatchWithL2BlockStateRoot reads one batch plus an optional L2 block
// state root from a row that carries, in this order: batch_num,
// global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp,
// coinbase, raw_txs_data and the L2 block state root.
// The local exit root, acc input hash, state root and L2 block state root may
// be NULL: the batch fields then keep their zero value and the returned hash
// pointer is nil.
func scanBatchWithL2BlockStateRoot(row pgx.Row) (Batch, *common.Hash, error) {
	batch := Batch{}
	var (
		gerStr              string
		lerStr              *string
		aihStr              *string
		stateStr            *string
		coinbaseStr         string
		l2BlockStateRootStr *string
	)
	if err := row.Scan(
		&batch.BatchNumber,
		&gerStr,
		&lerStr,
		&aihStr,
		&stateStr,
		&batch.Timestamp,
		&coinbaseStr,
		&batch.BatchL2Data,
		&l2BlockStateRootStr,
	); err != nil {
		return batch, nil, err
	}
	batch.GlobalExitRoot = common.HexToHash(gerStr)
	if lerStr != nil {
		batch.LocalExitRoot = common.HexToHash(*lerStr)
	}
	if stateStr != nil {
		batch.StateRoot = common.HexToHash(*stateStr)
	}
	// Guard on aihStr itself: checking stateStr here would dereference a nil
	// aihStr, or skip a valid acc input hash, whenever the two columns differ
	// in nullness.
	if aihStr != nil {
		batch.AccInputHash = common.HexToHash(*aihStr)
	}
	var l2BlockStateRoot *common.Hash
	if l2BlockStateRootStr != nil {
		h := common.HexToHash(*l2BlockStateRootStr)
		l2BlockStateRoot = &h
	}
	batch.Coinbase = common.HexToAddress(coinbaseStr)
	return batch, l2BlockStateRoot, nil
}
// scanForcedBatch reads one forced batch from a row that carries, in this
// order: forced_batch_num, global_exit_root, timestamp, raw_txs_data,
// coinbase and block_num.
// NOTE(review): GetForcedBatch hex-decodes raw_txs_data, while this helper
// scans the column straight into RawTxsData — confirm both paths yield the
// same bytes.
func scanForcedBatch(row pgx.Row) (ForcedBatch, error) {
	var result ForcedBatch
	var gerHex, sequencerHex string

	err := row.Scan(
		&result.ForcedBatchNumber,
		&gerHex,
		&result.ForcedAt,
		&result.RawTxsData,
		&sequencerHex,
		&result.BlockNumber,
	)
	if err != nil {
		return result, err
	}

	result.GlobalExitRoot = common.HexToHash(gerHex)
	result.Sequencer = common.HexToAddress(sequencerHex)
	return result, nil
}
// GetEncodedTransactionsByBatchNumber returns the encoded field of all
// transactions in the given batch, ordered by ascending L2 block number.
// The result is an empty, non-nil slice when the batch has no transactions.
func (p *PostgresStorage) GetEncodedTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []string, err error) {
	const getEncodedTransactionsByBatchNumberSQL = "SELECT encoded FROM state.transaction t INNER JOIN state.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num = $1 ORDER BY l2_block_num ASC"

	e := p.getExecQuerier(dbTx)
	rows, err := e.Query(ctx, getEncodedTransactionsByBatchNumberSQL, batchNumber)
	if err != nil {
		// "No rows" is not a failure here: report an empty result rather
		// than carrying on with rows that are in an error state.
		if errors.Is(err, pgx.ErrNoRows) {
			return []string{}, nil
		}
		return nil, err
	}
	defer rows.Close()

	txs := make([]string, 0)
	for rows.Next() {
		var encodedTx string
		if err := rows.Scan(&encodedTx); err != nil {
			return nil, err
		}
		txs = append(txs, encodedTx)
	}
	// Next returns false on read errors too; surface them instead of
	// returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return txs, nil
}
// GetTransactionsByBatchNumber returns the transactions in the given batch.
//
// It loads the encoded transactions through
// GetEncodedTransactionsByBatchNumber and decodes each one with DecodeTx,
// keeping their order. The first load or decode error aborts the call and is
// returned with a nil slice.
func (p *PostgresStorage) GetTransactionsByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (txs []types.Transaction, err error) {
	rawTxs, err := p.GetEncodedTransactionsByBatchNumber(ctx, batchNumber, dbTx)
	if err != nil {
		return nil, err
	}

	for _, rawTx := range rawTxs {
		decoded, decodeErr := DecodeTx(rawTx)
		if decodeErr != nil {
			return nil, decodeErr
		}
		txs = append(txs, *decoded)
	}
	return txs, nil
}
// GetTxsHashesByBatchNumber returns the hashes of the transactions in the
// given batch, ordered by ascending L2 block number.
//
// A batch without transactions yields an empty, non-nil slice and a nil
// error. Any error reported by the query, by scanning a row, or by the row
// iterator after the loop is returned with a nil slice.
func (p *PostgresStorage) GetTxsHashesByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (encoded []common.Hash, err error) {
	const getTransactionHashesByBatchNumberSQL = "SELECT hash FROM state.transaction t INNER JOIN state.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num = $1 ORDER BY l2_block_num ASC"

	e := p.getExecQuerier(dbTx)
	rows, err := e.Query(ctx, getTransactionHashesByBatchNumberSQL, batchNumber)
	if errors.Is(err, pgx.ErrNoRows) {
		// Nothing to iterate; do not touch rows on this path.
		return []common.Hash{}, nil
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	hashes := make([]common.Hash, 0)
	for rows.Next() {
		var hexHash string
		if err := rows.Scan(&hexHash); err != nil {
			return nil, err
		}
		hashes = append(hashes, common.HexToHash(hexHash))
	}
	// Next returns false both at the end of the result set and on failure;
	// without this check a broken read would return a truncated list.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return hashes, nil
}
// AddVirtualBatch inserts the given virtual batch into state.virtual_batch.
//
// The transaction hash, coinbase and sequencer address are stored in their
// String() form; the batch and block numbers are stored as they are. The
// error from the insert, if any, is returned unchanged.
func (p *PostgresStorage) AddVirtualBatch(ctx context.Context, virtualBatch *VirtualBatch, dbTx pgx.Tx) error {
	const addVirtualBatchSQL = "INSERT INTO state.virtual_batch (batch_num, tx_hash, coinbase, block_num, sequencer_addr) VALUES ($1, $2, $3, $4, $5)"

	_, err := p.getExecQuerier(dbTx).Exec(
		ctx,
		addVirtualBatchSQL,
		virtualBatch.BatchNumber,
		virtualBatch.TxHash.String(),
		virtualBatch.Coinbase.String(),
		virtualBatch.BlockNumber,
		virtualBatch.SequencerAddr.String(),
	)
	return err
}
// GetVirtualBatch returns the virtual batch stored for the given batch number.
//
// It returns ErrNotFound when no row matches and any other query or scan
// error unchanged. The transaction hash, coinbase and sequencer address are
// read as hex strings and converted to their typed forms.
func (p *PostgresStorage) GetVirtualBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*VirtualBatch, error) {
	const getVirtualBatchSQL = `
SELECT block_num, batch_num, tx_hash, coinbase, sequencer_addr
FROM state.virtual_batch
WHERE batch_num = $1`

	var (
		txHashHex    string
		coinbaseHex  string
		sequencerHex string
	)
	vb := &VirtualBatch{}

	row := p.getExecQuerier(dbTx).QueryRow(ctx, getVirtualBatchSQL, batchNumber)
	err := row.Scan(&vb.BlockNumber, &vb.BatchNumber, &txHashHex, &coinbaseHex, &sequencerHex)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	vb.Coinbase = common.HexToAddress(coinbaseHex)
	vb.SequencerAddr = common.HexToAddress(sequencerHex)
	vb.TxHash = common.HexToHash(txHashHex)
	return vb, nil
}
// storeGenesisBatch inserts the genesis batch into state.batch.
//
// The batch number must be 0; otherwise an error wrapping ErrUnexpectedBatch
// is returned and nothing is written. Hash and address fields are stored in
// their String() form and the timestamp is converted to UTC.
func (p *PostgresStorage) storeGenesisBatch(ctx context.Context, batch Batch, dbTx pgx.Tx) error {
	const addGenesisBatchSQL = "INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"

	if num := batch.BatchNumber; num != 0 {
		return fmt.Errorf("%w. Got %d, should be 0", ErrUnexpectedBatch, num)
	}

	// Positional arguments, in the same order as the column list above.
	args := []interface{}{
		batch.BatchNumber,
		batch.GlobalExitRoot.String(),
		batch.LocalExitRoot.String(),
		batch.AccInputHash.String(),
		batch.StateRoot.String(),
		batch.Timestamp.UTC(),
		batch.Coinbase.String(),
		batch.BatchL2Data,
		batch.ForcedBatchNum,
	}

	_, err := p.getExecQuerier(dbTx).Exec(ctx, addGenesisBatchSQL, args...)
	return err
}
// openBatch inserts a new batch row into state.batch holding only the data
// needed to start processing transactions in it: batch number, global exit
// root, timestamp (converted to UTC), coinbase and forced batch number, all
// taken from batchContext.
// It is meant for sequencers, which do not necessarily know yet which
// transactions will end up in the batch; in other words, it creates a
// work-in-progress batch.
// NOTE(review): the batch number is written exactly as supplied; callers are
// presumably expected to pass N + 1, where N is the greatest batch number in
// the state — confirm against the caller.
func (p *PostgresStorage) openBatch(ctx context.Context, batchContext ProcessingContext, dbTx pgx.Tx) error {
	const openBatchSQL = "INSERT INTO state.batch (batch_num, global_exit_root, timestamp, coinbase, forced_batch_num) VALUES ($1, $2, $3, $4, $5)"

	querier := p.getExecQuerier(dbTx)
	if _, err := querier.Exec(
		ctx,
		openBatchSQL,
		batchContext.BatchNumber,
		batchContext.GlobalExitRoot.String(),
		batchContext.Timestamp.UTC(),
		batchContext.Coinbase.String(),
		batchContext.ForcedBatchNum,
	); err != nil {
		return err
	}
	return nil
}
// closeBatch updates the state.batch row identified by receipt.BatchNumber
// with the data from the processing receipt: state root, local exit root,
// accumulated input hash, raw transactions data, batch resources and closing
// reason.
//
// The batch resources are stored as a JSON string; a marshalling failure is
// returned before any statement is executed. Otherwise the error from the
// update, if any, is returned unchanged.
func (p *PostgresStorage) closeBatch(ctx context.Context, receipt ProcessingReceipt, dbTx pgx.Tx) error {
	const closeBatchSQL = `UPDATE state.batch
SET state_root = $1, local_exit_root = $2, acc_input_hash = $3, raw_txs_data = $4, batch_resources = $5, closing_reason = $6
WHERE batch_num = $7`

	querier := p.getExecQuerier(dbTx)

	resourcesJSON, marshalErr := json.Marshal(receipt.BatchResources)
	if marshalErr != nil {
		return marshalErr
	}

	_, execErr := querier.Exec(
		ctx,
		closeBatchSQL,
		receipt.StateRoot.String(),
		receipt.LocalExitRoot.String(),
		receipt.AccInputHash.String(),
		receipt.BatchL2Data,
		string(resourcesJSON),
		receipt.ClosingReason,
		receipt.BatchNumber,
	)
	return execErr
}
// UpdateGERInOpenBatch update ger in open batch
func (p *PostgresStorage) UpdateGERInOpenBatch(ctx context.Context, ger common.Hash, dbTx pgx.Tx) error {
if dbTx == nil {
return ErrDBTxNil
}
var (
batchNumber uint64
isBatchHasTxs bool
)
e := p.getExecQuerier(dbTx)
err := e.QueryRow(ctx, getLastBatchNumberSQL).Scan(&batchNumber)
if errors.Is(err, pgx.ErrNoRows) {
return ErrStateNotSynchronized