Skip to content

Commit

Permalink
Fix concurrent access of uninitializedCursors in `ManagedLedgerImpl…
Browse files Browse the repository at this point in the history
….asyncOpenCursor` (apache#4837)

### Motivation
Fix concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`.

### Modifications
* Adds test to expose concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`.
* Fixes concurrent access of `uninitializedCursors` in `ManagedLedgerImpl.asyncOpenCursor`.
  • Loading branch information
kezhuw authored and sijie committed Aug 5, 2019
1 parent bb1108e commit 5bf319e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ public void operationComplete() {
cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()
: getFirstPositionAndCounter());

synchronized (this) {
synchronized (ManagedLedgerImpl.this) {
cursors.add(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);
}
Expand All @@ -739,7 +739,7 @@ public void operationComplete() {
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to open cursor: {}", name, cursor);

synchronized (this) {
synchronized (ManagedLedgerImpl.this) {
uninitializedCursors.remove(cursorName).completeExceptionally(exception);
}
callback.openCursorFailed(exception, ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -45,6 +48,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -53,6 +57,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -94,6 +99,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -2182,6 +2188,86 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
ledger.close();
}

@Test
public void testConcurrentOpenCursorShouldNotHaveConcurrentAccessOfUninitializedCursors() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("ConcurrentAccessOfUninitializedCursors");

final CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
final CompletableFuture<Void> removingFuture = new CompletableFuture<>();
final CompletableFuture<Void> concurrentAccessFuture = new CompletableFuture<>();
final Throwable concurrentAccessTimeout = new TimeoutException();

cachedExecutor.execute(() -> {
removingFuture.join();
CompletableFuture<Void> lockingFuture = new CompletableFuture<>();
cachedExecutor.execute(() -> {
try {
lockingFuture.join();

// Gives `synchronized (ledger)` a chance to complete if it got lock immediately.
Thread.sleep(2);

// Normally, following code will process after success or failure contention of
// `synchronized (ledger)`. Theoretically, it is possible that following code
// complete before contention of `synchronized (ledger)` block, but it is rare
// in practice, and it is not harmful as it produces only false positive cases.
concurrentAccessFuture.completeExceptionally(concurrentAccessTimeout);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
});
lockingFuture.complete(null);
synchronized (ledger) {
concurrentAccessFuture.complete(null);
}
});

Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors = ledger.uninitializedCursors;
Map<String, CompletableFuture<ManagedCursor>> spyUninitializedCursors = spy(uninitializedCursors);
doAnswer(mock -> {
removingFuture.complete(null);
try {
// Access of uninitializedCursors should guarded by synchronized(ledger),
// so there are must be no concurrent accesses in this scope. If we get this
// future successfully, then there is a concurrent access.
concurrentAccessFuture.get();
Throwable throwable = new IllegalStateException("Detecting concurrent access of uninitializedCursors");
cursorFuture.completeExceptionally(throwable);
} catch (Exception ex) {
assertSame(ExceptionUtils.getRootCause(ex), concurrentAccessTimeout);
}
return mock.callRealMethod();
}).when(spyUninitializedCursors).remove(anyString());
setFieldValue(ManagedLedgerImpl.class, ledger, "uninitializedCursors", spyUninitializedCursors);

cachedExecutor.execute(() -> {
try {
ledger.asyncOpenCursor("c1", new OpenCursorCallback() {
@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
cursorFuture.completeExceptionally(exception);
}

@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursorFuture.complete(cursor);
}
}, null);
} catch (Exception e) {
cursorFuture.completeExceptionally(e);
}
});

try {
ManagedCursor cursor = cursorFuture.get();
assertNotNull(cursor);
} catch (Exception ex) {
fail(ExceptionUtils.getRootCauseMessage(ex));
} finally {
ledger.close();
}
}

public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name").setSequenceId(0).build();
Expand Down

0 comments on commit 5bf319e

Please sign in to comment.