Skip to content

Commit

Permalink
Populate stream file on sequencer startup (0xPolygonHermez#2613) (0xP…
Browse files Browse the repository at this point in the history
…olygonHermez#2614)

* populates stream file on startup

* fix test
  • Loading branch information
ToniRamirezM authored Oct 5, 2023
1 parent 4dfabaf commit ac53abd
Show file tree
Hide file tree
Showing 20 changed files with 1,177 additions and 124 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
/test/contracts/bin/**/*.bin
/test/contracts/bin/**/*.abi

/tools/datastreamer/*.bin

**/.DS_Store
.vscode
.idea/
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.19

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6 h1:jdK1BMWftw0vHFlxFc+pi202rH+3av1x/bQ20NXFINk=
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.6/go.mod h1:PNLzimb4objx43SXmVlzEjp2lUFxeeq+WpQumm1PUY0=
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8 h1:hOByFEvUC8hJnfbINMFzXxBru07AQLEhN50afow6Eu8=
github.com/0xPolygonHermez/zkevm-data-streamer v0.0.8/go.mod h1:UqLxA+/R20fm63Mp+J7wYMfh6WoE+6vBj6rOmFGuRm4=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
63 changes: 0 additions & 63 deletions sequencer/datastream.go

This file was deleted.

36 changes: 0 additions & 36 deletions sequencer/datastream_test.go

This file was deleted.

50 changes: 37 additions & 13 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type dbManager struct {
batchConstraints state.BatchConstraintsCfg
numberOfReorgs uint64
streamServer *datastreamer.StreamServer
dataToStream chan DSL2FullBlock
dataToStream chan state.DSL2FullBlock
}

func (d *dbManager) GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error) {
Expand All @@ -48,16 +48,16 @@ type ClosingBatchParameters struct {
EffectivePercentages []uint8
}

func newDBManager(ctx context.Context, config DBManagerCfg, txPool txPool, state stateInterface, worker *Worker, closingSignalCh ClosingSignalCh, batchConstraints state.BatchConstraintsCfg) *dbManager {
numberOfReorgs, err := state.CountReorgs(ctx, nil)
func newDBManager(ctx context.Context, config DBManagerCfg, txPool txPool, stateInterface stateInterface, worker *Worker, closingSignalCh ClosingSignalCh, batchConstraints state.BatchConstraintsCfg) *dbManager {
numberOfReorgs, err := stateInterface.CountReorgs(ctx, nil)
if err != nil {
log.Error("failed to get number of reorgs: %v", err)
}

return &dbManager{ctx: ctx, cfg: config, txPool: txPool,
state: state, worker: worker, l2ReorgCh: closingSignalCh.L2ReorgCh,
state: stateInterface, worker: worker, l2ReorgCh: closingSignalCh.L2ReorgCh,
batchConstraints: batchConstraints, numberOfReorgs: numberOfReorgs,
dataToStream: make(chan DSL2FullBlock, batchConstraints.MaxTxsPerBatch*datastreamChannelMultiplier)}
dataToStream: make(chan state.DSL2FullBlock, batchConstraints.MaxTxsPerBatch*datastreamChannelMultiplier)}
}

// Start stars the dbManager routines
Expand Down Expand Up @@ -174,20 +174,40 @@ func (d *dbManager) sendDataToStreamer() {
continue
}

_, err = d.streamServer.AddStreamEntry(EntryTypeL2Block, l2Block.Encode())
blockStart := state.DSL2BlockStart{
BatchNumber: l2Block.BatchNumber,
L2BlockNumber: l2Block.L2BlockNumber,
Timestamp: l2Block.Timestamp,
GlobalExitRoot: l2Block.GlobalExitRoot,
Coinbase: l2Block.Coinbase,
ForkID: l2Block.ForkID,
}

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
if err != nil {
log.Errorf("failed to add stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
continue
}

for _, l2Transaction := range l2Transactions {
_, err = d.streamServer.AddStreamEntry(EntryTypeL2Tx, l2Transaction.Encode())
_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
log.Errorf("failed to add l2tx stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
continue
}
}

blockEnd := state.DSL2BlockEnd{
L2BlockNumber: l2Block.L2BlockNumber,
BlockHash: l2Block.BlockHash,
StateRoot: l2Block.StateRoot,
}

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode())
if err != nil {
log.Fatal(err)
}

err = d.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %v: %v ", l2Block.L2BlockNumber, err)
Expand Down Expand Up @@ -286,25 +306,29 @@ func (d *dbManager) StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx tr

// Send data to streamer
if d.streamServer != nil {
l2Block := DSL2Block{
forkID := d.state.GetForkIDByBatchNumber(tx.batchNumber)

l2Block := state.DSL2Block{
BatchNumber: tx.batchNumber,
L2BlockNumber: l2BlockHeader.Number.Uint64(),
Timestamp: uint64(tx.timestamp.Unix()),
Timestamp: tx.timestamp.Unix(),
GlobalExitRoot: batch.GlobalExitRoot,
Coinbase: tx.coinbase,
ForkID: uint16(forkID),
BlockHash: l2BlockHeader.Hash(),
StateRoot: l2BlockHeader.Root,
}

l2Transaction := DSL2Transaction{
BatchNumber: batch.BatchNumber,
l2Transaction := state.DSL2Transaction{
EffectiveGasPricePercentage: uint8(tx.response.EffectivePercentage),
IsValid: 1,
EncodedLength: uint32(len(txData)),
Encoded: txData,
}

d.dataToStream <- DSL2FullBlock{
d.dataToStream <- state.DSL2FullBlock{
L2Block: l2Block,
Txs: []DSL2Transaction{l2Transaction},
Txs: []state.DSL2Transaction{l2Transaction},
}
}

Expand Down
3 changes: 3 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type stateInterface interface {
FlushMerkleTree(ctx context.Context) error
GetStoredFlushID(ctx context.Context) (uint64, string, error)
GetForkIDByBatchNumber(batchNumber uint64) uint64
GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error)
GetDSL2Blocks(ctx context.Context, limit, offset uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, minL2Block, maxL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
}

type workerInterface interface {
Expand Down
78 changes: 78 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ac53abd

Please sign in to comment.