Skip to content

Commit

Permalink
add tests for ConnectionContext
Browse files Browse the repository at this point in the history
Some of these are pretty complicated because they have to see
what happens in other threads when we make requests. I added an
`AbstractLiveTest` class which can be used to conveniently do real tests
on anonymous ports conveniently.

Test Plan: ran them. They pass in next commit
  • Loading branch information
jesboat committed Jul 27, 2015
1 parent 8c3e0b6 commit 4b2b422
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 0 deletions.
145 changes: 145 additions & 0 deletions nifty-core/src/test/java/com/facebook/nifty/core/AbstractLiveTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (C) 2012-2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.nifty.core;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TMessageType;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TStruct;
import org.apache.thrift.transport.TIOStreamTransport;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

import com.facebook.nifty.processor.NiftyProcessor;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;

public class AbstractLiveTest
{
protected AbstractLiveTest() { }


protected FakeServer listen(NiftyProcessor processor) {
// NiftyBootstrap.stop() will shutdown the threadpool for us
return new FakeServer(processor, Executors.newCachedThreadPool(), new DefaultChannelGroup());
}

protected FakeServer listen(NiftyProcessor processor, Executor taskExecutor, ChannelGroup group) {
return new FakeServer(processor, taskExecutor, group);
}

protected FakeClient connect(FakeServer server) throws IOException {
return new FakeClient(server);
}

protected NiftyProcessor mockProcessor(
@Nullable final BlockingQueue<TProtocol> inQueue,
@Nullable final BlockingQueue<TProtocol> outQueue,
@Nullable final BlockingQueue<RequestContext> requestContextQueue,
@Nonnull final BlockingQueue<SettableFuture<Boolean>> responseQueue
) {
return new NiftyProcessor() {
@Override
public ListenableFuture<Boolean> process(TProtocol in, TProtocol out,
RequestContext requestContext) throws TException {
if (inQueue != null) {
Uninterruptibles.putUninterruptibly(inQueue, in);
}
if (outQueue != null) {
Uninterruptibles.putUninterruptibly(outQueue, out);
}
if (requestContextQueue != null) {
Uninterruptibles.putUninterruptibly(requestContextQueue, requestContext);
}
SettableFuture<Boolean> resp = SettableFuture.create();
Uninterruptibles.putUninterruptibly(responseQueue, resp);
return resp;
}
};
}


protected static class FakeServer implements AutoCloseable {
private final NiftyBootstrap nifty;

private FakeServer(NiftyProcessor processor, Executor taskExecutor, ChannelGroup group) {
ThriftServerDef thriftServerDef =
new ThriftServerDefBuilder()
.withProcessor(processor)
.using(taskExecutor)
.build();

this.nifty = new NiftyBootstrap(
ImmutableSet.of(thriftServerDef),
new NettyServerConfigBuilder().build(),
group);

nifty.start();
}

public int getPort() {
return Iterables.getOnlyElement(nifty.getBoundPorts().values());
}

@Override
public void close() {
nifty.stop();
}
}

protected static class FakeClient implements AutoCloseable {
private Socket socketToServer;

private FakeClient(FakeServer server) throws IOException {
socketToServer = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
}

public int getClientPort() {
return socketToServer.getLocalPort();
}

public void sendRequest() throws IOException, TException {
TProtocol out = new TBinaryProtocol(new TIOStreamTransport(socketToServer.getOutputStream()));
out.writeMessageBegin(new TMessage("dummy", TMessageType.CALL, 0));
out.writeStructBegin(new TStruct("dummy_args"));
out.writeFieldStop();
out.writeStructEnd();
out.writeMessageEnd();
out.getTransport().flush();
}

@Override
public void close() throws IOException {
socketToServer.close();
socketToServer = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (C) 2012-2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.nifty.core;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.thrift.TException;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.facebook.nifty.processor.NiftyProcessor;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;

public class TestConnectionContext extends AbstractLiveTest
{

@Test
public void testContextNormal() throws IOException, TException, InterruptedException
{
final SynchronousQueue<RequestContext> requestContextQueue = new SynchronousQueue<>();
final SynchronousQueue<SettableFuture<Boolean>> sendResponseQueue = new SynchronousQueue<>();
NiftyProcessor processor = mockProcessor(null, null, requestContextQueue, sendResponseQueue);

try (FakeServer server = listen(processor);
FakeClient client = connect(server)) {

// Issue a fake request and wait for it to arrive
client.sendRequest();
RequestContext requestContext = requestContextQueue.poll(30, TimeUnit.SECONDS);
Preconditions.checkNotNull(requestContext, "Either deadlock, or your computer is really slow");
ConnectionContext actualContext = requestContext.getConnectionContext();
SettableFuture<Boolean> sendResponse = sendResponseQueue.take();

// ConnectionContext should show the request from the client
Assert.assertNotNull(
actualContext.getRemoteAddress(),
"remote address non-null");
Assert.assertEquals(
((InetSocketAddress) actualContext.getRemoteAddress()).getPort(),
client.getClientPort(),
"context has correct port");

sendResponse.set(false);
}
}

@Test
public void testContextOnClosedConnection() throws IOException, TException, InterruptedException
{
// An ExecutorService which lets us delay calls from NiftyDispatcher to the processor
final SynchronousQueue<Semaphore> tasksWaitingToRun = new SynchronousQueue<>();
final ExecutorService threadpool = Executors.newCachedThreadPool();
Executor slowExecutor = new Executor() {
@Override
public void execute(final Runnable task) {
threadpool.execute(new Runnable() {
@Override
public void run() {
Semaphore allowMeToContinue = new Semaphore(0);
Uninterruptibles.putUninterruptibly(tasksWaitingToRun, allowMeToContinue);
allowMeToContinue.acquireUninterruptibly();
task.run();
}
});
}
};

final SynchronousQueue<RequestContext> requestContextQueue = new SynchronousQueue<>();
final SynchronousQueue<SettableFuture<Boolean>> sendResponseQueue = new SynchronousQueue<>();
NiftyProcessor processor = mockProcessor(null, null, requestContextQueue, sendResponseQueue);

ChannelGroup channels = new DefaultChannelGroup();

try (FakeServer server = listen(processor, slowExecutor, channels);
FakeClient client = connect(server)) {

// Send a request, and wait for NiftyDispatcher to put it in the queue
client.sendRequest();
Semaphore allowNiftyProcessorToRun = tasksWaitingToRun.poll(30, TimeUnit.SECONDS);
Preconditions.checkNotNull(allowNiftyProcessorToRun, "Either deadlock, or your computer is really slow");

// Close the channel. We do the close on the server side because that
// lets us control when all the close handlers are run (namely, on
// our thread) which in turn lets us wait until the channel has finished
// closing.
final AtomicReference<Thread> threadThatProcessedClose = new AtomicReference<>();
Channel channelToClient = Iterables.getOnlyElement(channels);
channelToClient.getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
threadThatProcessedClose.set(Thread.currentThread());
}
});
channelToClient.close();
Preconditions.checkState(threadThatProcessedClose.get() == Thread.currentThread());

// Now allow the NiftyProcessor to run, and wait for it to finish
allowNiftyProcessorToRun.release();
RequestContext requestContext = requestContextQueue.poll(30, TimeUnit.SECONDS);
Preconditions.checkNotNull(requestContext, "Either deadlock, or your computer is really slow");
ConnectionContext actualContext = requestContext.getConnectionContext();
SettableFuture<Boolean> sendResponse = sendResponseQueue.take();

// The connection context should still be correct
Assert.assertNotNull(
actualContext.getRemoteAddress(),
"remote address non-null");
Assert.assertEquals(
((InetSocketAddress) actualContext.getRemoteAddress()).getPort(),
client.getClientPort(),
"context has correct port");

sendResponse.set(false);
} finally {
threadpool.shutdown();
}
}


}

0 comments on commit 4b2b422

Please sign in to comment.