From 387848f10c0362569b453ca3ed583990547bac84 Mon Sep 17 00:00:00 2001 From: Kirk Lund Date: Tue, 25 Feb 2020 16:26:12 -0800 Subject: [PATCH] GEODE-7810: Change alert listener connection warning from fatal (#4728) Add AlertingIOException to excludedClasses.txt for geode-core AnalyzeSerializablesJUnitTest. --- .../geode/codeAnalysis/excludedClasses.txt | 1 + .../internal/ClusterAlertMessaging.java | 25 +- .../internal/spi/AlertingIOException.java | 32 +++ .../apache/geode/internal/tcp/TCPConduit.java | 41 +++- .../internal/ClusterAlertMessagingTest.java | 96 +++++++- .../geode/internal/tcp/TCPConduitTest.java | 219 ++++++++++++++++++ 6 files changed, 389 insertions(+), 25 deletions(-) create mode 100644 geode-core/src/main/java/org/apache/geode/alerting/internal/spi/AlertingIOException.java create mode 100644 geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index 9e997af434e0..a46d5fcf5363 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -2,6 +2,7 @@ org/apache/geode/GemFireCacheException org/apache/geode/admin/AlertLevel org/apache/geode/alerting/internal/AlertingSession$State org/apache/geode/alerting/internal/spi/AlertLevel +org/apache/geode/alerting/internal/spi/AlertingIOException org/apache/geode/cache/operations/internal/UpdateOnlyMap org/apache/geode/cache/query/internal/index/CompactRangeIndex$1 org/apache/geode/cache/query/internal/DefaultQuery$TestHook$SPOTS diff --git a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java index f5fcba5fe799..e0ddadf85f3e 100644 --- a/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java +++ b/geode-core/src/main/java/org/apache/geode/alerting/internal/ClusterAlertMessaging.java @@ -18,11 +18,13 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import org.apache.logging.log4j.Logger; import org.apache.geode.alerting.internal.spi.AlertLevel; import org.apache.geode.alerting.internal.spi.AlertingAction; +import org.apache.geode.alerting.internal.spi.AlertingIOException; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; @@ -34,29 +36,38 @@ public class ClusterAlertMessaging implements AlertMessaging { - private static final Logger logger = LogService.getLogger(); + private static final Logger LOGGER = LogService.getLogger(); private final InternalDistributedSystem system; private final DistributionManager dm; private final AlertListenerMessageFactory alertListenerMessageFactory; private final ExecutorService executor; + private final Consumer alertingIOExceptionLogger; public ClusterAlertMessaging(final InternalDistributedSystem system) { this(system, system.getDistributionManager(), new AlertListenerMessageFactory(), - newFixedThreadPool("AlertingMessaging Processor", true, 1)); + newFixedThreadPool("AlertingMessaging Processor", true, 1), + LOGGER::warn); } @VisibleForTesting - ClusterAlertMessaging(final InternalDistributedSystem system, - final DistributionManager dm, + ClusterAlertMessaging(final InternalDistributedSystem system, final DistributionManager dm, final AlertListenerMessageFactory alertListenerMessageFactory, final ExecutorService executor) { + this(system, dm, alertListenerMessageFactory, executor, LOGGER::warn); + } + + @VisibleForTesting + ClusterAlertMessaging(final InternalDistributedSystem system, final DistributionManager dm, + final AlertListenerMessageFactory alertListenerMessageFactory, final ExecutorService executor, + final Consumer alertingIOExceptionLogger) { this.system = system; this.dm = dm; this.alertListenerMessageFactory = alertListenerMessageFactory; this.executor = executor; + this.alertingIOExceptionLogger = alertingIOExceptionLogger; } @Override @@ -77,7 +88,7 @@ public void sendAlert(final DistributedMember member, if (member.equals(system.getDistributedMember())) { // process in local member - logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", + LOGGER.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, stackTrace); @@ -85,7 +96,7 @@ public void sendAlert(final DistributedMember member, } else { // send to remote member - logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", + LOGGER.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].", member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage, stackTrace); @@ -94,6 +105,8 @@ public void sendAlert(final DistributedMember member, } catch (ReenteredConnectException ignore) { // OK. We can't send to this recipient because we're in the middle of // trying to connect to it. + } catch (AlertingIOException e) { + alertingIOExceptionLogger.accept(e); } })); } diff --git a/geode-core/src/main/java/org/apache/geode/alerting/internal/spi/AlertingIOException.java b/geode-core/src/main/java/org/apache/geode/alerting/internal/spi/AlertingIOException.java new file mode 100644 index 000000000000..09f8fb94ada6 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/alerting/internal/spi/AlertingIOException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.alerting.internal.spi; + +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * Wraps an {@link IOException} that is thrown while attempting to notify an alert listener. + */ +public class AlertingIOException extends UncheckedIOException { + + private static final long serialVersionUID = 3702403276743962841L; + + public AlertingIOException(IOException cause) { + super(cause); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index e10b41d42a3b..a816895ae02c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -31,6 +31,7 @@ import java.nio.channels.SocketChannel; import java.util.Map; import java.util.Properties; +import java.util.function.Function; import org.apache.logging.log4j.Logger; @@ -38,6 +39,8 @@ import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; import org.apache.geode.alerting.internal.spi.AlertingAction; +import org.apache.geode.alerting.internal.spi.AlertingIOException; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystemDisconnectedException; @@ -215,6 +218,23 @@ public class TCPConduit implements Runnable { */ public TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress, DirectChannel receiver, Properties props) throws ConnectionException { + this(mgr, port, address, isBindAddress, receiver, props, ConnectionTable::create, + SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER), + () -> { + try { + LocalHostUtil.getLocalHost(); + } catch (UnknownHostException e) { + throw new ConnectionException("Unable to resolve localHost address", e); + } + }, + true); + } + + @VisibleForTesting + TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress, + DirectChannel receiver, Properties props, + Function connectionTableFactory, SocketCreator socketCreator, + Runnable localHostValidation, boolean startAcceptor) throws ConnectionException { parseProperties(props); this.address = address; @@ -232,21 +252,18 @@ public TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindA stats = new LonerDistributionManager.DummyDMStats(); } - conTable = ConnectionTable.create(this); + conTable = connectionTableFactory.apply(this); - socketCreator = - SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER); + this.socketCreator = socketCreator; useSSL = socketCreator.useSSL(); if (address == null) { - try { - LocalHostUtil.getLocalHost(); - } catch (UnknownHostException e) { - throw new ConnectionException("Unable to resolve localHost address", e); - } + localHostValidation.run(); } - startAcceptor(); + if (startAcceptor) { + startAcceptor(); + } } public static void init() { @@ -858,10 +875,10 @@ public Connection getConnection(InternalDistributedMember memberAddress, } if (breakLoop) { - if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) { - logger.warn("Throwing IOException after finding breakLoop=true", problem); - } if (problem instanceof IOException) { + if (problem.getMessage().startsWith("Cannot form connection to alert listener")) { + throw new AlertingIOException((IOException) problem); + } throw (IOException) problem; } throw new IOException( diff --git a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java index 5dd5687fcc82..4471ed37e4e2 100644 --- a/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java +++ b/geode-core/src/test/java/org/apache/geode/alerting/internal/ClusterAlertMessagingTest.java @@ -14,27 +14,33 @@ */ package org.apache.geode.alerting.internal; +import static org.apache.geode.internal.cache.util.UncheckedUtils.cast; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.function.Consumer; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; @@ -42,6 +48,7 @@ import org.apache.geode.alerting.internal.spi.AlertLevel; import org.apache.geode.alerting.internal.spi.AlertingAction; +import org.apache.geode.alerting.internal.spi.AlertingIOException; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionConfig; @@ -90,14 +97,14 @@ public void sendAlertProcessesMessageIfMemberIsLocal() { @Test public void sendAlertSendsMessageIfMemberIsRemote() { - DistributionManager dm = mock(ClusterDistributionManager.class); + DistributionManager distributionManager = mock(ClusterDistributionManager.class); ClusterAlertMessaging clusterAlertMessaging = - spyClusterAlertMessaging(dm, currentThreadExecutorService()); + spyClusterAlertMessaging(distributionManager, currentThreadExecutorService()); clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", Thread.currentThread().getId(), "formattedMessage", "stackTrace"); - verify(dm).putOutgoing(eq(alertListenerMessage)); + verify(distributionManager).putOutgoing(eq(alertListenerMessage)); } @Test @@ -118,10 +125,11 @@ public void sendAlertUsesAlertingAction() { ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); ClusterAlertMessaging clusterAlertMessaging = spyClusterAlertMessaging(distributionManager, executor); - when(distributionManager.putOutgoing(any())).thenAnswer(invocation -> { - assertThat(AlertingAction.isThreadAlerting()).isTrue(); - return null; - }); + when(distributionManager.putOutgoing(any())) + .thenAnswer(invocation -> { + assertThat(AlertingAction.isThreadAlerting()).isTrue(); + return null; + }); clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", Thread.currentThread().getId(), "formattedMessage", "stackTrace"); @@ -129,6 +137,64 @@ public void sendAlertUsesAlertingAction() { verify(distributionManager).putOutgoing(any()); } + @Test + public void sendAlertLogsWarning_ifAlertingIOExceptionIsCaught() { + ExecutorService executor = currentThreadExecutorService(); + ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); + Consumer alertingIOExceptionLogger = cast(mock(Consumer.class)); + ClusterAlertMessaging clusterAlertMessaging = + spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger); + doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener"))) + .when(distributionManager).putOutgoing(any()); + + clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", + Thread.currentThread().getId(), "formattedMessage", "stackTrace"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(AlertingIOException.class); + verify(alertingIOExceptionLogger).accept(captor.capture()); + + assertThat(captor.getValue()) + .isInstanceOf(AlertingIOException.class) + .hasMessageContaining("Cannot form connection to alert listener"); + } + + @Test + public void sendAlertLogsWarningOnce_ifAlertingIOExceptionIsCaught() { + ExecutorService executor = currentThreadExecutorService(); + ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); + Consumer alertingIOExceptionLogger = cast(mock(Consumer.class)); + ClusterAlertMessaging clusterAlertMessaging = + spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger); + doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener"))) + .when(distributionManager).putOutgoing(any()); + + clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", + Thread.currentThread().getId(), "formattedMessage", "stackTrace"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(AlertingIOException.class); + verify(alertingIOExceptionLogger).accept(captor.capture()); + + assertThat(captor.getAllValues()).hasSize(1); + } + + @Test + public void sendAlertDoesNotSend_ifAlertingIOExceptionIsCaught() { + ExecutorService executor = currentThreadExecutorService(); + ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); + ClusterAlertMessaging clusterAlertMessaging = + spyClusterAlertMessaging(distributionManager, executor); + when(distributionManager.putOutgoing(any())) + .thenAnswer(invocation -> { + assertThat(AlertingAction.isThreadAlerting()).isTrue(); + return null; + }); + + clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName", + Thread.currentThread().getId(), "formattedMessage", "stackTrace"); + + verifyZeroInteractions(distributionManager); + } + @Test public void processAlertListenerMessage_requires_ClusterDistributionManager() { ClusterAlertMessaging clusterAlertMessaging = spy(new ClusterAlertMessaging(system, @@ -156,6 +222,22 @@ private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distr executorService)); } + private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager, + ExecutorService executorService, Consumer alertingIOExceptionLogger) { + when(alertListenerMessageFactory.createAlertListenerMessage(any(DistributedMember.class), + any(AlertLevel.class), any(Instant.class), anyString(), anyString(), anyLong(), anyString(), + anyString())) + .thenReturn(alertListenerMessage); + when(config.getName()) + .thenReturn("name"); + when(system.getConfig()) + .thenReturn(config); + when(system.getDistributedMember()) + .thenReturn(localMember); + return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory, + executorService, alertingIOExceptionLogger)); + } + private ExecutorService currentThreadExecutorService() { ExecutorService executor = mock(ExecutorService.class); when(executor.submit(isA(Runnable.class))) diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java new file mode 100644 index 000000000000..33c6ea1a159a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.internal.tcp; + +import static org.apache.geode.internal.cache.util.UncheckedUtils.cast; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.same; +import static org.mockito.Mockito.when; +import static org.mockito.quality.Strictness.STRICT_STUBS; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Properties; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import org.apache.geode.alerting.internal.spi.AlertingAction; +import org.apache.geode.alerting.internal.spi.AlertingIOException; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.direct.DirectChannel; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.distributed.internal.membership.api.Membership; +import org.apache.geode.internal.inet.LocalHostUtil; +import org.apache.geode.internal.net.SocketCreator; + +public class TCPConduitTest { + + private Membership membership; + private DirectChannel directChannel; + private InetAddress localHost; + private ConnectionTable connectionTable; + private SocketCreator socketCreator; + + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS); + + @Before + public void setUp() throws Exception { + membership = cast(mock(Membership.class)); + directChannel = mock(DirectChannel.class); + connectionTable = mock(ConnectionTable.class); + socketCreator = mock(SocketCreator.class); + localHost = LocalHostUtil.getLocalHost(); + + when(directChannel.getDM()) + .thenReturn(mock(DistributionManager.class)); + } + + @Test + public void getConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting() + throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(eq(member))) + .thenReturn(true); + when(membership.isShunned(same(member))) + .thenReturn(false); + + AlertingAction.execute(() -> { + Throwable thrown = catchThrowable(() -> { + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(AlertingIOException.class); + }); + } + + @Test + public void getConnectionRethrows_ifCaughtIOException_whileNotAlerting() throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + Connection connection = mock(Connection.class); + when(connection.getRemoteAddress()) + .thenReturn(member); + doThrow(new IOException("Cannot form connection to alert listener")) + // getConnection will loop indefinitely until connectionTable returns connection + .doReturn(connection) + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(eq(member))) + .thenReturn(true); + when(membership.isShunned(same(member))) + .thenReturn(false); + + Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + + assertThat(value) + .isSameAs(connection); + } + + @Test + public void getConnectionRethrows_ifCaughtIOException_whenMemberDoesNotExist() throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(eq(member))) + .thenReturn(false); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(IOException.class) + .isNotInstanceOf(AlertingIOException.class); + } + + @Test + public void getConnectionRethrows_ifCaughtIOException_whenMemberIsShunned() throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(same(member))) + .thenReturn(true); + when(membership.isShunned(same(member))) + .thenReturn(true); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(IOException.class) + .isNotInstanceOf(AlertingIOException.class); + } + + @Test + public void getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress() + throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(same(member))) + .thenReturn(true); + when(membership.isShunned(same(member))) + .thenReturn(false); + when(membership.shutdownInProgress()) + .thenReturn(true); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(DistributedSystemDisconnectedException.class) + .hasMessage("Abandoned because shutdown is in progress"); + } + + @Test + public void getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress_andCancelIsInProgress() + throws Exception { + TCPConduit tcpConduit = + new TCPConduit(membership, 0, localHost, false, directChannel, new Properties(), + TCPConduit -> connectionTable, socketCreator, doNothing(), false); + InternalDistributedMember member = mock(InternalDistributedMember.class); + doThrow(new IOException("Cannot form connection to alert listener")) + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); + when(membership.memberExists(same(member))) + .thenReturn(true); + when(membership.isShunned(same(member))) + .thenReturn(false); + when(membership.shutdownInProgress()) + .thenReturn(true); + + Throwable thrown = catchThrowable(() -> { + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); + }); + + assertThat(thrown) + .isInstanceOf(DistributedSystemDisconnectedException.class) + .hasMessage("Abandoned because shutdown is in progress"); + } + + private Runnable doNothing() { + return () -> { + // nothing + }; + } +}