Skip to content

Commit

Permalink
GEODE-7345: break dependency on SocketCreator (apache#4322)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill authored Nov 15, 2019
1 parent 82ac712 commit 22a8c26
Show file tree
Hide file tree
Showing 48 changed files with 591 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.client.internal;

import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;

import java.io.IOException;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
Expand All @@ -60,8 +62,8 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {

/**
* The number of connections that we can be off by in the balancing tests We need this little
* fudge factor, because the locator can receive an update from the cache server after it has
* made incremented its counter for a client connection, but the client hasn't connected yet. This
* fudge factor, because the locator can receive an update from the cache server after it has made
* incremented its counter for a client connection, but the client hasn't connected yet. This
* wipes out the estimation on the locator. This means that we may be slighly off in our balance.
* <p>
* TODO grid fix this hole in the locator.
Expand Down Expand Up @@ -132,42 +134,62 @@ public void testEstimation() throws IOException, ClassNotFoundException {

SocketCreatorFactory.setDistributionConfig(new DistributionConfigImpl(new Properties()));
ClientConnectionResponse response;
response =
(ClientConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000);
response = issueClientConnectionRequest(hostName, locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), true);
Assert.assertEquals(expectedLocation, response.getServer());

response =
(ClientConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true);
response = issueClientConnectionRequest(hostName, locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), true);
Assert.assertEquals(expectedLocation, response.getServer());

// we expect that the connection load load will be 2 * the loadPerConnection
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));

QueueConnectionResponse response2;
response2 =
(QueueConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new QueueConnectionRequest(null, 2, Collections.EMPTY_SET, null, false),
10000, true);
response2 = issueQueueConnectionRequest(hostName, locatorPort, 2);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());

response2 =
(QueueConnectionResponse) new TcpClient().requestToServer(InetAddress.getByName(hostName),
locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null, false),
10000, true);

response2 = issueQueueConnectionRequest(hostName, locatorPort, 5);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());

// we expect that the queue load will increase by 2
expectedLoad.setSubscriptionConnectionLoad(2f);
vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}

private ClientConnectionResponse issueClientConnectionRequest(final String hostName,
final int locatorPort,
final ClientConnectionRequest request,
final boolean replyExpected)
throws IOException, ClassNotFoundException {
return (ClientConnectionResponse) issueRequest(hostName, locatorPort, request, replyExpected);
}

private QueueConnectionResponse issueQueueConnectionRequest(final String hostName,
final int locatorPort,
final int redundantCopies)
throws IOException, ClassNotFoundException {
return (QueueConnectionResponse) issueRequest(hostName, locatorPort,
new QueueConnectionRequest(null, redundantCopies, Collections.EMPTY_SET, null, false),
true);
}

private Object issueRequest(final String hostName, final int locatorPort,
final Object request, final boolean replyExpected)
throws IOException, ClassNotFoundException {
return new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)))
.requestToServer(InetAddress.getByName(hostName),
locatorPort,
request,
10000, replyExpected);
}

/**
* Test to make sure the cache servers communicate their updated load to the controller when the
* load on the cache server changes.
*
*/
@Test
public void testLoadMessaging() throws Exception {
Expand Down Expand Up @@ -216,7 +238,6 @@ public void testLoadMessaging() throws Exception {

/**
* Test to make sure that the locator balancing load between two servers.
*
*/
@Test
public void testBalancing() throws Exception {
Expand Down Expand Up @@ -277,7 +298,6 @@ private void waitForPrefilledConnections(final int count, final String poolName)
/**
* Test that the locator balances load between three servers with intersecting server groups.
* Server: 1 2 3 Groups: a a,b b
*
*/
@Test
public void testIntersectingServerGroups() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

Expand All @@ -34,6 +35,8 @@
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.VM;
Expand Down Expand Up @@ -142,8 +145,11 @@ private void restartLocator(int port0, File logFile0, Properties props) {
new InternalDistributedMember("localhost", 1234));
FindCoordinatorResponse response;

response = (FindCoordinatorResponse) new TcpClient()
.requestToServer(SocketCreator.getLocalHost(), port0, req, 5000);
response = (FindCoordinatorResponse) new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)))
.requestToServer(SocketCreator.getLocalHost(), port0, req, 5000);
assertThat(response).isNotNull();

} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -32,6 +33,7 @@
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -77,6 +79,8 @@
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.tier.InternalClientMembership;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.management.membership.ClientMembershipEvent;
import org.apache.geode.management.membership.ClientMembershipListener;
import org.apache.geode.test.dunit.NetworkUtils;
Expand Down Expand Up @@ -137,7 +141,7 @@ public void tearDown() {
try {
if (server != null && server.isAlive()) {
try {
new TcpClient().stop(InetAddress.getLocalHost(), port);
issueStopRequest(port);
} catch (ConnectException ignore) {
// must not be running
}
Expand All @@ -154,6 +158,15 @@ public void tearDown() {
}
}

private void issueStopRequest(final int port)
throws ConnectException, UnknownHostException {
new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)))
.stop(InetAddress.getLocalHost(), port);
}

/**
* This test validates the AutoConnectionSourceImpl.updateLocatorInLocatorList method. That method
* takes InetSocketAddres of locator which unable to connect to locator. And update that
Expand Down Expand Up @@ -318,7 +331,7 @@ public void test_DiscoverLocators_whenOneLocatorWasShutdown() throws Exception {
startFakeLocator();
int secondPort = AvailablePortHelper.getRandomAvailableTCPPort();
TcpServer server2 =
new TcpServerFactory().makeTcpServer(secondPort, InetAddress.getLocalHost(), null, null,
new TcpServerFactory().makeTcpServer(secondPort, InetAddress.getLocalHost(),
handler, new FakeHelper(), "tcp server", null);
server2.start();

Expand All @@ -328,7 +341,7 @@ public void test_DiscoverLocators_whenOneLocatorWasShutdown() throws Exception {
handler.nextLocatorListResponse = new LocatorListResponse(locators, false);
Thread.sleep(500);
try {
new TcpClient().stop(InetAddress.getLocalHost(), port);
issueStopRequest(port);
} catch (ConnectException ignore) {
// must not be running
}
Expand All @@ -339,7 +352,7 @@ public void test_DiscoverLocators_whenOneLocatorWasShutdown() throws Exception {
assertEquals(server1, source.findServer(null));
} finally {
try {
new TcpClient().stop(InetAddress.getLocalHost(), secondPort);
issueStopRequest(secondPort);
} catch (ConnectException ignore) {
// must not be running
}
Expand All @@ -362,7 +375,7 @@ public void testDiscoverLocatorsConnectsToLocatorsAfterTheyStartUp() throws Exce
await().until(() -> source.getOnlineLocators().size() == 1);
} finally {
try {
new TcpClient().stop(InetAddress.getLocalHost(), port);
issueStopRequest(port);
} catch (ConnectException ignore) {
// must not be running
}
Expand Down Expand Up @@ -396,7 +409,7 @@ public void testLocatorUpdateIntervalZero() {
}

private void startFakeLocator() throws IOException, InterruptedException {
server = new TcpServerFactory().makeTcpServer(port, InetAddress.getLocalHost(), null, null,
server = new TcpServerFactory().makeTcpServer(port, InetAddress.getLocalHost(),
handler, new FakeHelper(), "Tcp Server", null);
server.start();
Thread.sleep(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.apache.geode.internal.AvailablePort.SOCKET;
import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -57,6 +58,8 @@
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.management.internal.JmxManagerAdvisor.JmxManagerProfile;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusRequest;
import org.apache.geode.test.junit.categories.MembershipTest;
Expand Down Expand Up @@ -170,7 +173,10 @@ public void testHandlersAreWaitedOn() throws Exception {
public void testBasicInfo() throws Exception {
locator = Locator.startLocator(port, tmpFile);
int boundPort = port == 0 ? locator.getPort() : port;
TcpClient client = new TcpClient();
TcpClient client = new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR)));
String[] info = client.getInfo(InetAddress.getLocalHost(), boundPort);

assertThat(info).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.mockito.quality.Strictness;

import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.LoggingSession;

public class InternalLocatorIntegrationTest {
Expand Down Expand Up @@ -85,6 +86,8 @@ public void tearDown() {
@Test
public void constructs() {
when(distributionConfig.getLogFile()).thenReturn(logFile);
when(distributionConfig.getSecurableCommunicationChannels()).thenReturn(
new SecurableCommunicationChannel[0]);

assertThatCode(() -> {
internalLocator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -66,10 +67,13 @@
import org.apache.geode.distributed.internal.membership.gms.api.MessageListener;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.internal.serialization.DSFIDSerializer;
Expand Down Expand Up @@ -270,6 +274,10 @@ public MemberIdentifier answer(InvocationOnMock invocation) throws Throwable {
.setMembershipListener(listener)
.setConfig(new ServiceConfig(transport, config))
.setSerializer(serializer)
.setLocatorClient(new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR))))
.create();
m1.startEventProcessing();
return Pair.of(m1, messageListener);
Expand Down Expand Up @@ -457,7 +465,10 @@ public void testMulticastDiscoveryNotAllowed() {
Services services = mock(Services.class);
when(services.getConfig()).thenReturn(membershipConfig);

GMSJoinLeave joinLeave = new GMSJoinLeave();
GMSJoinLeave joinLeave = new GMSJoinLeave(new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory.setDistributionConfig(config)
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR))));
try {
joinLeave.init(services);
throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,30 @@
*/
package org.apache.geode.distributed.internal.membership.gms.locator;

import static org.apache.geode.distributed.internal.membership.adapter.SocketCreatorAdapter.asTcpSocketCreator;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Properties;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.LocatorStats;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.DSFIDSerializerImpl;
import org.apache.geode.internal.serialization.Version;
Expand All @@ -50,6 +58,9 @@ public class GMSLocatorIntegrationTest {

@Before
public void setUp() {

SocketCreatorFactory.setDistributionConfig(new DistributionConfigImpl(new Properties()));

tcpServer = mock(TcpServer.class);
view = new GMSMembershipView();
services = mock(Services.class);
Expand All @@ -67,10 +78,18 @@ public void setUp() {

gmsLocator =
new GMSLocator(null, null, false, false, new LocatorStats(), "",
temporaryFolder.getRoot().toPath());
temporaryFolder.getRoot().toPath(), new TcpClient(
asTcpSocketCreator(
SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR))));
gmsLocator.setServices(services);
}

@After
public void after() {
SocketCreatorFactory.close();
}

@Test
public void viewFileIsNullByDefault() {
assertThat(gmsLocator.getViewFile()).isNull();
Expand Down
Loading

0 comments on commit 22a8c26

Please sign in to comment.