Skip to content

Commit

Permalink
fix stream (0xPolygonHermez#3101)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM authored Jan 18, 2024
1 parent 3c7afae commit 5332980
Showing 1 changed file with 10 additions and 14 deletions.
24 changes: 10 additions & 14 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
var currentL2Block uint64 = 0
var lastAddedL2Block uint64 = 0

if header.TotalEntries == 0 {
Expand Down Expand Up @@ -312,11 +311,11 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
currentBatchNumber++
case EntryTypeL2BlockEnd:
log.Info("Latest entry type is L2BlockEnd")
currentL2Block = binary.LittleEndian.Uint64(latestEntry.Data[0:8])
currentL2BlockNumber := binary.LittleEndian.Uint64(latestEntry.Data[0:8])

bookMark := DSBookMark{
Type: BookMarkTypeL2Block,
Value: currentL2Block,
Value: currentL2BlockNumber,
}

firstEntry, err := streamServer.GetFirstEventAfterBookmark(bookMark.Encode())
Expand Down Expand Up @@ -383,7 +382,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
}

// Generate full batches
fullBatches := computeFullBatches(batches, l2Blocks, l2Txs, currentL2Block)
fullBatches := computeFullBatches(batches, l2Blocks, l2Txs)
currentBatchNumber += limit

for b, batch := range fullBatches {
Expand All @@ -397,15 +396,15 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
Value: batch.BatchNumber,
}

bookMarkAlreadyExists := false
missingBookMark := false
if b == 0 {
_, err = streamServer.GetBookmark(bookMark.Encode())
if err != nil {
bookMarkAlreadyExists = true
missingBookMark = true
}
}

if !bookMarkAlreadyExists {
if missingBookMark {
_, err = streamServer.AddStreamBookmark(bookMark.Encode())
if err != nil {
return err
Expand Down Expand Up @@ -567,14 +566,14 @@ func GetSystemSCPosition(blockNumber uint64) []byte {
}

// computeFullBatches computes the full batches
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction, currentL2BlockInStream uint64) []*DSFullBatch {
func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2Transaction) []*DSFullBatch {
prevL2BlockNumber := uint64(0)
currentL2Block := currentL2BlockInStream + 1
currentL2Tx := 0
currentL2Block := uint64(0)

fullBatches := make([]*DSFullBatch, 0)

for b, batch := range batches {
for _, batch := range batches {
fullBatch := &DSFullBatch{
DSBatch: *batch,
}
Expand Down Expand Up @@ -609,10 +608,7 @@ func computeFullBatches(batches []*DSBatch, l2Blocks []*DSL2Block, l2Txs []*DSL2
break
}
}

if !(b == 0 && currentL2BlockInStream > 0 && len(fullBatch.L2Blocks) > 0) {
fullBatches = append(fullBatches, fullBatch)
}
fullBatches = append(fullBatches, fullBatch)
}

return fullBatches
Expand Down

0 comments on commit 5332980

Please sign in to comment.