Skip to content

Commit

Permalink
Removed write queuing mechanism in ProxyToServerConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
oxtoacart committed Sep 8, 2013
1 parent fd27035 commit b3c6f39
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ protected void disconnected() {
*/
protected void serverConnectionFlowStarted(
ProxyToServerConnection serverConnection) {
// stopReading();
stopReading();
this.numberOfCurrentlyConnectingServers.incrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
@Sharable
public class ProxyToServerConnection extends ProxyConnection<HttpResponse> {
private static final int MAXIMUM_QUEUED_WRITES = 10;

private final ClientToProxyConnection clientConnection;
private volatile TransportProtocol transportProtocol;
private volatile InetSocketAddress remoteAddress;
Expand Down Expand Up @@ -109,12 +109,6 @@ public class ProxyToServerConnection extends ProxyConnection<HttpResponse> {
*/
private volatile HttpResponse currentHttpResponse;

/**
* Holds writes that came in while we were connecting and need to be
* processed.
*/
private final Queue<Object> queuedWrites = new ConcurrentLinkedQueue<Object>();

/**
* Create a new ProxyToServerConnection.
*
Expand Down Expand Up @@ -224,26 +218,20 @@ void write(Object msg) {
} else {
synchronized (connectLock) {
if (isConnecting()) {
LOG.debug("Attempted to write while still in the process of connecting.");
if (queuedWrites.size() < MAXIMUM_QUEUED_WRITES) {
LOG.debug("Queuing write for later");
queuedWrites.add(msg);
} else {
LOG.debug("Waiting for connection");
try {
connectLock.wait(30000);
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting for connect monitor");
}
if (is(DISCONNECTED)) {
LOG.debug("Connection failed while we were waiting for it, don't write");
return;
}
LOG.debug("Attempted to write while still in the process of connecting, waiting for connection.");
clientConnection.stopReading();
try {
connectLock.wait(30000);
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting for connect monitor");
}
if (is(DISCONNECTED)) {
LOG.debug("Connection failed while we were waiting for it, don't write");
return;
}
return;
}
}

LOG.debug("Using existing connection to: {}", remoteAddress);
doWrite(msg);
}
Expand All @@ -258,7 +246,7 @@ protected void writeHttp(HttpObject httpObject) {
}
super.writeHttp(httpObject);
}

/***************************************************************************
* Lifecycle
**************************************************************************/
Expand Down Expand Up @@ -670,15 +658,6 @@ void connectionSucceeded(boolean shouldForwardInitialRequest) {
} else {
LOG.debug("Dropping initial request: {}", initialRequest);
}

if (!queuedWrites.isEmpty()) {
LOG.debug("Flushing queued writes");
Object queuedMsg;
while ((queuedMsg = queuedWrites.poll()) != null) {
LOG.debug("Flushing queued write: {}", queuedMsg);
doWrite(queuedMsg);
}
}
}

/**
Expand Down

0 comments on commit b3c6f39

Please sign in to comment.