Skip to content

Commit 6ce43bd

Browse files
committed
WIP: data stream refactoring to testable code; add data stream catch up test; pending asserts
1 parent 4792c13 commit 6ce43bd

19 files changed

+1548
-134
lines changed

cmd/rpcdaemon/cli/config_zkevm.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package cli
22

33
import (
44
"fmt"
5-
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
5+
6+
"github.com/ledgerwatch/erigon/zk/datastream/server"
67
"github.com/ledgerwatch/log/v3"
78
)
89

9-
func StartDataStream(server *datastreamer.StreamServer) error {
10+
func StartDataStream(server server.StreamServer) error {
1011
if server == nil {
1112
// no stream server to start, we might not have the right flags set to create one
1213
return nil

eth/backend.go

+24-11
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,10 @@ import (
129129
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
130130
"github.com/ledgerwatch/erigon/zk/contracts"
131131
"github.com/ledgerwatch/erigon/zk/datastream/client"
132+
"github.com/ledgerwatch/erigon/zk/datastream/server"
132133
"github.com/ledgerwatch/erigon/zk/hermez_db"
133134
"github.com/ledgerwatch/erigon/zk/l1_cache"
135+
"github.com/ledgerwatch/erigon/zk/l1infotree"
134136
"github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
135137
zkStages "github.com/ledgerwatch/erigon/zk/stages"
136138
"github.com/ledgerwatch/erigon/zk/syncer"
@@ -139,9 +141,10 @@ import (
139141
"github.com/ledgerwatch/erigon/zk/utils"
140142
"github.com/ledgerwatch/erigon/zk/witness"
141143
"github.com/ledgerwatch/erigon/zkevm/etherman"
142-
"github.com/ledgerwatch/erigon/zk/l1infotree"
143144
)
144145

146+
var dataStreamServerFactory = server.NewZkEVMDataStreamServerFactory()
147+
145148
// Config contains the configuration options of the ETH protocol.
146149
// Deprecated: use ethconfig.Config instead.
147150
type Config = ethconfig.Config
@@ -219,7 +222,7 @@ type Ethereum struct {
219222
logger log.Logger
220223

221224
// zk
222-
dataStream *datastreamer.StreamServer
225+
streamServer server.StreamServer
223226
l1Syncer *syncer.L1Syncer
224227
etherManClients []*etherman.Client
225228
l1Cache *l1_cache.L1Cache
@@ -977,16 +980,17 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
977980
Level: "warn",
978981
Outputs: nil,
979982
}
983+
980984
// todo [zkevm] read the stream version from config and figure out what system id is used for
981-
backend.dataStream, err = datastreamer.NewServer(uint16(httpCfg.DataStreamPort), uint8(backend.config.DatastreamVersion), 1, datastreamer.StreamType(1), file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig)
985+
backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), uint8(backend.config.DatastreamVersion), 1, datastreamer.StreamType(1), file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig)
982986
if err != nil {
983987
return nil, err
984988
}
985989

986990
// recovery here now, if the stream got into a bad state we want to be able to delete the file and have
987991
// the stream re-populated from scratch. So we check the stream for the latest header and if it is
988992
// 0 we can just set the datastream progress to 0 also which will force a re-population of the stream
989-
latestHeader := backend.dataStream.GetHeader()
993+
latestHeader := backend.streamServer.GetHeader()
990994
if latestHeader.TotalEntries == 0 {
991995
log.Info("[dataStream] setting the stream progress to 0")
992996
backend.preStartTasks.WarmUpDataStream = true
@@ -1100,6 +1104,11 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
11001104

11011105
l1InfoTreeUpdater := l1infotree.NewUpdater(cfg.Zk, l1InfoTreeSyncer)
11021106

1107+
var dataStreamServer server.DataStreamServer
1108+
if backend.streamServer != nil {
1109+
dataStreamServer = dataStreamServerFactory.CreateDataStreamServer(backend.streamServer, backend.chainConfig.ChainID.Uint64())
1110+
}
1111+
11031112
if isSequencer {
11041113
// if we are sequencing transactions, we do the sequencing loop...
11051114
witnessGenerator := witness.NewGenerator(
@@ -1129,10 +1138,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
11291138
verifier := legacy_executor_verifier.NewLegacyExecutorVerifier(
11301139
*cfg.Zk,
11311140
legacyExecutors,
1132-
backend.chainConfig,
11331141
backend.chainDB,
11341142
witnessGenerator,
1135-
backend.dataStream,
1143+
dataStreamServer,
11361144
)
11371145

11381146
if cfg.Zk.Limbo {
@@ -1167,7 +1175,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
11671175
backend.agg,
11681176
backend.forkValidator,
11691177
backend.engine,
1170-
backend.dataStream,
1178+
dataStreamServer,
11711179
backend.l1Syncer,
11721180
seqVerSyncer,
11731181
l1BlockSyncer,
@@ -1209,7 +1217,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
12091217
backend.engine,
12101218
backend.l1Syncer,
12111219
streamClient,
1212-
backend.dataStream,
1220+
dataStreamServer,
12131221
l1InfoTreeUpdater,
12141222
)
12151223

@@ -1330,7 +1338,11 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
13301338
// apiList := jsonrpc.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, backend.agg, httpRpcCfg, backend.engine, config, backend.l1Syncer)
13311339
// authApiList := jsonrpc.AuthAPIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, backend.agg, httpRpcCfg, backend.engine, config)
13321340

1333-
s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, s.txPool2, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, config, s.l1Syncer, s.logger, s.dataStream)
1341+
var dataStreamServer server.DataStreamServer
1342+
if s.streamServer != nil {
1343+
dataStreamServer = dataStreamServerFactory.CreateDataStreamServer(s.streamServer, config.Zk.L2ChainId)
1344+
}
1345+
s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, s.txPool2, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, config, s.l1Syncer, s.logger, dataStreamServer)
13341346

13351347
if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
13361348
interface_log_settings := silkworm.RpcInterfaceLogSettings{
@@ -1368,7 +1380,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
13681380
}
13691381

13701382
go func() {
1371-
if err := cli.StartDataStream(s.dataStream); err != nil {
1383+
if err := cli.StartDataStream(s.streamServer); err != nil {
13721384
log.Error(err.Error())
13731385
return
13741386
}
@@ -1391,8 +1403,9 @@ func (s *Ethereum) PreStart() error {
13911403
// we don't know when the server has actually started as it doesn't expose a signal that is has spun up
13921404
// so here we loop and take a brief pause waiting for it to be ready
13931405
attempts := 0
1406+
dataStreamServer := dataStreamServerFactory.CreateDataStreamServer(s.streamServer, s.chainConfig.ChainID.Uint64())
13941407
for {
1395-
_, err = zkStages.CatchupDatastream(s.sentryCtx, "stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64())
1408+
_, err = zkStages.CatchupDatastream(s.sentryCtx, "stream-catchup", tx, dataStreamServer)
13961409
if err != nil {
13971410
if errors.Is(err, datastreamer.ErrAtomicOpNotAllowed) {
13981411
attempts++

turbo/jsonrpc/daemon.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,17 @@ import (
1717
"github.com/ledgerwatch/erigon/rpc"
1818
"github.com/ledgerwatch/erigon/turbo/rpchelper"
1919
"github.com/ledgerwatch/erigon/turbo/services"
20+
"github.com/ledgerwatch/erigon/zk/datastream/server"
2021
"github.com/ledgerwatch/erigon/zk/sequencer"
2122
"github.com/ledgerwatch/erigon/zk/syncer"
22-
2323
txpool2 "github.com/ledgerwatch/erigon/zk/txpool"
24-
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
2524
)
2625

2726
// APIList describes the list of available RPC apis
2827
func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, rawPool *txpool2.TxPool, mining txpool.MiningClient,
2928
filters *rpchelper.Filters, stateCache kvcache.Cache,
3029
blockReader services.FullBlockReader, agg *libstate.Aggregator, cfg *httpcfg.HttpCfg, engine consensus.EngineReader,
31-
ethCfg *ethconfig.Config, l1Syncer *syncer.L1Syncer, logger log.Logger, datastreamServer *datastreamer.StreamServer,
30+
ethCfg *ethconfig.Config, l1Syncer *syncer.L1Syncer, logger log.Logger, dataStreamServer server.DataStreamServer,
3231
) (list []rpc.API) {
3332
// non-sequencer nodes should forward on requests to the sequencer
3433
rpcUrl := ""
@@ -69,7 +68,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, r
6968
otsImpl := NewOtterscanAPI(base, db, cfg.OtsMaxPageSize)
7069
gqlImpl := NewGraphQLAPI(base, db)
7170
overlayImpl := NewOverlayAPI(base, db, cfg.Gascap, cfg.OverlayGetLogsTimeout, cfg.OverlayReplayBlockTimeout, otsImpl)
72-
zkEvmImpl := NewZkEvmAPI(ethImpl, db, cfg.ReturnDataLimit, ethCfg, l1Syncer, rpcUrl, datastreamServer)
71+
zkEvmImpl := NewZkEvmAPI(ethImpl, db, cfg.ReturnDataLimit, ethCfg, l1Syncer, rpcUrl, dataStreamServer)
7372

7473
if cfg.GraphQLEnabled {
7574
list = append(list, rpc.API{

turbo/jsonrpc/zkevm_api.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
zktypes "github.com/ledgerwatch/erigon/zk/types"
1818

19-
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
2019
"github.com/holiman/uint256"
2120
"github.com/ledgerwatch/erigon-lib/common/hexutil"
2221
"github.com/ledgerwatch/erigon-lib/kv/membatchwithdb"
@@ -92,7 +91,7 @@ type ZkEvmAPIImpl struct {
9291
l1Syncer *syncer.L1Syncer
9392
l2SequencerUrl string
9493
semaphores map[string]chan struct{}
95-
datastreamServer *server.DataStreamServer
94+
datastreamServer server.DataStreamServer
9695
}
9796

9897
func (api *ZkEvmAPIImpl) initializeSemaphores(functionLimits map[string]int) {
@@ -113,22 +112,17 @@ func NewZkEvmAPI(
113112
zkConfig *ethconfig.Config,
114113
l1Syncer *syncer.L1Syncer,
115114
l2SequencerUrl string,
116-
datastreamServer *datastreamer.StreamServer,
115+
dataStreamServer server.DataStreamServer,
117116
) *ZkEvmAPIImpl {
118117

119-
var streamServer *server.DataStreamServer
120-
if datastreamServer != nil {
121-
streamServer = server.NewDataStreamServer(datastreamServer, zkConfig.Zk.L2ChainId)
122-
}
123-
124118
a := &ZkEvmAPIImpl{
125119
ethApi: base,
126120
db: db,
127121
ReturnDataLimit: returnDataLimit,
128122
config: zkConfig,
129123
l1Syncer: l1Syncer,
130124
l2SequencerUrl: l2SequencerUrl,
131-
datastreamServer: streamServer,
125+
datastreamServer: dataStreamServer,
132126
}
133127

134128
a.initializeSemaphores(map[string]int{

turbo/stages/zk_stages.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package stages
33
import (
44
"context"
55

6-
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
76
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
87
"github.com/ledgerwatch/erigon-lib/kv"
98
"github.com/ledgerwatch/erigon-lib/state"
@@ -16,11 +15,12 @@ import (
1615
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
1716
"github.com/ledgerwatch/erigon/turbo/shards"
1817
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
18+
"github.com/ledgerwatch/erigon/zk/datastream/server"
19+
"github.com/ledgerwatch/erigon/zk/l1infotree"
1920
"github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
2021
zkStages "github.com/ledgerwatch/erigon/zk/stages"
2122
"github.com/ledgerwatch/erigon/zk/syncer"
2223
"github.com/ledgerwatch/erigon/zk/txpool"
23-
"github.com/ledgerwatch/erigon/zk/l1infotree"
2424
)
2525

2626
// NewDefaultZkStages creates stages for zk syncer (RPC mode)
@@ -36,7 +36,7 @@ func NewDefaultZkStages(ctx context.Context,
3636
engine consensus.Engine,
3737
l1Syncer *syncer.L1Syncer,
3838
datastreamClient zkStages.DatastreamClient,
39-
datastreamServer *datastreamer.StreamServer,
39+
dataStreamServer server.DataStreamServer,
4040
infoTreeUpdater *l1infotree.Updater,
4141
) []*stagedsync.Stage {
4242
dirs := cfg.Dirs
@@ -54,7 +54,7 @@ func NewDefaultZkStages(ctx context.Context,
5454
zkStages.StageL1SyncerCfg(db, l1Syncer, cfg.Zk),
5555
zkStages.StageL1InfoTreeCfg(db, cfg.Zk, infoTreeUpdater),
5656
zkStages.StageBatchesCfg(db, datastreamClient, cfg.Zk, controlServer.ChainConfig, &cfg.Miner),
57-
zkStages.StageDataStreamCatchupCfg(datastreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()),
57+
zkStages.StageDataStreamCatchupCfg(dataStreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()),
5858
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
5959
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
6060
stagedsync.StageExecuteBlocksCfg(
@@ -99,7 +99,7 @@ func NewSequencerZkStages(ctx context.Context,
9999
agg *state.Aggregator,
100100
forkValidator *engine_helpers.ForkValidator,
101101
engine consensus.Engine,
102-
datastreamServer *datastreamer.StreamServer,
102+
dataStreamServer server.DataStreamServer,
103103
sequencerStageSyncer *syncer.L1Syncer,
104104
l1Syncer *syncer.L1Syncer,
105105
l1BlockSyncer *syncer.L1Syncer,
@@ -120,7 +120,7 @@ func NewSequencerZkStages(ctx context.Context,
120120
zkStages.StageL1SequencerSyncCfg(db, cfg.Zk, sequencerStageSyncer),
121121
zkStages.StageL1InfoTreeCfg(db, cfg.Zk, infoTreeUpdater),
122122
zkStages.StageSequencerL1BlockSyncCfg(db, cfg.Zk, l1BlockSyncer),
123-
zkStages.StageDataStreamCatchupCfg(datastreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()),
123+
zkStages.StageDataStreamCatchupCfg(dataStreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()),
124124
zkStages.StageSequenceBlocksCfg(
125125
db,
126126
cfg.Prune,
@@ -138,7 +138,7 @@ func NewSequencerZkStages(ctx context.Context,
138138
cfg.Genesis,
139139
cfg.Sync,
140140
agg,
141-
datastreamServer,
141+
dataStreamServer,
142142
cfg.Zk,
143143
&cfg.Miner,
144144
txPool,

0 commit comments

Comments
 (0)