Skip to content

Commit

Permalink
Trim deleted entries after recover cursor. (apache#4987)
Browse files Browse the repository at this point in the history
* Trim deleted entries after recover cursor.

* Fix errors

* Add managed cursor unit tests.

* Fix tests and handle cursor reset.

* fix unit tests

* Fix tests

* Fix check style
  • Loading branch information
codelipenghui authored and aahmed-se committed Sep 6, 2019
1 parent 00fc2e8 commit dc7d01e
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@

import com.google.common.annotations.Beta;
import com.google.common.base.Predicate;
import com.google.common.collect.Range;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
* A ManangedCursor is a persisted cursor inside a ManagedLedger.
Expand Down Expand Up @@ -594,4 +598,14 @@ Set<? extends Position> asyncReplayEntries(
*/
ManagedLedger getManagedLedger();

/**
* Get last individual deleted range
* @return range
*/
Range<PositionImpl> getLastIndividualDeletedRange();

/**
* Trim delete entries for the given entries
*/
void trimDeletedEntries(List<Entry> entries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positi
positions.stream()
.filter(position -> individualDeletedMessages.contains(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId())
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0)
|| ((PositionImpl) position).compareTo(markDeletePosition) <= 0)
.forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
Expand Down Expand Up @@ -2590,5 +2590,16 @@ public ManagedLedger getManagedLedger() {
return this.ledger;
}

@Override
public Range<PositionImpl> getLastIndividualDeletedRange() {
return individualDeletedMessages.lastRange();
}

@Override
public void trimDeletedEntries(List<Entry> entries) {
entries.removeIf(entry -> ((PositionImpl) entry.getPosition()).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()));
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -315,6 +316,16 @@ public ManagedLedger getManagedLedger() {
return null;
}

@Override
public Range<PositionImpl> getLastIndividualDeletedRange() {
return null;
}

@Override
public void trimDeletedEntries(List<Entry> entries) {

}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
Expand All @@ -53,6 +54,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.netty.buffer.ByteBufAllocator;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -2225,6 +2228,62 @@ void testReplayEntries() throws Exception {
}
}

@Test(timeOut = 20000)
void testGetLastIndividualDeletedRange() throws Exception {
ManagedLedger ledger = factory.open("test_last_individual_deleted");

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
for(int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);

c1.delete(Lists.newArrayList(p1, p2, p3, p4));

assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p3.getLedgerId(),
p3.getEntryId() - 1), p4));

PositionImpl p5 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 8);
c1.delete(p5);

assertEquals(c1.getLastIndividualDeletedRange(), Range.openClosed(PositionImpl.get(p5.getLedgerId(),
p5.getEntryId() - 1), p5));

}

@Test(timeOut = 20000)
void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("my_test_ledger");

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
PositionImpl markDeletedPosition = (PositionImpl) c1.getMarkDeletedPosition();
for(int i = 0; i < 10; i++) {
ledger.addEntry(("entry" + i).getBytes(Encoding));
}
PositionImpl p1 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 1);
PositionImpl p2 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 2);
PositionImpl p3 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 5);
PositionImpl p4 = PositionImpl.get(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 6);

c1.delete(Lists.newArrayList(p1, p2, p3, p4));

EntryImpl entry1 = EntryImpl.create(p1, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry2 = EntryImpl.create(p2, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry3 = EntryImpl.create(p3, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry4 = EntryImpl.create(p4, ByteBufAllocator.DEFAULT.buffer(0));
EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId() , markDeletedPosition.getEntryId() + 7,
ByteBufAllocator.DEFAULT.buffer(0));
List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5);
c1.trimDeletedEntries(entries);
assertEquals(entries.size(), 1);
assertEquals(entries.get(0).getPosition(), PositionImpl.get(markDeletedPosition.getLedgerId() ,
markDeletedPosition.getEntryId() + 7));
}

@Test(timeOut = 20000)
void outOfOrderAcks() throws Exception {
ManagedLedger ledger = factory.open("outOfOrderAcks");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ default boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadat
default long getNumberOfDelayedMessages() {
return 0;
}

default void cursorIsReset() {
//No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.collect.Range;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;

private CompletableFuture<Void> closeFuture = null;
LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
Expand Down Expand Up @@ -106,6 +108,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
Expand Down Expand Up @@ -431,6 +434,13 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
}

protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

if (entries == null || entries.size() == 0) {
return;
}
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
int start = 0;
int entriesToDispatch = entries.size();
long totalMessagesSent = 0;
Expand Down Expand Up @@ -558,6 +568,14 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj

}

private boolean needTrimAckedMessages() {
if (lastIndividualDeletedRangeFromCursorRecovery == null) {
return false;
} else {
return lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint()
.compareTo((PositionImpl) cursor.getReadPosition()) > 0;
}
}

/**
* returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
Expand Down Expand Up @@ -726,6 +744,13 @@ public synchronized long getNumberOfDelayedMessages() {
}
}

@Override
public void cursorIsReset() {
if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
this.lastIndividualDeletedRangeFromCursorRecovery = null;
}
}

public PersistentTopic getTopic() {
return topic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,9 @@ public void resetComplete(Object ctx) {
log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
finalPosition);
}
if (dispatcher != null) {
dispatcher.cursorIsReset();
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,26 @@ public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> cons

@Override
public Range<T> firstRange() {
if (rangeBitSetMap.isEmpty()) {
return null;
}
Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry();
int lower = firstSet.getValue().nextSetBit(0);
int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1);
return Range.openClosed(consumer.apply(firstSet.getKey(), lower - 1), consumer.apply(firstSet.getKey(), upper));
}

@Override
public Range<T> lastRange() {
if (rangeBitSetMap.isEmpty()) {
return null;
}
Entry<Long, BitSet> lastSet = rangeBitSetMap.lastEntry();
int upper = lastSet.getValue().previousSetBit(lastSet.getValue().size());
int lower = Math.min(lastSet.getValue().previousClearBit(upper), upper);
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
}

@Override
public int size() {
if (updatedAfterCachedForSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.pulsar.common.util.collections;

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;

import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -114,6 +117,13 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*/
Range<T> firstRange();

/**
* It returns very last biggest range in the rangeSet.
*
* @return last biggest range into the set
*/
Range<T> lastRange();

/**
* Represents a function that accepts two long arguments and produces a result.
*
Expand Down Expand Up @@ -259,6 +269,15 @@ public Range<T> firstRange() {
return set.asRanges().iterator().next();
}

@Override
public Range<T> lastRange() {
if (set.asRanges().isEmpty()) {
return null;
}
List<Range<T>> list = Lists.newArrayList(set.asRanges().iterator());
return list.get(list.size() - 1);
}

@Override
public int size() {
return set.asRanges().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void testSpanWithGuava() {
@Test
public void testFirstRange() {
ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
assertNull(set.firstRange());
Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
set.add(range);
assertEquals(set.firstRange(), range);
Expand All @@ -211,6 +212,28 @@ public void testFirstRange() {
assertEquals(set.size(), 2);
}

@Test
public void testLastRange() {
ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
assertNull(set.lastRange());
Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
set.add(range);
assertEquals(set.lastRange(), range);
assertEquals(set.size(), 1);
range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
set.add(range);
assertEquals(set.lastRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105)));
assertEquals(set.size(), 1);
range = Range.openClosed(new LongPair(1, 5), new LongPair(1, 75));
set.add(range);
assertEquals(set.lastRange(), range);
assertEquals(set.size(), 2);
range = Range.openClosed(new LongPair(1, 80), new LongPair(1, 120));
set.add(range);
assertEquals(set.lastRange(), range);
assertEquals(set.size(), 3);
}

@Test
public void testToString() {
ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
Expand Down

0 comments on commit dc7d01e

Please sign in to comment.