Skip to content

Commit

Permalink
MockZooKeeper#failNow is unreliable (apache#7109)
Browse files Browse the repository at this point in the history
* MockZooKeeper#failNow is unreliable

The MockZooKeeper#failNow instructs the MockZooKeeper instance to fail
the next call to zookeeper. In a multithreaded system with many things
accessing zookeeper, using #failNow is unreliable, as a background
thread could try to access ZK before the call that is actually under
tests accesses it.

This change tightens the condition on which the failed ZK call can
occur, by checking the operation type and path. This resolves a flake
that was occuring in ZooKeeperSessionExpiryRecoveryTest.

* Fixed import missing after merge

Co-authored-by: Ivan Kelly <[email protected]>
  • Loading branch information
merlimat and Ivan Kelly authored Jun 1, 2020
1 parent 0b020d9 commit 7314ac2
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1168,7 +1169,11 @@ void errorCreatingCursor() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");

bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
zkc.failNow(Code.SESSIONEXPIRED);
zkc.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger/c1")
&& op == MockZooKeeper.Op.CREATE;
});

try {
ledger.openCursor("c1");
fail("should have failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -41,6 +42,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
Expand All @@ -54,7 +56,10 @@ public void removingCursor() throws Exception {

assertNotNull(zkc.exists("/managed-ledgers/my_test_ledger/c1", false));

zkc.failNow(Code.BADVERSION);
zkc.failConditional(Code.BADVERSION, (op, path) -> {
return op == MockZooKeeper.Op.SET
&& path.equals("/managed-ledgers/my_test_ledger/c1");
});

try {
c1.close();
Expand All @@ -77,7 +82,10 @@ public void removingCursor2() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ledger.openCursor("c1");

zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return op == MockZooKeeper.Op.DELETE
&& path.equals("/managed-ledgers/my_test_ledger/c1");
});

try {
ledger.deleteCursor("c1");
Expand Down Expand Up @@ -207,7 +215,11 @@ public void errorInRecovering4() throws Exception {

factory = new ManagedLedgerFactoryImpl(bkc, zkc);

zkc.failAfter(1, Code.CONNECTIONLOSS);

zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.SET;
});

try {
ledger = factory.open("my_test_ledger");
Expand All @@ -229,7 +241,10 @@ public void errorInRecovering5() throws Exception {

factory = new ManagedLedgerFactoryImpl(bkc, zkc);

zkc.failAfter(2, Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.GET_CHILDREN;
});

try {
ledger = factory.open("my_test_ledger");
Expand All @@ -252,7 +267,10 @@ public void errorInRecovering6() throws Exception {

factory = new ManagedLedgerFactoryImpl(bkc, zkc);

zkc.failAfter(3, Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger/c1")
&& op == MockZooKeeper.Op.GET;
});

try {
ledger = factory.open("my_test_ledger");
Expand Down Expand Up @@ -302,9 +320,12 @@ public void digestError() throws Exception {
public void errorInUpdatingLedgersList() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));

final CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<Void> promise = new CompletableFuture<>();

zkc.failAfter(0, Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.SET;
});

ledger.asyncAddEntry("entry".getBytes(), new AddEntryCallback() {
public void addFailed(ManagedLedgerException exception, Object ctx) {
Expand All @@ -318,22 +339,25 @@ public void addComplete(Position position, Object ctx) {

ledger.asyncAddEntry("entry".getBytes(), new AddEntryCallback() {
public void addFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
promise.complete(null);
}

public void addComplete(Position position, Object ctx) {
fail("should have failed");
promise.completeExceptionally(new Exception("should have failed"));
}
}, null);

latch.await();
promise.get();
}

@Test
public void recoverAfterZnodeVersionError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));

zkc.failNow(Code.BADVERSION);
zkc.failConditional(Code.BADVERSION, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.SET;
});

// First write will succeed
ledger.addEntry("test".getBytes());
Expand Down Expand Up @@ -369,7 +393,11 @@ public void recoverAfterWriteError() throws Exception {
assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);

bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.SET;
});

try {
ledger.addEntry("entry-2".getBytes());
fail("should fail");
Expand Down Expand Up @@ -458,7 +486,10 @@ public void recoverAfterMarkDeleteError() throws Exception {
Position position = ledger.addEntry("entry".getBytes());

bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger/my-cursor")
&& op == MockZooKeeper.Op.SET;
});

try {
cursor.markDelete(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1467,7 +1468,11 @@ public void discardEmptyLedgersOnError() throws Exception {
assertEquals(ledger.getLedgersInfoAsList().size(), 1);

bkc.failNow(BKException.Code.NoBookieAvailableException);
zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return path.equals("/managed-ledgers/my_test_ledger")
&& op == MockZooKeeper.Op.SET;
});

try {
ledger.addEntry("entry".getBytes());
fail("Should have received exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -34,6 +35,7 @@
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.Test;

Expand All @@ -43,7 +45,10 @@ public class MetaStoreImplTest extends MockedBookKeeperTestCase {
void getMLList() throws Exception {
MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);

zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/managed-ledgers");
});

try {
store.getManagedLedgers();
Expand Down Expand Up @@ -129,22 +134,22 @@ public void operationComplete(ManagedCursorInfo result, Stat version) {
void failInCreatingMLnode() throws Exception {
MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);

final CountDownLatch latch = new CountDownLatch(1);
final CompletableFuture<Void> promise = new CompletableFuture<>();

zkc.failAfter(1, Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return op == MockZooKeeper.Op.CREATE;
});

store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
public void operationFailed(MetaStoreException e) {
// Ok
latch.countDown();
promise.complete(null);
}

public void operationComplete(ManagedLedgerInfo result, Stat version) {
fail("Operation should have failed");
promise.completeExceptionally(new Exception("Operation should have failed"));
}
});

latch.await();
promise.get();
}

@Test(timeOut = 20000)
Expand All @@ -153,34 +158,36 @@ void updatingCursorNode() throws Exception {

zkc.create("/managed-ledgers/my_test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

final CountDownLatch latch = new CountDownLatch(1);
final CompletableFuture<Void> promise = new CompletableFuture<>();

ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(1).build();
store.asyncUpdateCursorInfo("my_test", "c1", info, null, new MetaStoreCallback<Void>() {
public void operationFailed(MetaStoreException e) {
fail("should have succeeded");
promise.completeExceptionally(e);
}

public void operationComplete(Void result, Stat version) {
// Update again using the version
zkc.failNow(Code.CONNECTIONLOSS);
zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
return op == MockZooKeeper.Op.SET
&& path.contains("my_test") && path.contains("c1");
});

ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(2).build();
store.asyncUpdateCursorInfo("my_test", "c1", info, version, new MetaStoreCallback<Void>() {
public void operationFailed(MetaStoreException e) {
// ok
latch.countDown();
promise.complete(null);
}

@Override
public void operationComplete(Void result, Stat version) {
fail("should have failed");
promise.completeExceptionally(new Exception("should have failed"));
}
});
}
});

latch.await();
promise.get();
}

@Test(timeOut = 20000)
Expand All @@ -189,31 +196,34 @@ void updatingMLNode() throws Exception {

zkc.create("/managed-ledgers/my_test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

final CountDownLatch latch = new CountDownLatch(1);
final CompletableFuture<Void> promise = new CompletableFuture<>();

store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<ManagedLedgerInfo>() {
public void operationFailed(MetaStoreException e) {
fail("should have succeeded");
promise.completeExceptionally(e);
}

public void operationComplete(ManagedLedgerInfo mlInfo, Stat version) {
// Update again using the version
zkc.failNow(Code.BADVERSION);
zkc.failConditional(Code.BADVERSION, (op, path) -> {
return op == MockZooKeeper.Op.SET
&& path.contains("my_test");
});

store.asyncUpdateLedgerIds("my_test", mlInfo, version, new MetaStoreCallback<Void>() {
public void operationFailed(MetaStoreException e) {
// ok
latch.countDown();
promise.complete(null);
}

@Override
public void operationComplete(Void result, Stat version) {
fail("should have failed");
promise.completeExceptionally(new Exception("should have failed"));
}
});
}
});

latch.await();
promise.get();
}
}
Loading

0 comments on commit 7314ac2

Please sign in to comment.