Skip to content

Commit

Permalink
[fix][broker] Fix getPositionAfterN infinite loop. (apache#17971)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Oct 13, 2022
1 parent 3857540 commit c732852
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3327,20 +3327,16 @@ public PositionImpl getPositionAfterN(final PositionImpl startPosition, long n,
long entriesToSkip = n;
long currentLedgerId;
long currentEntryId;

if (startRange == PositionBound.startIncluded) {
currentLedgerId = startPosition.getLedgerId();
currentEntryId = startPosition.getEntryId();
} else {
// e.g. a mark-delete position
PositionImpl nextValidPosition = getNextValidPosition(startPosition);
currentLedgerId = nextValidPosition.getLedgerId();
currentEntryId = nextValidPosition.getEntryId();
}

boolean lastLedger = false;
long totalEntriesInCurrentLedger;

while (entriesToSkip >= 0) {
// for the current ledger, the number of entries written is deduced from the lastConfirmedEntry
// for previous ledgers, LedgerInfo in ZK has the number of entries
Expand All @@ -3355,10 +3351,8 @@ public PositionImpl getPositionAfterN(final PositionImpl startPosition, long n,
LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
totalEntriesInCurrentLedger = ledgerInfo != null ? ledgerInfo.getEntries() : 0;
}


long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;

long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0
? totalEntriesInCurrentLedger - currentEntryId : 0;
if (unreadEntriesInCurrentLedger >= entriesToSkip) {
// if the current ledger has more entries than what we need to skip
// then the return position is in the same ledger
Expand All @@ -3371,11 +3365,10 @@ public PositionImpl getPositionAfterN(final PositionImpl startPosition, long n,
// there are no more ledgers, return the last position
currentEntryId = totalEntriesInCurrentLedger;
break;
} else {
Long lid = ledgers.ceilingKey(currentLedgerId + 1);
currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 1);
currentEntryId = 0;
}
Long lid = ledgers.ceilingKey(currentLedgerId + 1);
currentLedgerId = lid != null ? lid : ledgers.lastKey();
currentEntryId = 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2478,6 +2478,23 @@ public void testGetPositionAfterN() throws Exception {
log.info("Target position is {}", targetPosition);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);

// test for n > NumberOfEntriesInStorage
searchPosition = new PositionImpl(secondLedger, 0);
targetPosition = managedLedger.getPositionAfterN(searchPosition, 100, ManagedLedgerImpl.PositionBound.startIncluded);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);

// test for startPosition > current ledger
searchPosition = new PositionImpl(999, 0);
targetPosition = managedLedger.getPositionAfterN(searchPosition, 0, ManagedLedgerImpl.PositionBound.startIncluded);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);

searchPosition = new PositionImpl(999, 0);
targetPosition = managedLedger.getPositionAfterN(searchPosition, 10, ManagedLedgerImpl.PositionBound.startExcluded);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);
}

@Test
Expand Down

0 comments on commit c732852

Please sign in to comment.