forked from 0xPolygonHermez/zkevm-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatastreamer.go
132 lines (105 loc) · 3.62 KB
/
datastreamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package sequencer
import (
"context"
"fmt"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ethereum/go-ethereum/common"
)
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64, blockHash common.Hash) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)
// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: blockResponse.Timestamp,
MinTimestamp: minTimestamp,
L1InfoTreeIndex: l1InfoTreeIndex,
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.l2Coinbase,
ForkID: forkID,
BlockHash: blockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
BlockInfoRoot: blockResponse.BlockInfoRoot,
}
if l2Block.ForkID >= state.FORKID_ETROG && l2Block.L1InfoTreeIndex == 0 {
l2Block.MinTimestamp = 0
}
l2Transactions := []state.DSL2Transaction{}
for i, txResponse := range blockResponse.TransactionResponses {
binaryTxData, err := txResponse.Tx.MarshalBinary()
if err != nil {
return err
}
l2Transaction := state.DSL2Transaction{
L2BlockNumber: blockResponse.BlockNumber,
EffectiveGasPricePercentage: uint8(txResponse.EffectivePercentage),
Index: uint64(i),
IsValid: 1,
EncodedLength: uint32(len(binaryTxData)),
Encoded: binaryTxData,
StateRoot: txResponse.StateRoot,
}
if txResponse.Logs != nil && len(txResponse.Logs) > 0 {
l2Transaction.Index = uint64(txResponse.Logs[0].TxIndex)
}
l2Transactions = append(l2Transactions, l2Transaction)
}
f.checkDSBufferIsFull(ctx)
f.dataToStream <- state.DSL2FullBlock{
DSL2Block: l2Block,
Txs: l2Transactions,
}
f.dataToStreamCount.Add(1)
}
return nil
}
func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)
// Send batch bookmark to the streamer
f.dataToStream <- datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: batchNumber,
}
f.dataToStreamCount.Add(1)
}
}
func (f *finalizer) checkDSBufferIsFull(ctx context.Context) {
if f.dataToStreamCount.Load() == datastreamChannelBufferSize {
f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true)
}
}
func (f *finalizer) DSSendBatchStart(ctx context.Context, batchNumber uint64, isForced bool) {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)
batchStart := datastream.BatchStart{
Number: batchNumber,
ForkId: forkID,
}
if isForced {
batchStart.Type = datastream.BatchType_BATCH_TYPE_FORCED
} else {
batchStart.Type = datastream.BatchType_BATCH_TYPE_REGULAR
}
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)
// Send batch start to the streamer
f.dataToStream <- batchStart
f.dataToStreamCount.Add(1)
}
}
func (f *finalizer) DSSendBatchEnd(ctx context.Context, batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)
// Send batch end to the streamer
f.dataToStream <- datastream.BatchEnd{
Number: batchNumber,
StateRoot: stateRoot.Bytes(),
LocalExitRoot: localExitRoot.Bytes(),
}
f.dataToStreamCount.Add(1)
}
}