diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 722c2f7abf2c..b6021139ef04 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -349,6 +349,12 @@ public static Byte isExecuteFunctionOnLocalNodeOnly() { return executeFunctionOnLocalNodeOnly.get(); } + @VisibleForTesting + void setServerConnectionCollection( + ServerConnectionCollection serverConnectionCollection) { + this.serverConnectionCollection = serverConnectionCollection; + } + private boolean verifyClientConnection() { synchronized (handshakeMonitor) { if (handshake == null) { @@ -816,6 +822,12 @@ void doNormalMessage() { return; } + if (isTerminated()) { + // Client is being terminated, don't try to process message. + processMessages = false; + return; + } + ThreadState threadState = null; resumeThreadMonitoring(); try { @@ -930,7 +942,8 @@ private void suspendThreadMonitoring() { } } - private void resumeThreadMonitoring() { + @VisibleForTesting + void resumeThreadMonitoring() { if (threadMonitorExecutor != null) { threadMonitorExecutor.resumeMonitoring(); } @@ -966,6 +979,7 @@ void handleTermination(boolean timedOut) { } terminated = true; } + setNotProcessingMessage(); boolean clientDeparted = false; boolean cleanupStats = false; diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java index 2735141e43ab..7e15eab74fcb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java @@ -262,4 +262,18 @@ public void getUniqueIdBytesShouldThrowCacheClosedException() throws Exception { assertThatThrownBy(() -> spy.getUniqueIdBytes(requestMessage, -1)) .isInstanceOf(CacheClosedException.class); } + + @Test + public void doNormalMessageShouldNotProcessMessageWhenTerminated() { + ServerConnection spy = spy(serverConnection); + ServerConnectionCollection serverConnections = mock(ServerConnectionCollection.class); + spy.setServerConnectionCollection(serverConnections); + when(serverConnections.incrementConnectionsProcessing()).thenReturn(true); + + doReturn(true).when(spy).isTerminated(); + + spy.doNormalMessage(); + assertThat(spy.getProcessMessages()).isFalse(); + verify(spy, never()).resumeThreadMonitoring(); + } }