Skip to content

Commit

Permalink
GEODE-7348: Removing dependencies on core god classes from TcpServer (a…
Browse files Browse the repository at this point in the history
…pache#4217)

- Removed DistributedSystem, GemFireCache,
  InternalConfiguratinPersistenceService dependencies that were used in the
  restarting method. Now there is a RestartableTcpHandler that lives inside core
  and extends TcpHandler. Core directly calls restarting on the
  RestartableTcpHandler.
- Extracted ProtocolChecker interface to remove Protobuf dependency.
  • Loading branch information
Ernie Burghardt authored Oct 24, 2019
1 parent 0df32bd commit b3ab652
Show file tree
Hide file tree
Showing 19 changed files with 233 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.PoolStats;
Expand Down Expand Up @@ -402,7 +402,7 @@ private void startFakeLocator() throws IOException, InterruptedException {
Thread.sleep(500);
}

protected static class FakeHandler implements TcpHandler {
protected static class FakeHandler implements RestartableTcpHandler {
volatile ClientConnectionResponse nextConnectionResponse;
volatile LocatorListResponse nextLocatorListResponse;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.net.SSLConfigurationFactory;
Expand Down Expand Up @@ -103,7 +104,7 @@ private void startTcpServer(Properties sslProperties) throws IOException {
localhost = InetAddress.getLocalHost();
port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);

TcpHandler tcpHandler = Mockito.mock(TcpHandler.class);
RestartableTcpHandler tcpHandler = Mockito.mock(RestartableTcpHandler.class);
when(tcpHandler.processRequest(any())).thenReturn("Running!");

server = new FakeTcpServer(port, localhost, sslProperties, null,
Expand Down Expand Up @@ -196,9 +197,10 @@ private static class FakeTcpServer extends TcpServer {
private DistributionConfig distributionConfig;

public FakeTcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
DistributionConfigImpl cfg, RestartableTcpHandler handler, PoolStatHelper poolHelper,
String threadName) {
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName, null, null);
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName,
(socket, input, firstByte) -> false);
if (cfg == null) {
cfg = new DistributionConfigImpl(sslConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.net.DummySocketCreator;
Expand Down Expand Up @@ -77,7 +78,7 @@ private void startTimeDelayedTcpServer(Properties sslProperties) throws IOExcept
port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);

server = new DummyTcpServer(port, localhost, sslProperties, null,
Mockito.mock(TcpHandler.class), Mockito.mock(PoolStatHelper.class),
Mockito.mock(RestartableTcpHandler.class), Mockito.mock(PoolStatHelper.class),
"server thread");
server.start();
}
Expand Down Expand Up @@ -138,9 +139,10 @@ private class DummyTcpServer extends TcpServer {
private List<Integer> recordedSocketsTimeouts = new ArrayList<>();

public DummyTcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
DistributionConfigImpl cfg, RestartableTcpHandler handler, PoolStatHelper poolHelper,
String threadName) {
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName, null, null);
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName,
(socket, input, firstByte) -> false);
if (cfg == null) {
cfg = new DistributionConfigImpl(sslConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@
import org.mockito.stubbing.Answer;

import org.apache.geode.DataSerializable;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.net.SocketCreatorFactory;
Expand Down Expand Up @@ -164,7 +162,7 @@ public void testNewConnectionsAcceptedAfterSocketException() throws IOException,
ClassNotFoundException, InterruptedException {
// Initially mock the handler to throw a SocketException. We want to verify that the server
// can recover and serve new client requests after a SocketException is thrown.
TcpHandler mockTcpHandler = mock(TcpHandler.class);
RestartableTcpHandler mockTcpHandler = mock(RestartableTcpHandler.class);
doThrow(SocketException.class).when(mockTcpHandler).processRequest(any(Object.class));
start(mockTcpHandler);

Expand Down Expand Up @@ -248,10 +246,6 @@ public void shutDown() {
shutdown = true;
}

@Override
public void restarting(DistributedSystem ds, GemFireCache cache,
InternalConfigurationPersistenceService sharedConfig) {}

@Override
public void endRequest(Object request, long startTime) {}

Expand Down Expand Up @@ -287,10 +281,6 @@ public Object processRequest(Object request) throws IOException {
@Override
public void shutDown() {}

@Override
public void restarting(DistributedSystem ds, GemFireCache cache,
InternalConfigurationPersistenceService sharedConfig) {}

@Override
public void endRequest(Object request, long startTime) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,8 @@ private void restartWithoutSystem() throws IOException {
internalCache = null;

logger.info("Locator restart: initializing TcpServer peer location services");
server.restarting(null, null, null);
handler.restarting(null, null, null);
server.restarting();

if (productUseLog.isClosed()) {
productUseLog.reopen();
Expand All @@ -1185,7 +1186,8 @@ private void restartWithSystem(InternalDistributedSystem newSystem, InternalCach
logger.info("Locator restart: initializing TcpServer");

try {
server.restarting(newSystem, newCache, configurationPersistenceService);
handler.restarting(newSystem, newCache, configurationPersistenceService);
server.restarting();
} catch (CancelException e) {
internalDistributedSystem = null;
internalCache = null;
Expand Down Expand Up @@ -1215,7 +1217,7 @@ private void restartWithSystem(InternalDistributedSystem newSystem, InternalCach
endStartLocator(internalDistributedSystem);
logger.info("Locator restart completed");

server.restartCompleted(newSystem);
handler.restartCompleted(newSystem);
}

public ClusterManagementService getClusterManagementService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.logging.LogService;

public class PrimaryHandler implements TcpHandler {
public class PrimaryHandler implements RestartableTcpHandler {
private static final Logger logger = LogService.getLogger();

private final LocatorMembershipListener locatorListener;
private final InternalLocator internalLocator;

private volatile Map<Class, TcpHandler> handlerMapping = new HashMap<>();
private volatile Set<TcpHandler> allHandlers = new HashSet<>();
private volatile Map<Class, RestartableTcpHandler> handlerMapping = new HashMap<>();
private volatile Set<RestartableTcpHandler> allHandlers = new HashSet<>();

private TcpServer tcpServer;

Expand All @@ -66,7 +66,7 @@ public synchronized void init(TcpServer tcpServer) {
public void restarting(DistributedSystem ds, GemFireCache cache,
InternalConfigurationPersistenceService sharedConfig) {
if (ds != null) {
for (TcpHandler handler : allHandlers) {
for (RestartableTcpHandler handler : allHandlers) {
handler.restarting(ds, cache, sharedConfig);
}
}
Expand All @@ -75,7 +75,7 @@ public void restarting(DistributedSystem ds, GemFireCache cache,
@Override
public void restartCompleted(DistributedSystem ds) {
if (ds != null) {
for (TcpHandler handler : allHandlers) {
for (RestartableTcpHandler handler : allHandlers) {
handler.restartCompleted(ds);
}
}
Expand Down Expand Up @@ -137,9 +137,9 @@ synchronized boolean isHandled(Class clazz) {
return handlerMapping.containsKey(clazz);
}

public synchronized void addHandler(Class clazz, TcpHandler handler) {
Map<Class, TcpHandler> tmpHandlerMapping = new HashMap<>(handlerMapping);
Set<TcpHandler> tmpAllHandlers = new HashSet<>(allHandlers);
public synchronized void addHandler(Class clazz, RestartableTcpHandler handler) {
Map<Class, RestartableTcpHandler> tmpHandlerMapping = new HashMap<>(handlerMapping);
Set<RestartableTcpHandler> tmpAllHandlers = new HashSet<>(allHandlers);
tmpHandlerMapping.put(clazz, handler);
if (tmpAllHandlers.add(handler) && tcpServer != null) {
handler.init(tcpServer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;

import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;

import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
import org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
import org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.sockets.Handshake;
import org.apache.geode.internal.logging.LogService;

public class ProtocolCheckerImpl implements ProtocolChecker {
private static final Logger logger = LogService.getLogger();
public final InternalLocator internalLocator;
public final ClientProtocolServiceLoader clientProtocolServiceLoader;

public ProtocolCheckerImpl(
final InternalLocator internalLocator,
final ClientProtocolServiceLoader clientProtocolServiceLoader) {
this.internalLocator = internalLocator;
this.clientProtocolServiceLoader = clientProtocolServiceLoader;
}


@Override
public boolean checkProtocol(final Socket socket, final DataInputStream input,
final int firstByte) throws Exception {
boolean handled = false;
if (firstByte == CommunicationMode.ProtobufClientServerProtocol.getModeNumber()) {
handleProtobufConnection(socket, input);
handled = true;
} else if (CommunicationMode.isValidMode(firstByte)) {
socket.getOutputStream().write(Handshake.REPLY_SERVER_IS_LOCATOR);
throw new Exception("Improperly configured client detected - use addPoolLocator to "
+ "configure its locators instead of addPoolServer.");
}
return handled;
}

public void handleProtobufConnection(Socket socket, DataInputStream input) throws Exception {
if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
logger.warn("Incoming protobuf connection, but protobuf not enabled on this locator.");
socket.close();
return;
}

try {
ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
clientProtocolService.initializeStatistics("LocatorStats",
internalLocator.getDistributedSystem());
try (ClientProtocolProcessor pipeline = clientProtocolService.createProcessorForLocator(
internalLocator, internalLocator.getCache().getSecurityService())) {
while (!pipeline.socketProcessingIsFinished()) {
pipeline.processMessage(input, socket.getOutputStream());
}
} catch (IncompatibleVersionException e) {
// should not happen on the locator as there is no handshake.
logger.error("Unexpected exception in client message processing", e);
}
} catch (ServiceLoadingFailureException e) {
logger.error("There was an error looking up the client protocol service", e);
socket.close();
throw new IOException("There was an error looking up the client protocol service", e);
} catch (ServiceVersionNotFoundException e) {
logger.error("Unable to find service matching the client protocol version byte", e);
socket.close();
throw new IOException("Unable to find service matching the client protocol version byte", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;


import org.apache.geode.cache.GemFireCache;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;

/**
* A handler which responds to messages for the {@link TcpServer}
*
* @since GemFire 5.7
*/
public interface RestartableTcpHandler extends TcpHandler {

/**
* Informs the handler that TcpServer is restarting with the given distributed system and cache
*
* @param sharedConfig TODO
*/
void restarting(DistributedSystem ds, GemFireCache cache,
InternalConfigurationPersistenceService sharedConfig);

/**
* Informs the handler that restart has completed
*/
default void restartCompleted(DistributedSystem ds) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.cache.CacheServerAdvisor.CacheServerProfile;
import org.apache.geode.internal.cache.ControllerAdvisor;
Expand All @@ -64,7 +63,7 @@
*
* @since GemFire 5.7
*/
public class ServerLocator implements TcpHandler, DistributionAdvisee {
public class ServerLocator implements RestartableTcpHandler, DistributionAdvisee {
private static final Logger logger = LogService.getLogger();

private final int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
*/
package org.apache.geode.distributed.internal.membership;

import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;

public interface NetLocator extends TcpHandler {
public interface NetLocator extends RestartableTcpHandler {

/**
* This must be called after booting the membership manager so that the locator can use its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.LocatorStats;
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.distributed.internal.membership.NetLocator;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Locator;
import org.apache.geode.distributed.internal.membership.gms.locator.GMSLocator;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;

public class GMSLocatorAdapter implements TcpHandler, NetLocator {
public class GMSLocatorAdapter implements RestartableTcpHandler, NetLocator {

private final GMSLocator gmsLocator;

Expand Down
Loading

0 comments on commit b3ab652

Please sign in to comment.