Skip to content

Commit

Permalink
Made ServerCnx timeout test to execute all steps in same thread
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 12, 2016
1 parent 0c4116a commit bd18ebc
Showing 1 changed file with 38 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -616,15 +617,12 @@ public void testCreateProducerTimeout() throws Exception {
setChannelConnected();

// Delay the topic creation in a deterministic way
CountDownLatch successTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successTopicCreationDelayLatch.await();

CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicFuture.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

Expand All @@ -649,7 +647,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
producerName);
channel.writeInbound(createProducer2);

successTopicCreationDelayLatch.countDown();
// Complete the topic opening
openTopicFuture.get().run();

// Close succeeds
Object response = getResponse();
Expand Down Expand Up @@ -749,36 +748,21 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
channel.finish();
}

@Test(timeOut = 30000)
@Test(timeOut = 30000, invocationCount = 1, skipFailedInvocations = true)
public void testCreateProducerBookieTimeout() throws Exception {
resetChannel();
setChannelConnected();

// Delay the topic creation in a deterministic way
CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
failedTopicCreationDelayLatch.await();

CompletableFuture<Runnable> openFailedTopic = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openFailedTopic.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicCreationDelayLatch.await();

((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create a failure producer which will timeout creation after 100msec
// 2. close producer
Expand All @@ -801,8 +785,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
producerName);
channel.writeInbound(createProducer2);

failedTopicCreationDelayLatch.countDown();
topicCreationDelayLatch.countDown();
// Now the topic gets opened
openFailedTopic.get().run();

// Close succeeds
Object response = getResponse();
Expand Down Expand Up @@ -840,17 +824,13 @@ public void testSubscribeTimeout() throws Exception {
setChannelConnected();

// Delay the topic creation in a deterministic way
CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicCreationDelayLatch.await();
CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicTask.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
});

synchronized (ServerCnxTest.this) {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
}
return null;
}
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

Expand Down Expand Up @@ -881,7 +861,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successSubName, 1 /* consumer id */, 5 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe4);

topicCreationDelayLatch.countDown();
openTopicTask.get().run();

Object response;

Expand Down Expand Up @@ -918,35 +898,22 @@ public void testSubscribeBookieTimeout() throws Exception {
setChannelConnected();

// Delay the topic creation in a deterministic way
CountDownLatch successTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successTopicCreationDelayLatch.await();

CompletableFuture<Runnable> openTopicSuccess = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicSuccess.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {

new Thread(() -> {
try {
failedTopicCreationDelayLatch.await();
} catch (InterruptedException e) {
}

((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
}).start();

return null;
}
CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicFail.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());

Expand All @@ -969,8 +936,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe2);

successTopicCreationDelayLatch.countDown();
failedTopicCreationDelayLatch.countDown();
openTopicFail.get().run();

Object response;

Expand All @@ -992,6 +958,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe3);

openTopicSuccess.get().run();

// Subscribe succeeds
response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
Expand Down

0 comments on commit bd18ebc

Please sign in to comment.