Skip to content

Commit

Permalink
Issue 1069: Provide a setting in consumer configuration to specify wh…
Browse files Browse the repository at this point in the history
…ere to start consuming messages (apache#1397)

* 1.add SubscriptionInitialPosition option in ConsumerConfiguration to enable initial position when subscribing

* 1.revert unrelated changes in ConsumerImpl 2.add subscriptionInitialPosition in ConsumerConfigurationData

* 1.rename testOpenCursorOnLatestAndEarliest to testConsumerSubscriptionInitializePosition

* 1.revert unrelated changes

* correct doAnswer of asyncOpenCursor in test case
  • Loading branch information
XiaoZYang authored and merlimat committed Mar 21, 2018
1 parent 0068568 commit e7261b7
Show file tree
Hide file tree
Showing 23 changed files with 360 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;

/**
* A ManagedLedger it's a superset of a BookKeeper ledger concept.
Expand Down Expand Up @@ -132,11 +133,28 @@ public interface ManagedLedger {
*
* @param name
* the name associated with the ManagedCursor
* @param initializeOnLatest
* the flag tell the method wthether it should intialize the cursor at latest position or not.
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;

/**
* Open a ManagedCursor in this ManagedLedger.
* <p>
* If the cursors doesn't exist, a new one will be created and its position will be at the end of the ManagedLedger.
*
* @param name
* the name associated with the ManagedCursor
* @param initialPosition
* the cursor will be set at lastest position or not when first created
* default is <b>true</b>
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;

/**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
Expand Down Expand Up @@ -194,7 +212,23 @@ public interface ManagedLedger {
void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger.
* Open a ManagedCursor asynchronously.
*
* @see #openCursor(String)
* @param name
* the name associated with the ManagedCursor
* @param initialPosition
* the cursor will be set at lastest position or not when first created
* default is <b>true</b>
* @param callback
* callback object
* @param ctx
* opaque context
*/
public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger
*
* @return a list of cursors
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -546,15 +547,20 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback
}

@Override
public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException{
return openCursor(cursorName, InitialPosition.Latest);
}

@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
ManagedCursor cursor = null;
ManagedLedgerException exception = null;
}
final Result result = new Result();

asyncOpenCursor(cursorName, new OpenCursorCallback() {
asyncOpenCursor(cursorName, initialPosition, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
Expand Down Expand Up @@ -582,9 +588,12 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback,
final Object ctx) {
public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx){
this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
}

@Override
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx){
try {
checkManagedLedgerIsOpen();
checkFenced();
Expand Down Expand Up @@ -624,7 +633,7 @@ public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
cursor.setActive();
// Update the ack position (ignoring entries that were written while the cursor was being created)
cursor.initializeCursorPosition(getLastPositionAndCounter());
cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter());

synchronized (this) {
cursors.add(cursor);
Expand Down Expand Up @@ -2030,6 +2039,22 @@ Pair<PositionImpl, Long> getLastPositionAndCounter() {
return Pair.create(pos, count);
}

/**
* Get the first position written in the managed ledger, alongside with the associated counter
*/
Pair<PositionImpl, Long> getFirstPositionAndCounter() {
PositionImpl pos;
long count;
Pair<PositionImpl, Long> lastPositionAndCounter;

do {
pos = getFirstPosition();
lastPositionAndCounter = getLastPositionAndCounter();
count = lastPositionAndCounter.second - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.first));
} while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.first.compareTo(getLastPosition()) != 0);
return Pair.create(pos, count);
}

public void activateCursor(ManagedCursor cursor) {
if (activeCursors.get(cursor.getName()) == null) {
activeCursors.add(cursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -2152,4 +2153,35 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
}

@Test
public void testConsumerSubscriptionInitializePosition() throws Exception{
final int MAX_ENTRY_PER_LEDGER = 2;
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(MAX_ENTRY_PER_LEDGER);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("lastest_earliest_ledger", config);

final int totalInsertedEntries = 20;
for (int i = 0; i < totalInsertedEntries; i++) {
String content = "entry" + i; // 5 bytes
ledger.addEntry(content.getBytes());
}
// Open Cursor also adds cursor into activeCursor-container
ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);

// Since getReadPosition returns the next position, we decrease the entryId by 1
PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();

Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();

assertEquals(latestPositionAndCounter.first.getNext(), p1);
assertEquals(earliestPositionAndCounter.first.getNext(), p2);

assertEquals(latestPositionAndCounter.second.longValue(), totalInsertedEntries);
assertEquals(earliestPositionAndCounter.second.longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog());

ledger.close();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -818,7 +819,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
}

PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName).get();
.createSubscription(subscriptionName, InitialPosition.Latest).get();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
topicName, subscriptionName, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class Consumer {
private final String appId;
private AuthenticationDataSource authenticationData;
private final String topicName;
private final InitialPosition subscriptionInitialPosition;

private final long consumerId;
private final int priorityLevel;
Expand Down Expand Up @@ -104,7 +107,7 @@ public class Consumer {
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted) throws BrokerServiceException {
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
Expand All @@ -114,6 +117,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.subscriptionInitialPosition = subscriptionInitialPosition;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -559,6 +560,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -622,7 +624,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
service.getTopic(topicName.toString())
.thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted))
startMessageId, metadata, readCompacted, initialPosition))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -80,9 +81,9 @@ default long getOriginalSequenceId() {

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted);
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);

CompletableFuture<Subscription> createSubscription(String subscriptionName);
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -322,7 +323,7 @@ public void removeProducer(Producer producer) {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted) {
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -366,7 +367,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata, readCompacted);
cnx.getRole(), metadata, readCompacted, initialPosition);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
Expand Down Expand Up @@ -396,7 +397,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

Expand Down
Loading

0 comments on commit e7261b7

Please sign in to comment.