Skip to content

Commit

Permalink
GEODE-9825: processInputBuffer resize retains data (apache#7131)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill authored Nov 23, 2021
1 parent 5ec6a66 commit fb142e1
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.distributed.internal;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.DataInput;
Expand All @@ -29,12 +30,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import junitparams.Parameters;
import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.ssl.CertStores;
Expand All @@ -48,15 +50,24 @@
import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.MembershipTest;
import org.apache.geode.test.junit.runners.GeodeParamsRunner;
import org.apache.geode.test.version.VersionManager;

/**
* Tests one-way P2P messaging between two peers. A shared,
* ordered connection is used and many concurrent tasks
* compete on the sending side. Tests with TLS enabled
* to exercise ByteBufferSharing and friends.
* Tests one-way P2P messaging between two peers.
* Many concurrent tasks compete on the sending side.
* The main purpose of the test is to exercise
* ByteBufferSharing and friends.
*
* Tests combinations of: conserve-sockets true/false,
* TLS on/off, and socket-buffer-size for sender
* and receiver both set to the default (and equal)
* and set to the sender's buffer twice as big as the
* receiver's buffer.
*
*/
@Category({MembershipTest.class})
@RunWith(GeodeParamsRunner.class)
public class P2PMessagingConcurrencyDUnitTest {

// how many messages will each sender generate?
Expand All @@ -71,6 +82,8 @@ public class P2PMessagingConcurrencyDUnitTest {
// random seed
private static final int RANDOM_SEED = 1234;

private static Properties securityProperties;

@Rule
public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);

Expand All @@ -87,21 +100,56 @@ public class P2PMessagingConcurrencyDUnitTest {
*/
private static LongAdder bytesTransferredAdder;

@Before
public void before() throws GeneralSecurityException, IOException {
final Properties configuration = gemFireConfiguration();
private void configure(
final boolean conserveSockets,
final boolean useTLS,
final int sendSocketBufferSize,
final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {

final Properties senderConfiguration =
gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize);
final Properties receiverConfiguration =
gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize);

final MemberVM locator =
clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
x -> x.withProperties(configuration).withConnectionToLocator()
x -> x.withProperties(senderConfiguration).withConnectionToLocator()
.withoutClusterConfigurationService().withoutManagementRestService());

sender = clusterStartupRule.startServerVM(1, configuration, locator.getPort());
receiver = clusterStartupRule.startServerVM(2, configuration, locator.getPort());
sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
}

@Test
public void testP2PMessagingWithTLS() {
@Parameters({
/*
* all combinations of flags with buffer sizes:
* (equal), larger/smaller, smaller/larger, minimal
*/
"true, true, 32768, 32768",
"true, true, 65536, 32768",
"true, true, 32768, 65536",
"true, true, 1024, 1024",
"true, false, 32768, 32768",
"true, false, 65536, 32768",
"true, false, 32768, 65536",
"true, false, 1024, 1024",
"false, true, 32768, 32768",
"false, true, 65536, 32768",
"false, true, 32768, 65536",
"false, true, 1024, 1024",
"false, false, 32768, 32768",
"false, false, 65536, 32768",
"false, false, 32768, 65536",
"false, false, 1024, 1024",
})
public void testP2PMessaging(
final boolean conserveSockets,
final boolean useTLS,
final int sendSocketBufferSize,
final int receiveSocketBufferSize) throws GeneralSecurityException, IOException {

configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize);

final InternalDistributedMember receiverMember =
receiver.invoke(() -> {
Expand Down Expand Up @@ -172,10 +220,16 @@ public void testP2PMessagingWithTLS() {

});

final long bytesSent = sender.invoke(() -> bytesTransferredAdder.sum());
final long bytesReceived = receiver.invoke(() -> bytesTransferredAdder.sum());
final long bytesSent = getByteCount(sender);

assertThat(bytesReceived).as("bytes received != bytes sent").isEqualTo(bytesSent);
await().untilAsserted(
() -> assertThat(getByteCount(receiver))
.as("bytes received != bytes sent")
.isEqualTo(bytesSent));
}

private long getByteCount(final MemberVM member) {
return member.invoke(() -> bytesTransferredAdder.sum());
}

private static ClusterDistributionManager getCDM() {
Expand Down Expand Up @@ -245,7 +299,7 @@ public void fromData(final DataInput in, final DeserializationContext context)
throws IOException, ClassNotFoundException {
super.fromData(in, context);

final int messageId = in.readInt();
messageId = in.readInt();

final int length = in.readInt();

Expand All @@ -263,10 +317,19 @@ public int getDSFID() {
}

@NotNull
private static Properties gemFireConfiguration()
private static Properties gemFireConfiguration(
final boolean conserveSockets, final boolean useTLS,
final int socketBufferSize)
throws GeneralSecurityException, IOException {

final Properties props = securityProperties();
final Properties props;
if (useTLS) {
props = securityProperties();
} else {
props = new Properties();
}

props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize));

/*
* This is something we intend to test!
Expand All @@ -276,29 +339,32 @@ private static Properties gemFireConfiguration()
*
* careful: if you set a boolean it doesn't take hold! setting a String
*/
props.setProperty("conserve-sockets", "true");
props.setProperty("conserve-sockets", String.valueOf(conserveSockets));

return props;
}

@NotNull
private static Properties securityProperties() throws GeneralSecurityException, IOException {
final CertificateMaterial ca = new CertificateBuilder()
.commonName("Test CA")
.isCA()
.generate();

final CertificateMaterial serverCertificate = new CertificateBuilder()
.commonName("member")
.issuedBy(ca)
.generate();

final CertStores memberStore = new CertStores("member");
memberStore.withCertificate("member", serverCertificate);
memberStore.trust("ca", ca);
// we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
final Properties props = memberStore.propertiesWith("all", false, false);
return props;
// subsequent calls must return the same value so members agree on credentials
if (securityProperties == null) {
final CertificateMaterial ca = new CertificateBuilder()
.commonName("Test CA")
.isCA()
.generate();

final CertificateMaterial serverCertificate = new CertificateBuilder()
.commonName("member")
.issuedBy(ca)
.generate();

final CertStores memberStore = new CertStores("member");
memberStore.withCertificate("member", serverCertificate);
memberStore.trust("ca", ca);
// we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
securityProperties = memberStore.propertiesWith("all", false, false);
}
return securityProperties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2770,6 +2770,9 @@ public void readAck(final DirectReplyProcessor processor)
/**
* processes the current NIO buffer. If there are complete messages in the buffer, they are
* deserialized and passed to TCPConduit for further processing
*
* pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode
* post-condition: inputBuffer is in WRITABLE mode
*/
private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
throws ConnectionException, IOException {
Expand Down Expand Up @@ -2846,12 +2849,12 @@ private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
"Allocating larger network read buffer, new size is {} old size was {}.",
allocSize, oldBufferSize);
inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
makeReadableBufferWriteable(inputBuffer);
} else {
if (inputBuffer.position() != 0) {
inputBuffer.compact();
} else {
inputBuffer.position(inputBuffer.limit());
inputBuffer.limit(inputBuffer.capacity());
makeReadableBufferWriteable(inputBuffer);
}
}
}
Expand All @@ -2865,6 +2868,11 @@ private void processInputBuffer(AbstractExecutor threadMonitorExecutor)
}
}

private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) {
inputBuffer.position(inputBuffer.limit());
inputBuffer.limit(inputBuffer.capacity());
}

private boolean readHandshakeForReceiver(final DataInput dis) {
try {
checkHandshakeInitialByte(dis);
Expand Down

0 comments on commit fb142e1

Please sign in to comment.