[pulsar-broker] cursor: safe guard to avoid cursor-znode after cursor is closed (apache#3556)

### Motivation

Sometimes, while the broker is shutting down, an unidentified pending callback tries to update the `managed-cursor` even though the cursor is already closed. As a result, the new broker that has already loaded the topic hits the exception below when it updates the managed-cursor metadata, and the cursor cannot update its mark-delete position until the topic is unloaded again.
```
09:22:02.641 [BookKeeperClientWorker-OrderedExecutor-8-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper - [sample/global/ns1/persistent/TopicBadVersion] [pulsar.repl.us-east] Updating cursor info ledgerId=234567890 mark-delete=12345678900:5306
00:00:05.642 [bookkeeper-ml-workers-OrderedExecutor-14-0] WARN  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [sample/global/ns1/persistent/TopicBadVersion] Failed to update consumer pulsar.repl.us-east
org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) ~[pulsar-broker-2.2.5-.jar:2.2.5-]
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$103(MetaStoreImplZookeeper.java:287) ~[managed-ledger-original-2.2.5-.jar:2.2.5-]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-original-2.2.5-.jar:2.2.5-]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.7.2.jar:4.7.2]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.22.Final.jar:4.1.22.Final]
```

This seems to always happen for the replicator cursor. The exact steps to reproduce are not known, but it may happen when bookies are in an unstable state (the root cause is not confirmed). One example:

Behavior:
broker-1 unloads the topic by `09:21:59.451` and broker-2 loads the same topic at `09:22:00.135`. However, broker-1 logs a metadata-znode update at `09:22:05.207`, so broker-2 then fails to update the cursor metadata with `BadVersionException`.
```
Previous-broker: broker-1
09:21:58.855 [shutdown-thread-49-1] INFO  org.apache.pulsar.broker.namespace.OwnedBundle - Disabling ownership: sample/global/ns1/0x2aaaaaa8_0x35555552
:
:
09:21:59.668 [shutdown-thread-49-1] INFO  org.apache.pulsar.broker.namespace.OwnedBundle - Unloading sample/global/ns1/0x2aaaaaa8_0x35555552 namespace-bundle with 345 topics completed in 812.0 ms
:
09:21:58.930 [shutdown-thread-49-1] INFO  org.apache.pulsar.broker.service.BrokerService - [persistent://sample/global/ns1/TopicBadVersion] Unloading topic
:
09:21:59.451 [bookkeeper-ml-workers-OrderedExecutor-14-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [sample/global/ns1/persistent/TopicBadVersion][pulsar.repl.us-east] Closed cursor at md-position=12345678900:5306
09:21:59.451 [bookkeeper-ml-workers-OrderedExecutor-14-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://sample/global/ns1/TopicBadVersion] Topic closed
:
09:22:05.207 [BookKeeperClientWorker-OrderedExecutor-2-0] WARN  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [sample/global/ns1/persistent/TopicBadVersion] Error updating cursor pulsar.repl.us-east position 12345678900:5306 in meta-ledger 14451380450: BookKeeper client is closed
09:22:05.207 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper - [sample/global/ns1/persistent/TopicBadVersion] [pulsar.repl.us-east] Updating cursor info ledgerId=-1 mark-delete=12345678900:5306
```

```
Current-broker: broker-2
09:22:00.135 [pulsar-ordered-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger sample/global/ns1/persistent/TopicBadVersion
```
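The timeline above can be read as a versioned-write conflict. The following is a minimal sketch of that reading, not the actual ZooKeeper or Pulsar code: `VersionedNode` and `BadVersionDemo` are hypothetical names, and a `false` return stands in for `BadVersionException`. It assumes both brokers hold the same znode version before broker-1's late write lands.

```java
// Hypothetical stand-in for a ZooKeeper znode (assumption, for illustration only):
// a write succeeds only if the caller's expected version matches the current one,
// and every successful write bumps the version.
final class VersionedNode {
    private int version = 0;
    private String data = "";

    synchronized int version() {
        return version;
    }

    // Returns false where ZooKeeper would raise BadVersionException.
    synchronized boolean setData(String newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        data = newData;
        version++;
        return true;
    }
}

final class BadVersionDemo {
    // Replays the timeline and reports whether broker-2's update succeeded.
    static boolean replay() {
        VersionedNode cursorZnode = new VersionedNode();

        // broker-1 still holds the version it knew while it owned the topic.
        int broker1Version = cursorZnode.version();
        // broker-2 loads the topic and reads the same version.
        int broker2Version = cursorZnode.version();

        // A late callback on broker-1 writes after its cursor was closed.
        cursorZnode.setData("ledgerId=-1 mark-delete=12345678900:5306", broker1Version);

        // broker-2 now writes with a version that is no longer current.
        return cursorZnode.setData("mark-delete=12345678900:5306", broker2Version);
    }

    public static void main(String[] args) {
        System.out.println("broker-2 update succeeded: " + replay());
    }
}
```

In this sketch broker-2 keeps failing until it re-reads the version, which is consistent with the cursor being stuck until the topic is unloaded and loaded again.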

### Modification

This happens when a caller updates cursor metadata through `persistPositionMetaStore(..)`, so that method should have a safeguard and should not update metadata if the cursor is already closed.
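A minimal sketch of such a guard, under simplifying assumptions: `GuardedCursor`, its `MetaStoreCallback` interface, and the write counter are hypothetical and only mirror the shape of the change, not `ManagedCursorImpl` itself. The sketch fails the callback inline, whereas the actual change dispatches the failure on the ledger's executor, and it uses `compareAndSet` for the `Open` to `Closing` transition as a simplification.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified cursor used only for illustration; not ManagedCursorImpl.
final class GuardedCursor {
    enum State { Open, Closing, Closed }

    interface MetaStoreCallback {
        void operationComplete();
        void operationFailed(Exception e);
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.Open);
    // Counts writes that reached the (imaginary) metadata store.
    private final AtomicInteger metadataWrites = new AtomicInteger();

    int metadataWrites() {
        return metadataWrites.get();
    }

    void persistPositionMetaStore(MetaStoreCallback callback) {
        // Guard: once the cursor is closed, reject the update instead of writing.
        if (state.get() == State.Closed) {
            callback.operationFailed(new IllegalStateException("cursor already closed"));
            return;
        }
        metadataWrites.incrementAndGet(); // stands in for the znode write
        callback.operationComplete();
    }

    void close() {
        // Only the first close proceeds; later calls are no-ops.
        if (!state.compareAndSet(State.Open, State.Closing)) {
            return;
        }
        // The final position is still persisted while the state is Closing.
        persistPositionMetaStore(new MetaStoreCallback() {
            @Override public void operationComplete() { }
            @Override public void operationFailed(Exception e) { }
        });
        state.set(State.Closed);
    }
}

final class GuardedCursorDemo {
    public static void main(String[] args) {
        GuardedCursor cursor = new GuardedCursor();
        cursor.close();
        // A late callback arriving after close is rejected and never reaches the store.
        cursor.persistPositionMetaStore(new GuardedCursor.MetaStoreCallback() {
            @Override public void operationComplete() {
                System.out.println("unexpected write");
            }
            @Override public void operationFailed(Exception e) {
                System.out.println("rejected: " + e.getMessage());
            }
        });
        System.out.println("metadata writes: " + cursor.metadataWrites());
    }
}
```

The intermediate `Closing` state matters here: the close path must still write the final mark-delete position, so the guard rejects only updates that arrive once the state is `Closed`.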

### Result
It avoids unexpected behavior from updating cursor metadata while the cursor is in the closed state.
rdhabalia authored and sijie committed Feb 9, 2019
1 parent a1a20a8 commit b9fb91e
Showing 1 changed file with 22 additions and 8 deletions.
@@ -166,6 +166,7 @@ enum State {
 NoLedger, // There is no metadata ledger open for writing
 Open, // Metadata ledger is ready
 SwitchingLedger, // The metadata ledger is being switched
+Closing, // The managed cursor is closing
 Closed // The managed cursor has been closed
 }

@@ -439,7 +440,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
 final Object ctx) {
 checkArgument(numberOfEntriesToRead > 0);
-if (STATE_UPDATER.get(this) == State.Closed) {
+if (isClosed()) {
 callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
 return;
 }
@@ -489,7 +490,7 @@ public void readEntryComplete(Entry entry, Object ctx) {
 public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback,
 Object ctx) {
 checkArgument(n > 0);
-if (STATE_UPDATER.get(this) == State.Closed) {
+if (isClosed()) {
 callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
 return;
 }
@@ -554,7 +555,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
 @Override
 public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) {
 checkArgument(numberOfEntriesToRead > 0);
-if (STATE_UPDATER.get(this) == State.Closed) {
+if (isClosed()) {
 callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
 return;
 }
@@ -628,6 +629,10 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac
 }
 }

+private boolean isClosed() {
+return state == State.Closed || state == State.Closing;
+}
+
 @Override
 public boolean cancelPendingReadRequest() {
 if (log.isDebugEnabled()) {
@@ -1351,7 +1356,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
 checkNotNull(position);
 checkArgument(position instanceof PositionImpl);

-if (STATE_UPDATER.get(this) == State.Closed) {
+if (isClosed()) {
 callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
 return;
 }
@@ -1567,7 +1572,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {

 @Override
 public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
-if (state == State.Closed) {
+if (isClosed()) {
 callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
 return;
 }
@@ -1885,6 +1890,14 @@ private boolean shouldPersistUnackRangesToLedger() {

 private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
 MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
+if (state == State.Closed) {
+ledger.getExecutor().execute(safeRun(() -> {
+callback.operationFailed(new MetaStoreException(
+new ManagedLedgerException.CursorAlreadyClosedException(name + " cursor already closed")));
+}));
+return;
+}
+
 // When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
 // hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
 ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
@@ -1919,13 +1932,14 @@ public void operationFailed(MetaStoreException e) {

 @Override
 public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
-State oldState = STATE_UPDATER.getAndSet(this, State.Closed);
-if (oldState == State.Closed) {
+State oldState = STATE_UPDATER.getAndSet(this, State.Closing);
+if (oldState == State.Closed || oldState == State.Closing) {
 log.info("[{}] [{}] State is already closed", ledger.getName(), name);
 callback.closeComplete(ctx);
 return;
 }
 persistPosition(-1, lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
+STATE_UPDATER.set(this, State.Closed);
 }

 /**
@@ -2175,7 +2189,7 @@ boolean shouldCloseLedger(LedgerHandle lh) {
 long now = clock.millis();
 if ((lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
 || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
-&& STATE_UPDATER.get(this) != State.Closed) {
+&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
 // It's safe to modify the timestamp since this method will be only called from a callback, implying that
 // calls will be serialized on one single thread
 lastLedgerSwitchTimestamp = now;