Skip to content

Commit

Permalink
Read from compacted topic ledger if available and enabled (apache#1231)
Browse files Browse the repository at this point in the history
* Read from compacted topic ledger if available and enabled

If a topic has been compacted and the client has enabled reads from
compacted topics, try to read from the compacted ledger if the cursor
position lands before or within the range of message IDs in the
compacted topic ledger. If the cursor position lands after the message
IDs, in the compacted topic ledger, read from the cursor as normal.

* fixup mocks in tests
  • Loading branch information
ivankelly authored and merlimat committed Feb 19, 2018
1 parent 3d5d760 commit 8d04a42
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;

final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
public final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
}

public BookKeeper getBookKeeperClient() {
return bkClient;
}

public void close() throws IOException {
try {
managedLedgerFactory.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
Expand Down Expand Up @@ -542,6 +543,10 @@ public BrokerService getBrokerService() {
return this.brokerService;
}

public BookKeeper getBookKeeperClient() {
return managedLedgerClientFactory.getBookKeeperClient();
}

public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ protected void readMoreEntries(Consumer consumer) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
if (consumer.readCompacted()) {
topic.compactedTopic.asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;

private final MessageDeduplication messageDeduplication;
private final CompactedTopic compactedTopic;
final CompactedTopic compactedTopic;

// Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false;
Expand Down Expand Up @@ -207,7 +207,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS

this.dispatchRateLimiter = new DispatchRateLimiter(this);

this.compactedTopic = new CompactedTopicImpl();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package org.apache.pulsar.compaction;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;

public interface CompactedTopic {
void newCompactedLedger(Position p, long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,87 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ComparisonChain;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactedTopicImpl implements CompactedTopic {
final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100;

private final BookKeeper bk;

private PositionImpl compactionHorizon = null;
private CompletableFuture<CompactedTopicContext> compactedTopicContext = null;

public CompactedTopicImpl(BookKeeper bk) {
this.bk = bk;
}

@Override
public void newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
compactionHorizon = (PositionImpl)p;
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
}
}

@Override
public void newCompactedLedger(Position p, long compactedLedgerId) {}
public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx) {
synchronized (this) {
PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition();
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
} else {
compactedTopicContext.thenCompose(
(context) -> {
return findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
.thenCompose((startPoint) -> {
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
startPoint + numberOfEntriesToRead);
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext());
callback.readEntriesComplete(entries, ctx);
});
}
});
})
.exceptionally((exception) -> {
callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
return null;
});
}
}
}

static CompletableFuture<Long> findStartPoint(PositionImpl p,
long lastEntryId,
Expand Down Expand Up @@ -107,6 +166,60 @@ private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh
return promise;
}

private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) {
CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
bk.asyncOpenLedger(id,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
(rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
promise.complete(ledger);
}
}, null);
return promise.thenApply((ledger) -> new CompactedTopicContext(
ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
}

private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();

lh.asyncReadEntries(from, to,
(rc, _lh, seq, ctx) -> {
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
promise.complete(seq);
}
}, null);
return promise.thenApply(
(seq) -> {
List<Entry> entries = new ArrayList<Entry>();
while (seq.hasMoreElements()) {
ByteBuf buf = seq.nextElement().getEntryBuffer();
try (RawMessage m = RawMessageImpl.deserializeFrom(buf)) {
entries.add(EntryImpl.create(m.getMessageIdData().getLedgerId(),
m.getMessageIdData().getEntryId(),
m.getHeadersAndPayload()));
} finally {
buf.release();
}
}
return entries;
});
}

static class CompactedTopicContext {
final LedgerHandle ledger;
final AsyncLoadingCache<Long,MessageIdData> cache;

CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long,MessageIdData> cache) {
this.ledger = ledger;
this.cache = cache;
}
}

private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected final void internalSetupForStatsTest() throws Exception {

protected final void init() throws Exception {
mockZookKeeper = createMockZooKeeper();
mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper);
mockBookKeeper = createMockBookKeeper(mockZookKeeper);

sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();

Expand Down Expand Up @@ -208,6 +208,10 @@ public static MockZooKeeper createMockZooKeeper() throws Exception {
return zk;
}

public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception {
return new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper);
}

// Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
private static class NonClosableMockBookKeeper extends MockBookKeeper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
Expand Down Expand Up @@ -111,6 +112,7 @@ public void setup() throws Exception {

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();

configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -142,6 +143,7 @@ public void setup() throws Exception {

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();

configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -153,6 +154,7 @@ public void setup() throws Exception {

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();

configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
Expand Down
Loading

0 comments on commit 8d04a42

Please sign in to comment.