Skip to content

Commit

Permalink
GEODE-6065: Continue event processing when hostname lookup fails (apa…
Browse files Browse the repository at this point in the history
…che#2883)

Co-authored-by: Ryan McMahon <[email protected]>
Co-authored-by: Bill Burcham <[email protected]>
  • Loading branch information
mcmellawatt and Bill committed Nov 21, 2018
1 parent 933c4fb commit 671671b
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @since GemFire 5.7
*
*/
class PooledConnection implements Connection {
public class PooledConnection implements Connection {

/*
* connection is volatile because we may asynchronously destroy the pooled connection while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.Set;

import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.UnsupportedVersionException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.Role;
Expand Down Expand Up @@ -94,6 +94,19 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
/** product version bit flag */
private static final int VERSION_BIT = 0x8;

@FunctionalInterface
public interface HostnameResolver {
InetAddress getInetAddress(ServerLocation location) throws UnknownHostException;
}

public static void setHostnameResolver(final HostnameResolver hostnameResolver) {
InternalDistributedMember.hostnameResolver = hostnameResolver;
}

/** Retrieves an InetAddress given the provided hostname */
private static HostnameResolver hostnameResolver =
(location) -> InetAddress.getByName(location.getHostName());

/**
* Representing the host name of this member.
*/
Expand Down Expand Up @@ -213,12 +226,13 @@ public InternalDistributedMember(String i, int p) {

public InternalDistributedMember(ServerLocation location) {
this.hostName = location.getHostName();
InetAddress addr = null;
final InetAddress addr;
try {
addr = InetAddress.getByName(this.hostName);
addr = hostnameResolver.getInetAddress(location);
} catch (UnknownHostException e) {
throw new GemFireConfigException("Unable to resolve server location " + location, e);
throw new ServerConnectivityException("Unable to resolve server location " + location, e);
}

netMbr = MemberFactory.newNetMember(addr, location.getPort());
netMbr.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE);
versionObj = Version.CURRENT;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;

import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.Properties;

import org.junit.Test;
import org.mockito.Mockito;

import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.pooling.PooledConnection;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.cache.PoolManagerImpl;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.net.SSLConfigurationFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;

public class GatewaySenderEventRemoteDispatcherIntegrationTest {

/*
* Sometimes hostname lookup is flaky. We don't want such a failure to cripple our
* event processor.
*
* This test assumes hostname lookup (of IP number) succeeds when establishing the initial
* connection, but fails when constructing the InternalDistributedSystem object in response to a
* remote server crash.
*/
@Test
public void canProcessesEventAfterHostnameLookupFailsInNotifyServerCrashed() throws Exception {

final PoolImpl pool = getPool();

final ServerLocation serverLocation = mock(ServerLocation.class);

final AbstractGatewaySenderEventProcessor eventProcessor =
getMockedAbstractGatewaySenderEventProcessor(pool, serverLocation);

final Endpoint endpoint = getMockedEndpoint(serverLocation);
final Connection connection = getMockedConnection(serverLocation, endpoint);

/*
* In order for listeners to be notified, the endpoint must be referenced by the
* endpointManager so that it can be removed when the RuntimeException() is thrown by the
* connection
*/
final EndpointManager endpointManager = pool.getEndpointManager();
endpointManager.referenceEndpoint(serverLocation, mock(InternalDistributedMember.class));

final GatewaySenderEventRemoteDispatcher dispatcher =
new GatewaySenderEventRemoteDispatcher(eventProcessor, connection);

/*
* Set a HostnameResolver which simulates a failed
* hostname lookup resulting in an UnknownHostException
*/
InternalDistributedMember.setHostnameResolver(ignored -> {
throw new UnknownHostException("a.b.c");
});

/*
* We have mocked our connection to throw a RuntimeException when readAcknowledgement() is
* called, then in the exception handling for that RuntimeException, the UnknownHostException
* will be thrown when trying to notify listeners of the crash.
*/
dispatcher.readAcknowledgement();

/*
* Need to reset the hostname resolver to a real InetAddress resolver as it is static state and
* we do not want it to throw an UnknownHostException in subsequent test runs.
*/
InternalDistributedMember
.setHostnameResolver((location) -> InetAddress.getByName(location.getHostName()));

/*
* The handling of the UnknownHostException should not result in the event processor being
* stopped, so assert that setIsStopped(true) was never called.
*/
verify(eventProcessor, Mockito.times(0)).setIsStopped(true);
}

private PoolImpl getPool() {
final DistributionConfig distributionConfig = mock(DistributionConfig.class);
doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
.getSecurableCommunicationChannels();

SSLConfigurationFactory.setDistributionConfig(distributionConfig);

final Properties properties = new Properties();
properties.put(DURABLE_CLIENT_ID, "1");

final Statistics statistics = mock(Statistics.class);

final PoolFactoryImpl.PoolAttributes poolAttributes =
mock(PoolFactoryImpl.PoolAttributes.class);
/*
* These are the minimum pool attributes required
* so that basic validation and setup completes successfully. The values of
* these attributes have no importance to the assertions of the test itself.
*/
doReturn(1).when(poolAttributes).getMaxConnections();
doReturn((long) 10e8).when(poolAttributes).getPingInterval();

final CancelCriterion cancelCriterion = mock(CancelCriterion.class);

final InternalCache internalCache = mock(InternalCache.class);
doReturn(cancelCriterion).when(internalCache).getCancelCriterion();

final InternalDistributedSystem internalDistributedSystem =
mock(InternalDistributedSystem.class);
doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
doReturn(properties).when(internalDistributedSystem).getProperties();
doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());

final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
doReturn(true).when(poolManager).isNormal();

final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);

return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
internalDistributedSystem, internalCache, tMonitoring);
}

private Connection getMockedConnection(ServerLocation serverLocation, Endpoint endpoint)
throws Exception {
/*
* Mock the connection to throw a RuntimeException() when connection.Execute() is called,
* so that we attempt to notify listeners in the exception handling logic in
* OpExecutorImpl.executeWithPossibleReAuthentication()
*/
final Connection connection = mock(PooledConnection.class);
doReturn(serverLocation).when(connection).getServer();
doReturn(endpoint).when(connection).getEndpoint();
doThrow(new RuntimeException()).when(connection).execute(any());
return connection;
}

private AbstractGatewaySenderEventProcessor getMockedAbstractGatewaySenderEventProcessor(
PoolImpl pool, ServerLocation serverLocation) {
final AbstractGatewaySender abstractGatewaySender = mock(AbstractGatewaySender.class);
doReturn(serverLocation).when(abstractGatewaySender).getServerLocation();
doReturn(pool).when(abstractGatewaySender).getProxy();

final AbstractGatewaySenderEventProcessor eventProcessor =
mock(AbstractGatewaySenderEventProcessor.class);
doReturn(abstractGatewaySender).when(eventProcessor).getSender();
return eventProcessor;
}

private Endpoint getMockedEndpoint(ServerLocation serverLocation) {
final Endpoint endpoint = mock(Endpoint.class);
doReturn(serverLocation).when(endpoint).getLocation();
return endpoint;
}

}

0 comments on commit 671671b

Please sign in to comment.