Skip to content

Commit

Permalink
Clean up individually deleted messages before the mark-delete position (
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 14, 2022
1 parent 1c0e17d commit 6d717a0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,9 @@ void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
throw new IllegalArgumentException(
"Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+ " -- attempted mark delete: " + newMarkDeletePosition);
}

PositionImpl oldMarkDeletePosition = markDeletePosition;
Expand Down Expand Up @@ -2003,6 +2005,19 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.firstRange();

// If the upper bound is before the mark-delete position, we need to move ahead as these
// individualDeletedMessages are now irrelevant
if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
range = individualDeletedMessages.firstRange();
}

if (range == null) {
// The set was completely cleaned up now
return;
}

// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -3558,6 +3559,31 @@ public void testFlushCursorAfterError() throws Exception {
});
}

@Test
public void testConsistencyOfIndividualMessages() throws Exception {
ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");

PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
c1.markDelete(p1);

// Artificially add a position that is before the current mark-delete position
LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);

List<Position> positions = new ArrayList<>();
for (int i = 0; i < 20; i++) {
positions.add(ledger1.addEntry(new byte[1024]));
}

for (int i = 0; i < 20; i++) {
c1.delete(positions.get(i));
}

assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
}

@Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
Expand Down

0 comments on commit 6d717a0

Please sign in to comment.