Skip to content

Commit

Permalink
GEODE-8652: NioSslEngine.close() Bypasses Locks (apache#5666)
Browse files Browse the repository at this point in the history
- NioSslEngine.close() proceeds even if readers (or writers) are
  operating on its ByteBuffers, allowing Connection.close() to close 
  its socket and proceed.

- NioSslEngine.close() needed a lock only on the output buffer, so 
  we split what was a single lock into two. Also instead of using 
  synchronized we use a ReentrantLock so we can 
  call tryLock() and time out if needed in NioSslEngine.close().

- Since readers/writers may hold locks on these input/output buffers
  when NioSslEngine.close() is called a reference count is maintained
  and the buffers are returned to the pool only when the last user
  is done.

- To manage the locking and reference counting a new AutoCloseable
  ByteBufferSharing interface is introduced with a trivial 
  implementation: ByteBufferSharingNoOp and a real implementation:
  ByteBufferSharingImpl.

Co-authored-by: Bill Burcham <[email protected]>
Co-authored-by: Darrel Schneider <[email protected]>
Co-authored-by: Ernie Burghardt <[email protected]>
  • Loading branch information
3 people authored Oct 29, 2020
1 parent a9d346f commit 08e9e96
Show file tree
Hide file tree
Showing 16 changed files with 1,195 additions and 460 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.tcp;

import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.fail;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedBlackboard;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedRule;

/**
* It would be nice if this test didn't need to use the cache since the test's purpose is to test
* that the {@link Connection} class can be closed while readers and writers hold locks on its
* internal TLS {@link ByteBuffer}s
*
* But this test does use the cache (region) because it enabled us to use existing cache messaging
* and to use the DistributionMessageObserver (observer) hooks.
*
* see also ClusterCommunicationsDUnitTest
*/
public class ConnectionCloseSSLTLSDUnitTest implements Serializable {

private static final int SMALL_BUFFER_SIZE = 8000;
private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered";
private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate";
private static final String regionName = "connectionCloseDUnitTestRegion";
private static final Logger logger = LogService.getLogger();

private static Cache cache;

@Rule
public DistributedRule distributedRule =
DistributedRule.builder().withVMCount(3).build();

@Rule
public DistributedBlackboard blackboard = new DistributedBlackboard();

@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();

private VM locator;
private VM sender;
private VM receiver;

@Before
public void before() {
locator = getVM(0);
sender = getVM(1);
receiver = getVM(2);
}

@After
public void after() {
receiver.invoke(() -> {
DistributionMessageObserver.setInstance(null);
});
}

@Test
public void connectionWithHungReaderIsCloseableAndUnhangsReader()
throws InterruptedException, TimeoutException {

blackboard.clearGate(UPDATE_ENTERED_GATE);
blackboard.clearGate(SUSPEND_UPDATE_GATE);

final int locatorPort = createLocator(locator);
createCacheAndRegion(sender, locatorPort);
createCacheAndRegion(receiver, locatorPort);

receiver
.invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)",
() -> {
final DistributionMessageObserver observer =
new DistributionMessageObserver() {

@Override
public void beforeProcessMessage(final ClusterDistributionManager dm,
final DistributionMessage message) {
guardMessageProcessingHook(message, () -> {
try {
blackboard.signalGate(UPDATE_ENTERED_GATE);
blackboard.waitForGate(SUSPEND_UPDATE_GATE);
} catch (TimeoutException | InterruptedException e) {
fail("message observus interruptus");
}
logger.info("BGB: got before process message: " + message);
});
}
};
DistributionMessageObserver.setInstance(observer);
});

final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> {
final Region<Object, Object> region = cache.getRegion(regionName);
// test is going to close the cache while we are waiting for our ack
assertThatThrownBy(() -> {
region.put("hello", "world");
}).isInstanceOf(DistributedSystemDisconnectedException.class);
});

// wait until our message observer is blocked
blackboard.waitForGate(UPDATE_ENTERED_GATE);

// at this point our put() is blocked waiting for a direct ack
assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue();

/*
* Now close the cache. The point of calling it is to test that we don't block while trying
* to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn
* closes all the connections (and their sockets.) We want the sockets to close because that'll
* cause our hung put() to see a DistributedSystemDisconnectedException.
*/
sender.invoke("", () -> cache.close());

// wait for put task to complete: with an exception, that is!
putInvocation.get();

// un-stick our message observer
blackboard.signalGate(SUSPEND_UPDATE_GATE);
}

private void guardMessageProcessingHook(final DistributionMessage message,
final Runnable runnable) {
if (message instanceof UpdateMessage) {
final UpdateMessage updateMessage = (UpdateMessage) message;
if (updateMessage.getRegionPath().equals("/" + regionName)) {
runnable.run();
}
}
}

private int createLocator(VM memberVM) {
return memberVM.invoke("create locator", () -> {
// if you need to debug SSL communications use this property:
// System.setProperty("javax.net.debug", "all");
System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
.getPort();
});
}

private void createCacheAndRegion(VM memberVM, int locatorPort) {
memberVM.invoke("start cache and create region", () -> {
cache = createCache(locatorPort);
cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
});
}

private Cache createCache(int locatorPort) {
// if you need to debug SSL communications use this property:
// System.setProperty("javax.net.debug", "all");
Properties properties = getDistributedSystemProperties();
properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
return new CacheFactory(properties).create();
}

private Properties getDistributedSystemProperties() {
Properties properties = new Properties();
properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
properties.setProperty(NAME, "vm" + VM.getCurrentVMNum());
properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack
properties.setProperty(SOCKET_LEASE_TIME, "10000");
properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);

properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator");
properties
.setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore")
.getAbsolutePath());
properties.setProperty(SSL_TRUSTSTORE,
createTempFileFromResource(getClass(), "server.keystore")
.getAbsolutePath());
properties.setProperty(SSL_PROTOCOLS, "TLSv1.2");
properties.setProperty(SSL_KEYSTORE_PASSWORD, "password");
properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password");
properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true");
return properties;
}

}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
try (final ByteBufferSharing unused =
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
}
})
.isInstanceOf(IOException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,13 @@ private void writeMessageToNIOSSLServer(SocketChannel clientChannel, NioSslEngin
ByteBuffer buffer = bbos.getContentBuffer();
System.out.println(
"client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
ByteBuffer wrappedBuffer = engine.wrap(buffer);
System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+ " and limit is " + wrappedBuffer.limit());
int bytesWritten = clientChannel.write(wrappedBuffer);
System.out.println("client bytes written is " + bytesWritten);
try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) {
ByteBuffer wrappedBuffer = outputSharing.getBuffer();
System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+ " and limit is " + wrappedBuffer.limit());
int bytesWritten = clientChannel.write(wrappedBuffer);
System.out.println("client bytes written is " + bytesWritten);
}
}

private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
Expand Down Expand Up @@ -299,7 +301,9 @@ private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
try (final ByteBufferSharing unused =
nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
}
})
.isInstanceOf(IOException.class);
}
Expand All @@ -313,24 +317,35 @@ private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis
private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
throws IOException {

ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
// if we already have unencrypted data skip unwrapping
if (unwrapped.position() == 0) {
int bytesRead;
// if we already have encrypted data skip reading from the socket
if (buffer.position() == 0) {
bytesRead = socket.getChannel().read(buffer);
buffer.flip();
try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) {
final ByteBuffer unwrapped = sharedBuffer.getBuffer();
// if we already have unencrypted data skip unwrapping
if (unwrapped.position() == 0) {
int bytesRead;
// if we already have encrypted data skip reading from the socket
if (buffer.position() == 0) {
bytesRead = socket.getChannel().read(buffer);
buffer.flip();
} else {
bytesRead = buffer.remaining();
}
System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+ buffer.position() + " and limit is " + buffer.limit());
try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) {
final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer();

unwrapped2.flip();
System.out.println("server unwrapped buffer position is " + unwrapped2.position()
+ " and limit is " + unwrapped2.limit());
finishReadMessageFromNIOSSLClient(unwrapped2);
}
} else {
bytesRead = buffer.remaining();
finishReadMessageFromNIOSSLClient(unwrapped);
}
System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+ buffer.position() + " and limit is " + buffer.limit());
unwrapped = engine.unwrap(buffer);
unwrapped.flip();
System.out.println("server unwrapped buffer position is " + unwrapped.position()
+ " and limit is " + unwrapped.limit());
}
}

private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
DataInputStream dis = new DataInputStream(bbis);
String welcome = dis.readUTF();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
org/apache/geode/cache/query/internal/xml/ElementType$1
org/apache/geode/cache/query/internal/xml/ElementType$2
org/apache/geode/cache/query/internal/xml/ElementType$3
org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.net;

import java.io.IOException;
import java.nio.ByteBuffer;


/**
* When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for
* reading and modification) within the scope of that try block.
*
* Releases managed ByteBuffer back to pool after last reference is dropped.
*/
public interface ByteBufferSharing extends AutoCloseable {

/**
* Call this method only within a try-with-resource in which this {@link ByteBufferSharing} was
* acquired. Retain the reference only within the scope of that try-with-resources.
*
* @return the buffer: manipulable only within the scope of the try-with-resources
* @throws IOException if the buffer is no longer accessible
*/
ByteBuffer getBuffer() throws IOException;

/**
* Expand the buffer if needed. This may return a different object so be sure to pay attention to
* the return value if you need access to the potentially- expanded buffer.
*
* Subsequent calls to {@link #getBuffer()} will return that new buffer too.
*
* @return the same buffer or a different (bigger) buffer
* @throws IOException if the buffer is no longer accessible
*/
ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;

/**
* Override {@link AutoCloseable#close()} without throws clause since we don't need one.
*/
@Override
void close();
}
Loading

0 comments on commit 08e9e96

Please sign in to comment.