Skip to content

Commit

Permalink
[issue apache#3975] Bugfix NPE on non durable consumer (apache#3988)
Browse files Browse the repository at this point in the history
*Motivation*

Trying to fix apache#3975

When a reset of a cursor is performed with some timestamp on a non-durable
consumer the message finder will fail with null pointer exception due to
`cursor.getName()` being null.

*Modifications*

  - Add method overloading for `newNonDurableCursor()` with subscription name.
  - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with
    proper subscription name
  - Add test to assert issue.
  • Loading branch information
lovelle authored and merlimat committed Apr 9, 2019
1 parent 6bce00b commit 372575a
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public interface ManagedLedger {
* @return the new NonDurableCursor
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,16 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma
return new NonDurableCursorImpl(bookKeeper, config, this, null, (PositionImpl) startCursorPosition);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName)
throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition);
}

@Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition);
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -479,9 +481,33 @@ public void testMessageAvailableAfterRestart() throws Exception {
assertTrue(reader.hasMessageAvailable());

String readOut = new String(reader.readNext().getData());
assertTrue(readOut.equals(content));
assertEquals(content, readOut);
assertFalse(reader.hasMessageAvailable());
}

}

@Test(timeOut = 10000)
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
final String topicName = "persistent://my-property/my-ns/ReaderSeek";

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();

for (int i = 0; i < numOfMessage; i++) {
producer.send(String.format("msg num %d", i).getBytes());
}

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();
assertTrue(reader.hasMessageAvailable());

((ReaderImpl) reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));

assertTrue(reader.hasMessageAvailable());

reader.close();
producer.close();
}
}

0 comments on commit 372575a

Please sign in to comment.