diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 8f43fc669db0c..8cff414f2b9dd 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -616,15 +617,12 @@ public void testCreateProducerTimeout() throws Exception { setChannelConnected(); // Delay the topic creation in a deterministic way - CountDownLatch successTopicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - successTopicCreationDelayLatch.await(); - + CompletableFuture openTopicFuture = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicFuture.complete(() -> { ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - return null; - } + }); + return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); @@ -649,7 +647,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { producerName); channel.writeInbound(createProducer2); - successTopicCreationDelayLatch.countDown(); + // Complete the topic opening + openTopicFuture.get().run(); // Close succeeds Object response = getResponse(); @@ -749,36 +748,21 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { channel.finish(); } - @Test(timeOut = 30000) + @Test(timeOut = 30000, invocationCount = 1, skipFailedInvocations = true) public void testCreateProducerBookieTimeout() throws Exception { resetChannel(); setChannelConnected(); // Delay the topic creation in a deterministic way - CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - failedTopicCreationDelayLatch.await(); - + CompletableFuture openFailedTopic = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openFailedTopic.complete(() -> { ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - return null; - } + }); + return null; }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); - CountDownLatch topicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - topicCreationDelayLatch.await(); - - ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - return null; - } - }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), anyObject()); - // In a create producer timeout from client side we expect to see this sequence of commands : // 1. create a failure producer which will timeout creation after 100msec // 2. close producer @@ -801,8 +785,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { producerName); channel.writeInbound(createProducer2); - failedTopicCreationDelayLatch.countDown(); - topicCreationDelayLatch.countDown(); + // Now the topic gets opened + openFailedTopic.get().run(); // Close succeeds Object response = getResponse(); @@ -840,17 +824,13 @@ public void testSubscribeTimeout() throws Exception { setChannelConnected(); // Delay the topic creation in a deterministic way - CountDownLatch topicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - topicCreationDelayLatch.await(); + CompletableFuture openTopicTask = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicTask.complete(() -> { + ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); + }); - synchronized (ServerCnxTest.this) { - ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - } - return null; - } + return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); @@ -881,7 +861,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { successSubName, 1 /* consumer id */, 5 /* request id */, SubType.Exclusive, "test" /* consumer name */); channel.writeInbound(subscribe4); - topicCreationDelayLatch.countDown(); + openTopicTask.get().run(); Object response; @@ -918,35 +898,22 @@ public void testSubscribeBookieTimeout() throws Exception { setChannelConnected(); // Delay the topic creation in a deterministic way - CountDownLatch successTopicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - successTopicCreationDelayLatch.await(); - + CompletableFuture openTopicSuccess = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicSuccess.complete(() -> { ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - return null; - } + }); + return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); - CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - - new Thread(() -> { - try { - failedTopicCreationDelayLatch.await(); - } catch (InterruptedException e) { - } - - ((OpenLedgerCallback) invocationOnMock.getArguments()[2]) - .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null); - }).start(); - - return null; - } + CompletableFuture openTopicFail = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicFail.complete(() -> { + ((OpenLedgerCallback) invocationOnMock.getArguments()[2]) + .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null); + }); + return null; }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); @@ -969,8 +936,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, "test" /* consumer name */); channel.writeInbound(subscribe2); - successTopicCreationDelayLatch.countDown(); - failedTopicCreationDelayLatch.countDown(); + openTopicFail.get().run(); Object response; @@ -992,6 +958,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, "test" /* consumer name */); channel.writeInbound(subscribe3); + openTopicSuccess.get().run(); + // Subscribe succeeds response = getResponse(); assertEquals(response.getClass(), CommandSuccess.class);