Skip to content
This repository has been archived by the owner on Jan 27, 2021. It is now read-only.

Commit

Permalink
GEODE-6008: CI Failure: ClientServerHostNameVerificationDistributedTe… (
Browse files Browse the repository at this point in the history
apache#4839)

* GEODE-6008: CI Failure: ClientServerHostNameVerificationDistributedTest.expectConnectionFailureWhenNoHostNameInServerKey failed

The "Message distribution has terminated" failure is caused by an
unreported NullPointerException in a residual reader thread introduced
in the fix for GEODE-7727.  That fix caused a thread to say alive in a
peer-to-peer tcp/ip Connection in order to clean up the receiving side
of a socket.  The Connection shutdown method close() method, however,
releases the Connection's input buffer and nulls out the field.  The
reader thread then threw an NPE that was caught and caused the
"Message distribution has terminated" message, which is picked up as a
suspect string by the testing infrastructure.

This problem is also seen in GEODE-7894, GEODE-7871, GEODE-7873 and
GEODE-7806.

The fix is to record the fact that a residual reader thread exists and
avoid releasing the Connection's input buffer when the connection is
closed.  This lets the reader thread do the cleanup.

While testing the fix I found that the NioSslEngine was throwing an
IllegalStateException when the reader thread tried to use it in this
same situation.  This exception wasn't being caught and caused more
suspect strings to be logged.  I've changed this to a checked exception
that is already handled by the reader thread.

ClientServerHostNameVerificationDistributedTest also wasn't working on
my Mac due to its /etc/hosts configuration.  I changed the test to allow
the IP address selected by LocalHostUtil to be a valid client/server
address for the SSL certificates it generates.

* fixed failing test due to change in exceptions in NioSslEngine
  • Loading branch information
bschuchardt authored Mar 24, 2020
1 parent a93a59e commit 1dfc496
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.geode.cache.ssl.CertStores;
import org.apache.geode.cache.ssl.CertificateBuilder;
import org.apache.geode.cache.ssl.CertificateMaterial;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void connectionSuccessfulWhenHostNameOfLocatorAndServer() throws Exceptio
.sanDnsName(InetAddress.getLoopbackAddress().getHostName())
.sanDnsName(InetAddress.getLocalHost().getHostName())
.sanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
.sanIpAddress(LocalHostUtil.getLocalHost())
.sanIpAddress(InetAddress.getLocalHost())
.sanIpAddress(InetAddress.getByName("0.0.0.0")) // to pass on windows
.generate();
Expand All @@ -91,6 +93,7 @@ public void connectionSuccessfulWhenHostNameOfLocatorAndServer() throws Exceptio
.issuedBy(ca)
.sanDnsName(InetAddress.getLocalHost().getHostName())
.sanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
.sanIpAddress(LocalHostUtil.getLocalHost())
.sanIpAddress(InetAddress.getLocalHost())
.generate();

Expand Down Expand Up @@ -162,6 +165,7 @@ public void expectConnectionFailureWhenNoHostNameInServerKey() throws Exception
.sanDnsName(InetAddress.getLoopbackAddress().getHostName())
.sanDnsName(InetAddress.getLocalHost().getHostName())
.sanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
.sanIpAddress(LocalHostUtil.getLocalHost())
.sanIpAddress(InetAddress.getLocalHost())
.generate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
Expand Down Expand Up @@ -95,6 +97,17 @@ public void sharedSenderShouldRecoverFromClosedSocket() {
ConnectionTable conTable = getConnectionTable();
await().untilAsserted(() -> assertThat(conTable.getNumberOfReceivers()).isEqualTo(2));
});

// Make sure the Sender connection has a reader thread
vm1.invoke(() -> {
ConnectionTable conTable = getConnectionTable();
InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem();
InternalDistributedMember otherMember = distributedSystem.getDistributionManager()
.getOtherNormalDistributionManagerIds().iterator().next();
Connection connection = conTable.getConduit().getConnection(otherMember, true, false,
System.currentTimeMillis(), 15000, 0);
assertThat(connection.hasResidualReaderThread()).isTrue();
});
}

private ConnectionTable getConnectionTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
Expand Down Expand Up @@ -216,7 +217,7 @@ private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis
assertThatThrownBy(() -> {
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
})
.isInstanceOf(IllegalStateException.class);
.isInstanceOf(IOException.class);
}
}
}, this.testName.getMethodName() + "-server");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis
assertThatThrownBy(() -> {
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
})
.isInstanceOf(IllegalStateException.class);
.isInstanceOf(IOException.class);
}
}
}, this.testName.getMethodName() + "-server");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
}

void checkClosed() {
void checkClosed() throws IOException {
if (closed) {
throw new IllegalStateException("NioSslEngine has been closed");
throw new IOException("NioSslEngine has been closed");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ public class Connection implements Runnable {
/** set to true if we exceeded the ack-wait-threshold waiting for a response */
private volatile boolean ackTimedOut;

/**
* a Reader thread for an shared Connection will remain around in order to
* ensure that the socket is properly closed.
*/
private volatile boolean hasResidualReaderThread;

/**
* creates a "reader" connection that we accepted (it was initiated by an explicit connect being
* done on the other side).
Expand Down Expand Up @@ -1326,7 +1332,7 @@ private void close(String reason, boolean cleanupEndpoint, boolean p_removeEndpo
}
// make sure our socket is closed
asyncClose(false);
if (!isReceiver) {
if (!isReceiver && !hasResidualReaderThread()) {
// receivers release the input buffer when exiting run(). Senders use the
// inputBuffer for reading direct-reply responses
releaseInputBuffer();
Expand Down Expand Up @@ -1469,6 +1475,7 @@ public void run() {
asyncClose(false);
}
}

releaseInputBuffer();

// make sure that if the reader thread exits we notify a thread waiting for the handshake.
Expand Down Expand Up @@ -1512,6 +1519,9 @@ private String p2pReaderName() {
}

private void readMessages() {
if (closing.get()) {
return;
}
// take a snapshot of uniqueId to detect reconnect attempts
SocketChannel channel;
try {
Expand Down Expand Up @@ -1556,7 +1566,7 @@ private void readMessages() {
}
// we should not change the state of the connection if we are a handshake reader thread
// as there is a race between this thread and the application thread doing direct ack
boolean isHandShakeReader = false;
boolean handshakeHasBeenRead = false;
// if we're using SSL/TLS the input buffer may already have data to process
boolean skipInitialRead = getInputBuffer().position() > 0;
try {
Expand Down Expand Up @@ -1611,20 +1621,24 @@ private void readMessages() {
}
processInputBuffer();

if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) {
if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
if (logger.isDebugEnabled()) {
if (handshakeRead) {
logger.debug("handshake has been read {}", this);
} else {
logger.debug("handshake has been cancelled {}", this);
}
}
isHandShakeReader = true;
handshakeHasBeenRead = true;

// Once we have read the handshake for unshared connections, the reader can skip
// processing messages
if (!sharedResource || asyncMode) {
break;
} else {
// not exiting and not a Reader spawned from a ServerSocket.accept(), so
// let's set some state noting that this is happening
hasResidualReaderThread = true;
}

}
Expand Down Expand Up @@ -1667,7 +1681,7 @@ private void readMessages() {
return;

} catch (Exception e) {
owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
if (!stopped && !isSocketClosed()) {
logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e);
}
Expand All @@ -1680,14 +1694,15 @@ private void readMessages() {
}
}
} finally {
if (!isHandShakeReader || (sharedResource && !asyncMode)) {
hasResidualReaderThread = false;
if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
synchronized (stateLock) {
connectionState = STATE_IDLE;
}
}
if (logger.isDebugEnabled()) {
logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
remoteAddr, isHandShakeReader);
remoteAddr, handshakeHasBeenRead);
}
}
}
Expand Down Expand Up @@ -3249,6 +3264,15 @@ boolean getOriginatedHere() {
return !isReceiver;
}

/**
* A shared sender connection will leave a reader thread around to ensure that the
* socket is properly closed at this end. When that is the case isResidualReaderThread
* will return true.
*/
public boolean hasResidualReaderThread() {
return hasResidualReaderThread;
}

/**
* answers whether this connection is used for ordered message delivery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -196,11 +197,11 @@ public void handshakeDoesNotTerminateWithFinished() throws Exception {


@Test
public void checkClosed() {
public void checkClosed() throws Exception {
nioSslEngine.checkClosed();
}

@Test(expected = IllegalStateException.class)
@Test(expected = IOException.class)
public void checkClosedThrows() throws Exception {
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 100));
Expand Down Expand Up @@ -340,7 +341,8 @@ public void close() throws Exception {
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 0));
nioSslEngine.close(mockChannel);
assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class)
.hasMessageContaining("NioSslEngine has been closed");
nioSslEngine.close(mockChannel);
}

Expand Down

0 comments on commit 1dfc496

Please sign in to comment.