Skip to content

Commit

Permalink
Fix flake in DiscoveryServiceTest (apache#1081) (apache#2406)
Browse files Browse the repository at this point in the history
This test was flaking because it was only waiting for 1 second for
connection and message exchange to complete, which is not enough time
when there's heavy load on the machine (simulated with stress-ng).

The fix is to increase the timeout to 10 seconds. I've also cleaned up
the test to use a CompletableFuture rather than a CountDownLatch so
that the test thread can be notified of failures in the handlers.
  • Loading branch information
ivankelly authored and sijie committed Aug 28, 2018
1 parent 8da0669 commit 774cafa
Showing 1 changed file with 43 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
Expand All @@ -31,16 +30,17 @@
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
Expand All @@ -66,8 +66,13 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {

private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class);

private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";

Expand Down Expand Up @@ -120,48 +125,40 @@ public void testGetPartitionsMetadata() throws Exception {

/**
* Verifies that a client can connect to the discovery service and receives a response from it.
*
* NOTE(review): this span is a rendered diff without +/- markers. Per the commit message, the
* CountDownLatch lines (1 second wait) are the previous version and the CompletableFuture lines
* (10 second wait) are their replacement.
*
* @throws Exception if the response does not arrive in time or the promise was completed exceptionally
*/
@Test
public void testClientServerConnection() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. client receives service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}

// The promise is handed to connectToService (non-TLS) and awaited for up to 10 seconds;
// the command it yields must be of type CONNECTED.
final CompletableFuture<BaseCommand> promise = new CompletableFuture<>();
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false);
assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED);
// NOTE(review): not reached when the line above throws (timeout, failed promise, failed
// assertion), so the event loop group is then never shut down — consider try/finally.
workerGroup.shutdownGracefully();
}

/**
* TLS variant of the connection test: connects to the URL returned by getServiceUrlTls() with
* the tls flag set and expects a response from the discovery service.
*
* NOTE(review): this span is a rendered diff without +/- markers. Per the commit message, the
* CountDownLatch lines (1 second wait) are the previous version and the CompletableFuture lines
* (10 second wait) are their replacement.
*
* @throws Exception if the response does not arrive in time or the promise was completed exceptionally
*/
@Test
public void testClientServerConnectionTls() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. client receives service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}

// The promise is handed to connectToService (TLS) and awaited for up to 10 seconds;
// the command it yields must be of type CONNECTED.
final CompletableFuture<BaseCommand> promise = new CompletableFuture<>();
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), promise, true);
assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED);
// NOTE(review): not reached when the line above throws (timeout, failed promise, failed
// assertion), so the event loop group is then never shut down — consider try/finally.
workerGroup.shutdownGracefully();
}

/**
* Creates a client channel with a ClientHandler in its pipeline and starts connecting it to the
* server. The connect call is not awaited here: the outcome is observed by a listener, and the
* method returns the event loop group straight away.
*
* NOTE(review): this span is a rendered diff without +/- markers, and part of the method body is
* collapsed. Per the commit message, the CountDownLatch signature is the previous version and
* the CompletableFuture signature is its replacement.
*
* @param serviceUrl URL of the service; its host and port form the address to connect to
* @param latch parameter of the CountDownLatch signature; handed to the ClientHandler
* @param promise parameter of the CompletableFuture signature; handed to the ClientHandler and
* completed exceptionally if the connect attempt fails
* @param tls NOTE(review): its use is not fully visible here — presumably controls whether the
* "tls" handler is added to the pipeline; confirm against the full file
* @return the NioEventLoopGroup created by this method
* @throws URISyntaxException if serviceUrl cannot be parsed as a URI
*/
public static NioEventLoopGroup connectToService(String serviceUrl, CountDownLatch latch, boolean tls)
public static NioEventLoopGroup connectToService(String serviceUrl,
CompletableFuture<BaseCommand> promise,
boolean tls)
throws URISyntaxException {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
Expand All @@ -181,47 +178,59 @@ public void initChannel(SocketChannel ch) throws Exception {
SslContext sslCtx = builder.build();
ch.pipeline().addLast("tls", sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast(new ClientHandler(latch));
ch.pipeline().addLast(new ClientHandler(promise));
}
});
// Only the host and port of the URL are used to build the socket address.
URI uri = new URI(serviceUrl);
InetSocketAddress serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
// The listener inspects the finished connect attempt; a failure is reported with its cause.
b.connect(serviceAddress).addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
throw new IllegalStateException(future.cause());
promise.completeExceptionally(future.cause());
}
});
return workerGroup;
}

/**
* Inbound channel handler used by the connection tests. It sends a connect command when the
* channel becomes active, and reports the command read back from the channel, or any failure,
* through the promise.
*
* NOTE(review): this span is a rendered diff without +/- markers. Per the commit message, the
* CountDownLatch members and calls are the previous version and the CompletableFuture ones are
* their replacement.
*/
static class ClientHandler extends ChannelInboundHandlerAdapter {

final CountDownLatch latch;
// Completed with the parsed command, or exceptionally with the failure.
final CompletableFuture<BaseCommand> promise;

public ClientHandler(CountDownLatch latch) {
this.latch = latch;
public ClientHandler(CompletableFuture<BaseCommand> promise) {
this.promise = promise;
}

/**
* Parses the received buffer into a BaseCommand and completes the promise with it, or
* completes the promise exceptionally if an Exception is thrown; the channel is closed in
* both cases.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
buffer.release();
latch.countDown();
try {
ByteBuf buffer = (ByteBuf) msg;
buffer.readUnsignedInt(); // discard frame length
int cmdSize = (int) buffer.readUnsignedInt();
// Move the writer index so that exactly cmdSize bytes remain readable for the parser.
buffer.writerIndex(buffer.readerIndex() + cmdSize);
ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();

// NOTE(review): these three calls are skipped if anything above throws, so the buffer is
// then not released here — consider moving the release into a finally block.
cmdInputStream.recycle();
cmdBuilder.recycle();
buffer.release();

promise.complete(cmd);
} catch (Exception e) {
promise.completeExceptionally(e);
}
ctx.close();
}

/**
* Reports the failure and closes the channel.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
promise.completeExceptionally(cause);
ctx.close();
}

/**
* Writes and flushes a connect command once the channel is active.
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Commands.newConnect("", "", null));
latch.countDown();
}

}
Expand Down

0 comments on commit 774cafa

Please sign in to comment.