Skip to content

Commit

Permalink
Fix the nondurable consumer can not specify the initial position (apa…
Browse files Browse the repository at this point in the history
…che#7702)

Fixes apache#7619

Motivation

When using nondurable consumer to consumer message and specify the initial position
from earliest, the consumer can not start consume from the earliest message.
  • Loading branch information
zymap authored Aug 3, 2020
1 parent ca98a89 commit bea5dd8
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public interface ManagedLedger {
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,12 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName)
public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException {
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition)
throws ManagedLedgerException {
Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
Expand All @@ -872,7 +877,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
}

NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition);
(PositionImpl) startCursorPosition, initialPosition);
cursor.setActive();

log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,29 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonDurableCursorImpl extends ManagedCursorImpl {

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition) {
PositionImpl startCursorPosition, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
super(bookkeeper, config, ledger, cursorName);

// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
// both ledgerId and entryId to be Long.max()
if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
switch (initialPosition) {
case Latest:
initializeCursorPosition(ledger.getLastPositionAndCounter());
break;
case Earliest:
initializeCursorPosition(ledger.getFirstPositionAndCounter());
break;
}
} else if (startCursorPosition.getLedgerId() == PositionImpl.earliest.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState) //
: getNonDurableSubscription(subscriptionName, startMessageId, startMessageRollbackDurationSec);
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec);

int maxUnackedMessages = isDurable
? maxUnackedMessagesOnConsumer
Expand Down Expand Up @@ -693,7 +693,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
}

private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, long startMessageRollbackDurationSec) {
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) {
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);

synchronized (ledger) {
Expand All @@ -719,7 +719,7 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition);
} catch (ManagedLedgerException e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,9 @@ public void testConsumerBuilderImplWhenNumericPropertiesAreValid() {
consumerBuilderImpl.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS);
}

@Test
public void testConsumerMode() {
consumerBuilderImpl.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.messaging;

import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.testng.annotations.Test;

import java.util.stream.IntStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;

public class NonDurableConsumerMessagingTest extends MessagingBase {

@Test(dataProvider = "ServiceUrls")
public void testNonDurableConsumer(String serviceUrls) throws Exception {
final String topicName = getNonPartitionedTopic("test-non-durable-consumer", false);
@Cleanup
final PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrls).build();

int numMessages = 20;

try (final Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create()) {

IntStream.range(0, numMessages).forEach(i -> {
String payload = "message-" + i;
producer.sendAsync(payload.getBytes(UTF_8));
});
// flush the producer to make sure all messages are persisted
producer.flush();

try (final Consumer<byte[]> consumer = client.newConsumer()
.topic(topicName)
.subscriptionName("non-durable-consumer")
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
assertEquals(
"message-" + i,
new String(msg.getValue(), UTF_8)
);
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,5 @@ protected void partitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, bo
closeConsumers(consumerList);
log.info("-- Exiting {} test --", methodName);
}

}

0 comments on commit bea5dd8

Please sign in to comment.