Skip to content

Commit

Permalink
[pulsar-broker] Cache unack-messageId into openRangeSet (apache#3819)
Browse files Browse the repository at this point in the history
* [pulsar-common] add open Concurrent LongPair RangeSet

* add open-range set methods

* [pulsar-broker] Cache unack-messageId into openRangeSet

* add forEach

* use forEach of the range

* add forEach with consumer

* add Recyclable Position converter

* Removed unused imports after merge

* address comments
  • Loading branch information
rdhabalia authored May 21, 2019
1 parent 1bb9a11 commit 5556f1b
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 120 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2

# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

# For Amazon S3 ledger offload, AWS region
s3ManagedLedgerOffloadRegion=

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ managedLedgerReadEntryTimeoutSeconds=120
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=120

# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

### --- Load balancer --- ###

loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.bookkeeper.client.api.DigestType;

import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

/**
* Configuration class for a ManagedLedger.
*/
Expand Down Expand Up @@ -61,6 +60,7 @@ public class ManagedLedgerConfig {
private long addEntryTimeoutSeconds = 120;
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private boolean unackedRangesOpenCacheSetEnabled = true;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private Clock clock = Clock.systemUTC();

Expand Down Expand Up @@ -235,6 +235,19 @@ public ManagedLedgerConfig setPassword(String password) {
return this;
}

/**
* should use {@link ConcurrentOpenLongPairRangeSet} to store unacked ranges.
* @return
*/
public boolean isUnackedRangesOpenCacheSetEnabled() {
return unackedRangesOpenCacheSetEnabled;
}

public ManagedLedgerConfig setUnackedRangesOpenCacheSetEnabled(boolean unackedRangesOpenCacheSetEnabled) {
this.unackedRangesOpenCacheSetEnabled = unackedRangesOpenCacheSetEnabled;
return this;
}

/**
* @return the metadataEnsemblesize
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@

public class PositionImpl implements Position, Comparable<PositionImpl> {

private final long ledgerId;
private final long entryId;
protected long ledgerId;
protected long entryId;

public static PositionImpl earliest = new PositionImpl(-1, -1);
public static PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
public static final PositionImpl earliest = new PositionImpl(-1, -1);
public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);

public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.mledger.Position;

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class PositionImplRecyclable extends PositionImpl implements Position {

private final Handle<PositionImplRecyclable> recyclerHandle;

private static final Recycler<PositionImplRecyclable> RECYCLER = new Recycler<PositionImplRecyclable>() {
protected PositionImplRecyclable newObject(Recycler.Handle<PositionImplRecyclable> recyclerHandle) {
return new PositionImplRecyclable(recyclerHandle);
}
};

private PositionImplRecyclable(Handle<PositionImplRecyclable> recyclerHandle) {
super(PositionImpl.earliest);
this.recyclerHandle = recyclerHandle;
}

public static PositionImplRecyclable create() {
PositionImplRecyclable position = RECYCLER.get();
return position;
}

public void recycle() {
recyclerHandle.recycle(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,17 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {

private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class);

@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {
@Override
Expand All @@ -60,9 +66,11 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
}
};

@Test
public void testMarkDeleteAndRead() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
@Test(dataProvider = "useOpenRangeSet")
public void testMarkDeleteAndRead(boolean useOpenRangeSet) throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(2)
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet);
ManagedLedger ledger = factory.open("my_test_ledger", config);

final ManagedCursor cursor = ledger.openCursor("c1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,22 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.zookeeper.KeeperException.Code;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedCursorTest extends MockedBookKeeperTestCase {

private static final Charset Encoding = Charsets.UTF_8;

@DataProvider(name = "useOpenRangeSet")
public static Object[][] useOpenRangeSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}


@Test(timeOut = 20000)
void readFromEmptyLedger() throws Exception {
Expand Down Expand Up @@ -1315,9 +1321,10 @@ void testCountingWithDeletedEntries() throws Exception {
assertEquals(cursor.getNumberOfEntriesInBacklog(), 3);
}

@Test(timeOut = 20000)
void testMarkDeleteTwice() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testMarkDeleteTwice(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry1".getBytes());
Expand All @@ -1327,9 +1334,10 @@ void testMarkDeleteTwice() throws Exception {
assertEquals(cursor.getMarkDeletedPosition(), p1);
}

@Test(timeOut = 20000)
void testSkipEntries() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testSkipEntries(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
Position pos;

ManagedCursor c1 = ledger.openCursor("c1");
Expand Down Expand Up @@ -1368,10 +1376,10 @@ void testSkipEntries() throws Exception {
assertEquals(c1.getMarkDeletedPosition(), pos);
}

@Test(timeOut = 20000)
void testSkipEntriesWithIndividualDeletedMessages() throws Exception {
ManagedLedger ledger = factory.open("testSkipEntriesWithIndividualDeletedMessages",
new ManagedLedgerConfig().setMaxEntriesPerLedger(5));
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testSkipEntriesWithIndividualDeletedMessages(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testSkipEntriesWithIndividualDeletedMessages", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(5));
ManagedCursor c1 = ledger.openCursor("c1");

Position pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Expand Down Expand Up @@ -1404,9 +1412,10 @@ void testSkipEntriesWithIndividualDeletedMessages() throws Exception {
assertEquals(c1.getMarkDeletedPosition(), pos4);
}

@Test(timeOut = 20000)
void testClearBacklog() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testClearBacklog(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(1));

ManagedCursor c1 = ledger.openCursor("c1");
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Expand Down Expand Up @@ -1455,10 +1464,10 @@ void testClearBacklog() throws Exception {
factory2.shutdown();
}

@Test(timeOut = 20000)
void testRateLimitMarkDelete() throws Exception {
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testRateLimitMarkDelete(boolean useOpenRangeSet) throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setThrottleMarkDelete(1); // Throttle to 1/s
config.setThrottleMarkDelete(1).setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet); // Throttle to 1/s
ManagedLedger ledger = factory.open("my_test_ledger", config);

ManagedCursor c1 = ledger.openCursor("c1");
Expand All @@ -1484,9 +1493,10 @@ void testRateLimitMarkDelete() throws Exception {
factory2.shutdown();
}

@Test(timeOut = 20000)
void deleteSingleMessageTwice() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void deleteSingleMessageTwice(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursor c1 = ledger.openCursor("c1");

Expand Down Expand Up @@ -1551,9 +1561,10 @@ void deleteSingleMessageTwice() throws Exception {
assertEquals(c1.getReadPosition(), p4.getNext());
}

@Test(timeOut = 10000)
void testReadEntriesOrWait() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
@Test(timeOut = 10000, dataProvider = "useOpenRangeSet")
void testReadEntriesOrWait(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

final int Consumers = 10;
final CountDownLatch counter = new CountDownLatch(Consumers);
Expand Down Expand Up @@ -1846,10 +1857,10 @@ void testFindNewestMatchingEdgeCase5() throws Exception {
p2);
}

@Test(timeOut = 20000)
void testFindNewestMatchingEdgeCase6() throws Exception {
ManagedLedger ledger = factory.open("testFindNewestMatchingEdgeCase6",
new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testFindNewestMatchingEdgeCase6(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testFindNewestMatchingEdgeCase6", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(3));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("expired".getBytes(Encoding));
Expand All @@ -1865,9 +1876,10 @@ void testFindNewestMatchingEdgeCase6() throws Exception {
newPosition);
}

@Test(timeOut = 20000)
void testFindNewestMatchingEdgeCase7() throws Exception {
ManagedLedger ledger = factory.open("testFindNewestMatchingEdgeCase7");
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testFindNewestMatchingEdgeCase7(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testFindNewestMatchingEdgeCase7",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("expired".getBytes(Encoding));
Expand Down Expand Up @@ -1959,9 +1971,10 @@ void testFindNewestMatchingEdgeCase10() throws Exception {
lastPosition);
}

@Test(timeOut = 20000)
void testIndividuallyDeletedMessages() throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages");
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testIndividuallyDeletedMessages(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-0".getBytes(Encoding));
Expand Down Expand Up @@ -1998,9 +2011,10 @@ void testIndividuallyDeletedMessages1() throws Exception {
assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
}

@Test(timeOut = 20000)
void testIndividuallyDeletedMessages2() throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages2");
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testIndividuallyDeletedMessages2(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages2",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-0".getBytes(Encoding));
Expand All @@ -2018,9 +2032,10 @@ void testIndividuallyDeletedMessages2() throws Exception {
assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
}

@Test(timeOut = 20000)
void testIndividuallyDeletedMessages3() throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages3");
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testIndividuallyDeletedMessages3(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("testIndividuallyDeletedMessages3",
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-0".getBytes(Encoding));
Expand Down Expand Up @@ -2846,5 +2861,44 @@ void testAlwaysInactive() throws Exception {
assertFalse(cursor.isActive());
}

@Test
public void deleteMessagesCheckhMarkDelete() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
final int totalEntries = 1000;
final Position[] positions = new Position[totalEntries];
for (int i = 0; i < totalEntries; i++) {
// add entry
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(c1.getNumberOfEntries(), totalEntries);
int totalDeletedMessages = 0;
for (int i = 0; i < totalEntries; i++) {
// delete entry
if ((i % 3) == 0) {
c1.delete(positions[i]);
totalDeletedMessages += 1;
}
}
assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages);
assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages);
assertEquals(c1.getMarkDeletedPosition(), positions[0]);
assertEquals(c1.getReadPosition(), positions[1]);

// delete 1/2 of the messags
for (int i = 0; i < totalEntries / 2; i++) {
// delete entry
if ((i % 3) != 0) {
c1.delete(positions[i]);
totalDeletedMessages += 1;
}
}
int markDelete = totalEntries / 2 - 1;
assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages);
assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages);
assertEquals(c1.getMarkDeletedPosition(), positions[markDelete]);
assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Loading

0 comments on commit 5556f1b

Please sign in to comment.