Skip to content

Commit

Permalink
Fix delta stream etrog (0xPolygonHermez#3022)
Browse files Browse the repository at this point in the history
* fix delta stream for etrog

* fix query for backwards compatibility

* fix query for backwards compatibility

* fix

* add labels to loops

* fix

* fix

* fix

* add sanity check when adding l2blocks

* fix

* fix

* fix

* fix
  • Loading branch information
ToniRamirezM authored Jan 5, 2024
1 parent c49ec70 commit 5ee8bda
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 63 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.15
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.17
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.15 h1:WmSnrlSHzlpkD33mRoUVlxScR6f+xig9pMKqYM8n+hQ=
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.15/go.mod h1:VrOfhxA3y9XVpZh2jpBqwv7eGBKwxgKJ7jVTzFr6vYI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.17 h1:pCA2k5ke1otBTNAyE8yiSlkDwpZxvJQH55Nf0GXWvfk=
github.com/0xPolygonHermez/zkevm-data-streamer v0.1.17/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
16 changes: 8 additions & 8 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
GlobalExitRoot: blockResponse.BlockInfoRoot, //TODO: is it ok?
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //TODO: in etrog the blockhash is the block root
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
GERorInfoRoot: blockResponse.BlockInfoRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //From etrog the blockhash is the block root
}

l2Transactions := []state.DSL2Transaction{}
Expand Down
12 changes: 6 additions & 6 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ func (s *Sequencer) sendDataToStreamer() {
}

blockStart := state.DSL2BlockStart{
BatchNumber: l2Block.BatchNumber,
L2BlockNumber: l2Block.L2BlockNumber,
Timestamp: l2Block.Timestamp,
GlobalExitRoot: l2Block.GlobalExitRoot,
Coinbase: l2Block.Coinbase,
ForkID: l2Block.ForkID,
BatchNumber: l2Block.BatchNumber,
L2BlockNumber: l2Block.L2BlockNumber,
Timestamp: l2Block.Timestamp,
GERorInfoRoot: l2Block.GERorInfoRoot,
Coinbase: l2Block.Coinbase,
ForkID: l2Block.ForkID,
}

_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
Expand Down
81 changes: 46 additions & 35 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ type DSL2FullBlock struct {

// DSL2Block is a full l2 block
type DSL2Block struct {
BatchNumber uint64 // 8 bytes
L2BlockNumber uint64 // 8 bytes
Timestamp int64 // 8 bytes
GlobalExitRoot common.Hash // 32 bytes
Coinbase common.Address // 20 bytes
ForkID uint16 // 2 bytes
BlockHash common.Hash // 32 bytes
StateRoot common.Hash // 32 bytes
BatchNumber uint64 // 8 bytes
L2BlockNumber uint64 // 8 bytes
Timestamp int64 // 8 bytes
GERorInfoRoot common.Hash // 32 bytes
Coinbase common.Address // 20 bytes
ForkID uint16 // 2 bytes
BlockHash common.Hash // 32 bytes
StateRoot common.Hash // 32 bytes
}

// DSL2BlockStart represents a data stream L2 block start
type DSL2BlockStart struct {
BatchNumber uint64 // 8 bytes
L2BlockNumber uint64 // 8 bytes
Timestamp int64 // 8 bytes
GlobalExitRoot common.Hash // 32 bytes
Coinbase common.Address // 20 bytes
ForkID uint16 // 2 bytes
BatchNumber uint64 // 8 bytes
L2BlockNumber uint64 // 8 bytes
Timestamp int64 // 8 bytes
GERorInfoRoot common.Hash // 32 bytes
Coinbase common.Address // 20 bytes
ForkID uint16 // 2 bytes
}

// Encode returns the encoded DSL2BlockStart as a byte slice
Expand All @@ -79,7 +79,7 @@ func (b DSL2BlockStart) Encode() []byte {
bytes = binary.LittleEndian.AppendUint64(bytes, b.BatchNumber)
bytes = binary.LittleEndian.AppendUint64(bytes, b.L2BlockNumber)
bytes = binary.LittleEndian.AppendUint64(bytes, uint64(b.Timestamp))
bytes = append(bytes, b.GlobalExitRoot.Bytes()...)
bytes = append(bytes, b.GERorInfoRoot.Bytes()...)
bytes = append(bytes, b.Coinbase.Bytes()...)
bytes = binary.LittleEndian.AppendUint16(bytes, b.ForkID)
return bytes
Expand All @@ -90,7 +90,7 @@ func (b DSL2BlockStart) Decode(data []byte) DSL2BlockStart {
b.BatchNumber = binary.LittleEndian.Uint64(data[0:8])
b.L2BlockNumber = binary.LittleEndian.Uint64(data[8:16])
b.Timestamp = int64(binary.LittleEndian.Uint64(data[16:24]))
b.GlobalExitRoot = common.BytesToHash(data[24:56])
b.GERorInfoRoot = common.BytesToHash(data[24:56])
b.Coinbase = common.BytesToAddress(data[56:76])
b.ForkID = binary.LittleEndian.Uint16(data[76:78])
return b
Expand Down Expand Up @@ -221,6 +221,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St

var currentBatchNumber uint64 = 0
var currentL2Block uint64 = 0
var lastAddedL2Block uint64 = 0

if header.TotalEntries == 0 {
// Get Genesis block
Expand All @@ -245,12 +246,12 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

genesisBlock := DSL2BlockStart{
BatchNumber: genesisL2Block.BatchNumber,
L2BlockNumber: genesisL2Block.L2BlockNumber,
Timestamp: genesisL2Block.Timestamp,
GlobalExitRoot: genesisL2Block.GlobalExitRoot,
Coinbase: genesisL2Block.Coinbase,
ForkID: genesisL2Block.ForkID,
BatchNumber: genesisL2Block.BatchNumber,
L2BlockNumber: genesisL2Block.L2BlockNumber,
Timestamp: genesisL2Block.Timestamp,
GERorInfoRoot: genesisL2Block.GERorInfoRoot,
Coinbase: genesisL2Block.Coinbase,
ForkID: genesisL2Block.ForkID,
}

log.Infof("Genesis block: %+v", genesisBlock)
Expand Down Expand Up @@ -316,14 +317,13 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St

// Start on the current batch number + 1
currentBatchNumber++

var err error

const limit = 10000

for err == nil {
log.Debugf("Current entry number: %d", entry)
log.Debugf("Current batch number: %d", currentBatchNumber)

// Get Next Batch
batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, readWIPBatch, nil)
if err != nil {
Expand Down Expand Up @@ -354,7 +354,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}
}

// Gererate full batches
// Generate full batches
fullBatches := computeFullBatches(batches, l2Blocks, l2Txs)
currentBatchNumber += limit

Expand Down Expand Up @@ -398,13 +398,19 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

for _, l2block := range batch.L2Blocks {
if l2block.L2BlockNumber <= lastAddedL2Block && lastAddedL2Block != 0 {
continue
} else {
lastAddedL2Block = l2block.L2BlockNumber
}

blockStart := DSL2BlockStart{
BatchNumber: l2block.BatchNumber,
L2BlockNumber: l2block.L2BlockNumber,
Timestamp: l2block.Timestamp,
GlobalExitRoot: l2block.GlobalExitRoot,
Coinbase: l2block.Coinbase,
ForkID: l2block.ForkID,
BatchNumber: l2block.BatchNumber,
L2BlockNumber: l2block.L2BlockNumber,
Timestamp: l2block.Timestamp,
GERorInfoRoot: l2block.GERorInfoRoot,
Coinbase: l2block.Coinbase,
ForkID: l2block.ForkID,
}

bookMark := DSBookMark{
Expand Down Expand Up @@ -451,7 +457,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
if err != nil {
return err
}
currentGER = l2block.GlobalExitRoot
currentGER = l2block.GERorInfoRoot
}
// Commit at the end of each batch group
err = streamServer.CommitAtomicOp()
Expand Down Expand Up @@ -482,6 +488,7 @@ func GetSystemSCPosition(blockNumber uint64) []byte {

// computeFullBatches computes the full batches
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction) []*DSFullBatch {
prevL2BlockNumber := uint64(0)
currentL2Block := 0
currentL2Tx := 0

Expand All @@ -494,6 +501,11 @@ func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2

for i := currentL2Block; i < len(l2Blocks); i++ {
l2Block := l2Blocks[i]

if prevL2BlockNumber != 0 && l2Block.L2BlockNumber <= prevL2BlockNumber {
continue
}

if l2Block.BatchNumber == batch.BatchNumber {
fullBlock := DSL2FullBlock{
DSL2Block: *l2Block,
Expand All @@ -511,10 +523,9 @@ func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2
}

fullBatch.L2Blocks = append(fullBatch.L2Blocks, fullBlock)
prevL2BlockNumber = l2Block.L2BlockNumber
currentL2Block++
}

if l2Block.BatchNumber > batch.BatchNumber {
} else if l2Block.BatchNumber > batch.BatchNumber {
break
}
}
Expand Down
12 changes: 9 additions & 3 deletions state/pgstatestorage/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// GetDSGenesisBlock returns the genesis block
func (p *PostgresStorage) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error) {
const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.received_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root
const genesisL2BlockSQL = `SELECT 0 as batch_num, l2b.block_num, l2b.received_at, '0x0000000000000000000000000000000000000000' as global_exit_root, l2b.header->>'miner' AS coinbase, 0 as fork_id, l2b.block_hash, l2b.state_root, '' AS info_root
FROM state.l2block l2b
WHERE l2b.block_num = 0`

Expand All @@ -29,7 +29,7 @@ func (p *PostgresStorage) GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*

// GetDSL2Blocks returns the L2 blocks
func (p *PostgresStorage) GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error) {
const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.received_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root
const l2BlockSQL = `SELECT l2b.batch_num, l2b.block_num, l2b.received_at, b.global_exit_root, l2b.header->>'miner' AS coinbase, f.fork_id, l2b.block_hash, l2b.state_root, coalesce(l2b.header->>'blockInfoRoot', '') AS info_root
FROM state.l2block l2b, state.batch b, state.fork_id f
WHERE l2b.batch_num BETWEEN $1 AND $2 AND l2b.batch_num = b.batch_num AND l2b.batch_num between f.from_batch_num AND f.to_batch_num
ORDER BY l2b.block_num ASC`
Expand Down Expand Up @@ -61,6 +61,7 @@ func scanL2Block(row pgx.Row) (*state.DSL2Block, error) {
timestamp time.Time
blockHashStr string
stateRootStr string
infoRootStr string
)
if err := row.Scan(
&l2Block.BatchNumber,
Expand All @@ -71,15 +72,20 @@ func scanL2Block(row pgx.Row) (*state.DSL2Block, error) {
&l2Block.ForkID,
&blockHashStr,
&stateRootStr,
&infoRootStr,
); err != nil {
return &l2Block, err
}
l2Block.GlobalExitRoot = common.HexToHash(gerStr)
l2Block.GERorInfoRoot = common.HexToHash(gerStr)
l2Block.Coinbase = common.HexToAddress(coinbaseStr)
l2Block.Timestamp = timestamp.Unix()
l2Block.BlockHash = common.HexToHash(blockHashStr)
l2Block.StateRoot = common.HexToHash(stateRootStr)

if infoRootStr != "" {
l2Block.GERorInfoRoot = common.HexToHash(infoRootStr)
}

return &l2Block, nil
}

Expand Down
12 changes: 6 additions & 6 deletions state/test/datastream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

func TestL2BlockStartEncode(t *testing.T) {
l2BlockStart := state.DSL2BlockStart{
BatchNumber: 1, // 8 bytes
L2BlockNumber: 2, // 8 bytes
Timestamp: 3, // 8 bytes
GlobalExitRoot: common.HexToHash("0x04"), // 32 bytes
Coinbase: common.HexToAddress("0x05"), // 20 bytes
ForkID: 5,
BatchNumber: 1, // 8 bytes
L2BlockNumber: 2, // 8 bytes
Timestamp: 3, // 8 bytes
GERorInfoRoot: common.HexToHash("0x04"), // 32 bytes
Coinbase: common.HexToAddress("0x05"), // 20 bytes
ForkID: 5,
}

encoded := l2BlockStart.Encode()
Expand Down
4 changes: 2 additions & 2 deletions tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,8 @@ func printEntry(entry datastreamer.FileEntry) {
printColored(color.FgHiWhite, fmt.Sprintf("%d\n", blockStart.L2BlockNumber))
printColored(color.FgGreen, "Timestamp.......: ")
printColored(color.FgHiWhite, fmt.Sprintf("%v (%d)\n", time.Unix(blockStart.Timestamp, 0), blockStart.Timestamp))
printColored(color.FgGreen, "Global Exit Root: ")
printColored(color.FgHiWhite, fmt.Sprintf("%s\n", blockStart.GlobalExitRoot))
printColored(color.FgGreen, "GER or Info Root: ")
printColored(color.FgHiWhite, fmt.Sprintf("%s\n", blockStart.GERorInfoRoot))
printColored(color.FgGreen, "Coinbase........: ")
printColored(color.FgHiWhite, fmt.Sprintf("%s\n", blockStart.Coinbase))
printColored(color.FgGreen, "Fork ID.........: ")
Expand Down

0 comments on commit 5ee8bda

Please sign in to comment.