Skip to content

Commit

Permalink
GEODE-7810: Change alert listener connection warning from fatal (apac…
Browse files Browse the repository at this point in the history
…he#4728)

Add AlertingIOException to excludedClasses.txt for geode-core
AnalyzeSerializablesJUnitTest.
  • Loading branch information
kirklund authored Feb 26, 2020
1 parent 0eb49ab commit 387848f
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ org/apache/geode/GemFireCacheException
org/apache/geode/admin/AlertLevel
org/apache/geode/alerting/internal/AlertingSession$State
org/apache/geode/alerting/internal/spi/AlertLevel
org/apache/geode/alerting/internal/spi/AlertingIOException
org/apache/geode/cache/operations/internal/UpdateOnlyMap
org/apache/geode/cache/query/internal/index/CompactRangeIndex$1
org/apache/geode/cache/query/internal/DefaultQuery$TestHook$SPOTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import org.apache.logging.log4j.Logger;

import org.apache.geode.alerting.internal.spi.AlertLevel;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
Expand All @@ -34,29 +36,38 @@

public class ClusterAlertMessaging implements AlertMessaging {

private static final Logger logger = LogService.getLogger();
private static final Logger LOGGER = LogService.getLogger();

private final InternalDistributedSystem system;
private final DistributionManager dm;
private final AlertListenerMessageFactory alertListenerMessageFactory;
private final ExecutorService executor;
private final Consumer<AlertingIOException> alertingIOExceptionLogger;

public ClusterAlertMessaging(final InternalDistributedSystem system) {
this(system,
system.getDistributionManager(),
new AlertListenerMessageFactory(),
newFixedThreadPool("AlertingMessaging Processor", true, 1));
newFixedThreadPool("AlertingMessaging Processor", true, 1),
LOGGER::warn);
}

@VisibleForTesting
ClusterAlertMessaging(final InternalDistributedSystem system,
final DistributionManager dm,
ClusterAlertMessaging(final InternalDistributedSystem system, final DistributionManager dm,
final AlertListenerMessageFactory alertListenerMessageFactory,
final ExecutorService executor) {
this(system, dm, alertListenerMessageFactory, executor, LOGGER::warn);
}

@VisibleForTesting
ClusterAlertMessaging(final InternalDistributedSystem system, final DistributionManager dm,
final AlertListenerMessageFactory alertListenerMessageFactory, final ExecutorService executor,
final Consumer<AlertingIOException> alertingIOExceptionLogger) {
this.system = system;
this.dm = dm;
this.alertListenerMessageFactory = alertListenerMessageFactory;
this.executor = executor;
this.alertingIOExceptionLogger = alertingIOExceptionLogger;
}

@Override
Expand All @@ -77,15 +88,15 @@ public void sendAlert(final DistributedMember member,

if (member.equals(system.getDistributedMember())) {
// process in local member
logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
LOGGER.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
member, alertLevel, timestamp, connectionName, threadName, threadId,
formattedMessage,
stackTrace);
processAlertListenerMessage(message);

} else {
// send to remote member
logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
LOGGER.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
member, alertLevel, timestamp, connectionName, threadName, threadId,
formattedMessage,
stackTrace);
Expand All @@ -94,6 +105,8 @@ public void sendAlert(final DistributedMember member,
} catch (ReenteredConnectException ignore) {
// OK. We can't send to this recipient because we're in the middle of
// trying to connect to it.
} catch (AlertingIOException e) {
alertingIOExceptionLogger.accept(e);
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.alerting.internal.spi;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
* Wraps an {@link IOException} that is thrown while attempting to notify an alert listener.
*/
public class AlertingIOException extends UncheckedIOException {

private static final long serialVersionUID = 3702403276743962841L;

public AlertingIOException(IOException cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
Expand Down Expand Up @@ -215,6 +218,23 @@ public class TCPConduit implements Runnable {
*/
public TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress,
DirectChannel receiver, Properties props) throws ConnectionException {
this(mgr, port, address, isBindAddress, receiver, props, ConnectionTable::create,
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER),
() -> {
try {
LocalHostUtil.getLocalHost();
} catch (UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
},
true);
}

@VisibleForTesting
TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindAddress,
DirectChannel receiver, Properties props,
Function<TCPConduit, ConnectionTable> connectionTableFactory, SocketCreator socketCreator,
Runnable localHostValidation, boolean startAcceptor) throws ConnectionException {
parseProperties(props);

this.address = address;
Expand All @@ -232,21 +252,18 @@ public TCPConduit(Membership mgr, int port, InetAddress address, boolean isBindA
stats = new LonerDistributionManager.DummyDMStats();
}

conTable = ConnectionTable.create(this);
conTable = connectionTableFactory.apply(this);

socketCreator =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
this.socketCreator = socketCreator;
useSSL = socketCreator.useSSL();

if (address == null) {
try {
LocalHostUtil.getLocalHost();
} catch (UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
localHostValidation.run();
}

startAcceptor();
if (startAcceptor) {
startAcceptor();
}
}

public static void init() {
Expand Down Expand Up @@ -858,10 +875,10 @@ public Connection getConnection(InternalDistributedMember memberAddress,
}

if (breakLoop) {
if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
logger.warn("Throwing IOException after finding breakLoop=true", problem);
}
if (problem instanceof IOException) {
if (problem.getMessage().startsWith("Cannot form connection to alert listener")) {
throw new AlertingIOException((IOException) problem);
}
throw (IOException) problem;
}
throw new IOException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,41 @@
*/
package org.apache.geode.alerting.internal;

import static org.apache.geode.internal.cache.util.UncheckedUtils.cast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

import org.apache.geode.alerting.internal.spi.AlertLevel;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
Expand Down Expand Up @@ -90,14 +97,14 @@ public void sendAlertProcessesMessageIfMemberIsLocal() {

@Test
public void sendAlertSendsMessageIfMemberIsRemote() {
DistributionManager dm = mock(ClusterDistributionManager.class);
DistributionManager distributionManager = mock(ClusterDistributionManager.class);
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(dm, currentThreadExecutorService());
spyClusterAlertMessaging(distributionManager, currentThreadExecutorService());

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

verify(dm).putOutgoing(eq(alertListenerMessage));
verify(distributionManager).putOutgoing(eq(alertListenerMessage));
}

@Test
Expand All @@ -118,17 +125,76 @@ public void sendAlertUsesAlertingAction() {
ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(distributionManager, executor);
when(distributionManager.putOutgoing(any())).thenAnswer(invocation -> {
assertThat(AlertingAction.isThreadAlerting()).isTrue();
return null;
});
when(distributionManager.putOutgoing(any()))
.thenAnswer(invocation -> {
assertThat(AlertingAction.isThreadAlerting()).isTrue();
return null;
});

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

verify(distributionManager).putOutgoing(any());
}

@Test
public void sendAlertLogsWarning_ifAlertingIOExceptionIsCaught() {
ExecutorService executor = currentThreadExecutorService();
ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
Consumer<AlertingIOException> alertingIOExceptionLogger = cast(mock(Consumer.class));
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger);
doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener")))
.when(distributionManager).putOutgoing(any());

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

ArgumentCaptor<AlertingIOException> captor = ArgumentCaptor.forClass(AlertingIOException.class);
verify(alertingIOExceptionLogger).accept(captor.capture());

assertThat(captor.getValue())
.isInstanceOf(AlertingIOException.class)
.hasMessageContaining("Cannot form connection to alert listener");
}

@Test
public void sendAlertLogsWarningOnce_ifAlertingIOExceptionIsCaught() {
ExecutorService executor = currentThreadExecutorService();
ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
Consumer<AlertingIOException> alertingIOExceptionLogger = cast(mock(Consumer.class));
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(distributionManager, executor, alertingIOExceptionLogger);
doThrow(new AlertingIOException(new IOException("Cannot form connection to alert listener")))
.when(distributionManager).putOutgoing(any());

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

ArgumentCaptor<AlertingIOException> captor = ArgumentCaptor.forClass(AlertingIOException.class);
verify(alertingIOExceptionLogger).accept(captor.capture());

assertThat(captor.getAllValues()).hasSize(1);
}

@Test
public void sendAlertDoesNotSend_ifAlertingIOExceptionIsCaught() {
ExecutorService executor = currentThreadExecutorService();
ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(distributionManager, executor);
when(distributionManager.putOutgoing(any()))
.thenAnswer(invocation -> {
assertThat(AlertingAction.isThreadAlerting()).isTrue();
return null;
});

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

verifyZeroInteractions(distributionManager);
}

@Test
public void processAlertListenerMessage_requires_ClusterDistributionManager() {
ClusterAlertMessaging clusterAlertMessaging = spy(new ClusterAlertMessaging(system,
Expand Down Expand Up @@ -156,6 +222,22 @@ private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distr
executorService));
}

private ClusterAlertMessaging spyClusterAlertMessaging(DistributionManager distributionManager,
ExecutorService executorService, Consumer<AlertingIOException> alertingIOExceptionLogger) {
when(alertListenerMessageFactory.createAlertListenerMessage(any(DistributedMember.class),
any(AlertLevel.class), any(Instant.class), anyString(), anyString(), anyLong(), anyString(),
anyString()))
.thenReturn(alertListenerMessage);
when(config.getName())
.thenReturn("name");
when(system.getConfig())
.thenReturn(config);
when(system.getDistributedMember())
.thenReturn(localMember);
return spy(new ClusterAlertMessaging(system, distributionManager, alertListenerMessageFactory,
executorService, alertingIOExceptionLogger));
}

private ExecutorService currentThreadExecutorService() {
ExecutorService executor = mock(ExecutorService.class);
when(executor.submit(isA(Runnable.class)))
Expand Down
Loading

0 comments on commit 387848f

Please sign in to comment.