From 372575a9877bf50e8f55a9568ef6c07fcae86644 Mon Sep 17 00:00:00 2001 From: Ezequiel Lovelle Date: Mon, 8 Apr 2019 23:01:19 -0300 Subject: [PATCH] [issue #3975] Bugfix NPE on non durable consumer (#3988) *Motivation* Trying to fix #3975 When a reset of a cursor is performed with some timestamp on a non-durable consumer the message finder will fail with null pointer exception due to `cursor.getName()` being null. *Modifications* - Add method overloading for `newNonDurableCursor()` with subscription name. - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with proper subscription name - Add test to assert issue. --- .../bookkeeper/mledger/ManagedLedger.java | 1 + .../mledger/impl/ManagedLedgerImpl.java | 10 +++++++ .../service/persistent/PersistentTopic.java | 2 +- .../pulsar/client/api/TopicReaderTest.java | 28 ++++++++++++++++++- 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 00fc2d06e0870..d51b0d803a7d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -193,6 +193,7 @@ public interface ManagedLedger { * @return the new NonDurableCursor */ ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException; + ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException; /** * Delete a ManagedCursor asynchronously. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a398162317701..7a4eb60ba2bbe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -844,6 +844,16 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma return new NonDurableCursorImpl(bookKeeper, config, this, null, (PositionImpl) startCursorPosition); } + @Override + public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName) + throws ManagedLedgerException { + checkManagedLedgerIsOpen(); + checkFenced(); + + return new NonDurableCursorImpl(bookKeeper, config, this, cursorName, + (PositionImpl) startCursorPosition); + } + @Override public Iterable getCursors() { return cursors; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 10394370f878a..e1a050979e282 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -641,7 +641,7 @@ private CompletableFuture getNonDurableSubscription(Stri Position startPosition = new PositionImpl(ledgerId, entryId); ManagedCursor cursor = null; try { - cursor = ledger.newNonDurableCursor(startPosition); + cursor = ledger.newNonDurableCursor(startPosition, subscriptionName); } catch (ManagedLedgerException e) { subscriptionFuture.completeExceptionally(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 6318974e22678..c1e93aa76b968 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -34,7 +34,9 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.RelativeTimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -479,9 +481,33 @@ public void testMessageAvailableAfterRestart() throws Exception { assertTrue(reader.hasMessageAvailable()); String readOut = new String(reader.readNext().getData()); - assertTrue(readOut.equals(content)); + assertEquals(content, readOut); assertFalse(reader.hasMessageAvailable()); } } + + @Test(timeOut = 10000) + public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { + final int numOfMessage = 10; + final String topicName = "persistent://my-property/my-ns/ReaderSeek"; + + Producer producer = pulsarClient.newProducer() + .topic(topicName).create(); + + for (int i = 0; i < numOfMessage; i++) { + producer.send(String.format("msg num %d", i).getBytes()); + } + + Reader reader = pulsarClient.newReader().topic(topicName) + .startMessageId(MessageId.earliest).create(); + assertTrue(reader.hasMessageAvailable()); + + ((ReaderImpl) reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); + + assertTrue(reader.hasMessageAvailable()); + + reader.close(); + producer.close(); + } }