Avoid introducing a null read position for the managed cursor. (apache#7264)
### Motivation

Avoid introducing a null read position for the managed cursor.

Here is the error log related to null read position:
```
18:52:13.366 [pulsar-stats-updater-23-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception when creating consumer stats for subscription itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null
java.lang.NullPointerException: null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
	at org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
```
The most suspicious piece is the `getNextValidPosition` method in `ManagedLedgerImpl`. Given a position greater than the last add position, it returns null, which may cause the read position to become null. I have not yet found how this situation arises. So this PR logs an error with the stack trace, which should help us find the root cause, and falls back to the position after the last confirmed entry when no next valid position can be found. It also guards `getNumberOfEntriesSinceFirstNotAckedMessage` against a null mark-delete or read position.
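
The fallback described above can be sketched in isolation. This is a minimal model, not the broker code: `NextValidPositionSketch`, its `Position` class, and the `ledgerId -> entry count` map are hypothetical stand-ins for `ManagedLedgerImpl`, `PositionImpl`, and the ledger metadata, and the validity check is a simplified assumption. It also uses a plain null check where the commit throws and catches a `NullPointerException`.

```java
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

public class NextValidPositionSketch {
    /** Hypothetical stand-in for PositionImpl: a (ledgerId, entryId) pair. */
    public static final class Position {
        public final long ledgerId;
        public final long entryId;

        public Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        public Position next() {
            return new Position(ledgerId, entryId + 1);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Position
                    && ((Position) o).ledgerId == ledgerId
                    && ((Position) o).entryId == entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Assumed model: ledgerId -> number of entries written to that ledger.
    private final NavigableMap<Long, Long> ledgers;
    private final Position lastConfirmedEntry;

    public NextValidPositionSketch(NavigableMap<Long, Long> ledgers, Position lastConfirmedEntry) {
        this.ledgers = ledgers;
        this.lastConfirmedEntry = lastConfirmedEntry;
    }

    // Simplified validity rule (an assumption): in the current ledger, the slot
    // right after the last confirmed entry is still valid; elsewhere the entry
    // must exist in a known ledger.
    private boolean isValid(Position p) {
        if (p.entryId < 0) {
            return false;
        }
        if (p.ledgerId == lastConfirmedEntry.ledgerId) {
            return p.entryId <= lastConfirmedEntry.entryId + 1;
        }
        Long entries = ledgers.get(p.ledgerId);
        return entries != null && p.entryId < entries;
    }

    /** Never returns null: falls back to the slot after the last confirmed entry. */
    public Position nextValidPosition(Position position) {
        Position next = position.next();
        while (!isValid(next)) {
            Long nextLedgerId = ledgers.ceilingKey(next.ledgerId + 1);
            if (nextLedgerId == null) {
                // No ledger beyond this one: this is the case that used to yield null.
                return lastConfirmedEntry.next();
            }
            next = new Position(nextLedgerId, 0);
        }
        return next;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 2L);
        ledgers.put(3L, 2L);
        NextValidPositionSketch ml = new NextValidPositionSketch(ledgers, new Position(3, 1));
        // Crossing a ledger boundary, then asking past the end of the log.
        System.out.println(ml.nextValidPosition(new Position(1, 1)));
        System.out.println(ml.nextValidPosition(new Position(4, 2)));
    }
}
```

In this model, a position past the end of the log, or in a ledger that does not exist, maps to the slot after the last confirmed entry. That is the same shape the new assertions in `testGetNextValidPosition` check for.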
codelipenghui authored Jun 16, 2020
1 parent 168e334 commit 7955cef
Showing 4 changed files with 16 additions and 3 deletions.
@@ -780,7 +780,7 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() {
         // validate it before preparing range
         PositionImpl markDeletePosition = this.markDeletePosition;
         PositionImpl readPosition = this.readPosition;
-        return (markDeletePosition.compareTo(readPosition) < 0)
+        return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0)
                 ? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
                 : 0;
     }
@@ -2830,11 +2830,22 @@ public Long getNextValidLedger(long ledgerId) {
     }
 
     public PositionImpl getNextValidPosition(final PositionImpl position) {
+        PositionImpl next;
+        try {
+            next = getNextValidPositionInternal(position);
+        } catch (NullPointerException e) {
+            next = lastConfirmedEntry.getNext();
+            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+        }
+        return next;
+    }
+
+    public PositionImpl getNextValidPositionInternal(final PositionImpl position) {
         PositionImpl nextPosition = position.getNext();
         while (!isValidPosition(nextPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() + 1);
             if (nextLedgerId == null) {
-                return null;
+                throw new NullPointerException();
             }
             nextPosition = PositionImpl.get(nextLedgerId, 0);
         }
@@ -1965,6 +1965,8 @@ public void testGetNextValidPosition() throws Exception {
         assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
         assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
     }
 
     /**
@@ -624,7 +624,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
+            log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;