Skip to content

Commit

Permalink
GEODE-9819: fix durable client socket leak
Browse files Browse the repository at this point in the history
Added unit test that reproduced the socket leak.
This involved some change to the product classes
to make them unit testable.
Fixed the leak by making sure socket.close is called
if the response code was not successful.
  • Loading branch information
dschneider-pivotal authored Jan 5, 2022
1 parent 5e9c775 commit 97601eb
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,25 @@ public class CacheClientNotifier {
@MakeNotStatic
private static volatile CacheClientNotifier ccnSingleton;

private final SocketMessageWriter socketMessageWriter = new SocketMessageWriter();
private final SocketMessageWriter socketMessageWriter;
private final ClientRegistrationEventQueueManager clientRegistrationEventQueueManager;
private final CacheClientProxyFactory cacheClientProxyFactory;

/**
* Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
*
* @param cache The GemFire {@code InternalCache}
* @param clientRegistrationEventQueueManager Manages temporary registration queues for clients
* @return A {@code CacheClientNotifier} instance
*/
public static synchronized CacheClientNotifier getInstance(InternalCache cache,
@VisibleForTesting
static CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
boolean isGatewayReceiver) {
boolean isGatewayReceiver,
SocketMessageWriter socketMessageWriter) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache, clientRegistrationEventQueueManager,
statisticsClock, acceptorStats, maximumMessageCount, messageTimeToLive, listener,
isGatewayReceiver, new CacheClientProxyFactory());
isGatewayReceiver, new CacheClientProxyFactory(), socketMessageWriter);
}

if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
Expand All @@ -165,6 +160,27 @@ public static synchronized CacheClientNotifier getInstance(InternalCache cache,
return ccnSingleton;
}

/**
* Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
*
* @param cache The GemFire {@code InternalCache}
* @param clientRegistrationEventQueueManager Manages temporary registration queues for clients
* @return A {@code CacheClientNotifier} instance
*/
public static synchronized CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
boolean isGatewayReceiver) {
return getInstance(cache, clientRegistrationEventQueueManager, statisticsClock,
acceptorStats, maximumMessageCount, messageTimeToLive, listener, overflowAttributes,
isGatewayReceiver, new SocketMessageWriter());
}

public static CacheClientNotifier getInstance() {
return ccnSingleton;
}
Expand Down Expand Up @@ -446,15 +462,19 @@ void registerClientInternal(final ClientRegistrationMetadata clientRegistrationM
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Successfully registered {}", cacheClientProxy);
}
performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
sysProps,
subjectOrPrincipal);
} else {
try {
// prevent leak by closing socket
socket.close();
} catch (IOException ignore) {
}
logger.warn(
"CacheClientNotifier: Unsuccessfully registered client with identifier {} and response code {}",
new Object[] {clientProxyMembershipID, responseByte});
}

performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
sysProps,
subjectOrPrincipal);
}

private void handleAuthenticationException(final ClientProxyMembershipID clientProxyMembershipID,
Expand Down Expand Up @@ -1720,7 +1740,9 @@ private CacheClientNotifier(InternalCache cache,
int messageTimeToLive,
ConnectionListener listener,
boolean isGatewayReceiver,
CacheClientProxyFactory cacheClientProxyFactory) {
CacheClientProxyFactory cacheClientProxyFactory,
SocketMessageWriter socketMessageWriter) {
this.socketMessageWriter = socketMessageWriter;
this.cacheClientProxyFactory = cacheClientProxyFactory;
// Set the Cache
setCache(cache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
Expand Down Expand Up @@ -156,7 +155,7 @@ private static HARegionQueue getMessageQueue(CacheClientProxy proxy,
.putProxy(HARegionQueue.createRegionName(proxy.getHARegionName()), proxy);
boolean createDurableQueue = proxy.proxyID.isDurable();
boolean canHandleDelta =
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
&& !(proxy.clientConflation == Handshake.CONFLATION_ON);
if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
logger.debug("Creating a {} subscription queue for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand All @@ -49,6 +50,7 @@
import org.apache.geode.Statistics;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.EntryEventImpl;
Expand Down Expand Up @@ -361,4 +363,34 @@ private InternalCacheEvent internalCacheEvent(ClientProxyMembershipID clientProx
return internalCacheEvent;
}

@Test
public void registerClientInternalWithDuplicateDurableClientClosesSocket() throws Exception {
when(internalCache.getCCPTimer())
.thenReturn(mock(SystemTimer.class));
when(internalCache.getInternalDistributedSystem())
.thenReturn(internalDistributedSystem);
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
when(internalDistributedSystem.getStatisticsManager())
.thenReturn(statisticsManager);
when(statisticsManager.createAtomicStatistics(any(), any()))
.thenReturn(statistics);
cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, false,
mock(SocketMessageWriter.class));
ClientRegistrationMetadata metadata = mock(ClientRegistrationMetadata.class);
ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
CacheClientProxy proxy = mock(CacheClientProxy.class);
when(proxy.getProxyID()).thenReturn(id);
when(metadata.getClientProxyMembershipID()).thenReturn(id);
when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
when(id.getDurableId()).thenReturn("durable");
when(id.isDurable()).thenReturn(true);
cacheClientNotifier.addClientProxy(proxy);

cacheClientNotifier.registerClientInternal(metadata, socket, true, 0, false);

verify(socket).close();
}
}

0 comments on commit 97601eb

Please sign in to comment.