Skip to content

Commit

Permalink
GEODE-7623: Preventing recursion in alert generation
Browse files Browse the repository at this point in the history
After the change to send alerts in a executor, there was a possibiltiy for
sending alerts to generate new alerts, resulting an in infinite loop.

Setting the AlertingAction threadlocal in the executor thread that is sending
alerts, to prevent recursion.

Adding a test that we do not generate recursive alert messages.
  • Loading branch information
upthewaterspout committed Dec 31, 2019
1 parent b7417c0 commit 36cb801
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -120,6 +122,23 @@ public void alertMessageIsReceivedForListenerLevelWarning() {
verify(messageListener, timeout(TIMEOUT)).received(isA(AlertListenerMessage.class));
}

@Test
public void alertMessageProcessingDoesNotTriggerAdditionalAlertMessage() {
alertingService.addAlertListener(member, WARNING);
logger = spy(logger);

String recursiveAlert = "Recursive Alert";
doAnswer(invocation -> {
logger.warn(recursiveAlert);
return null;
}).when(messageListener).received(isA(AlertListenerMessage.class));

logger.warn(alertMessage);

verify(messageListener, timeout(TIMEOUT).times(1)).received(isA(AlertListenerMessage.class));
verify(logger, timeout(TIMEOUT).times(1)).warn(eq(recursiveAlert));
}

@Test
public void alertMessageIsReceivedForListenerLevelError() {
alertingService.addAlertListener(member, ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;

import org.apache.geode.alerting.internal.spi.AlertLevel;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
Expand Down Expand Up @@ -66,7 +67,7 @@ public void sendAlert(final DistributedMember member,
final long threadId,
final String formattedMessage,
final String stackTrace) {
executor.submit(() -> {
executor.submit(() -> AlertingAction.execute(() -> {
try {
String connectionName = system.getConfig().getName();

Expand All @@ -77,22 +78,24 @@ public void sendAlert(final DistributedMember member,
if (member.equals(system.getDistributedMember())) {
// process in local member
logger.debug("Processing local alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
member, alertLevel, timestamp, connectionName, threadName, threadId,
formattedMessage,
stackTrace);
processAlertListenerMessage(message);

} else {
// send to remote member
logger.debug("Sending remote alert message: {}, {}, {}, {}, {}, {}, [{}], [{}].",
member, alertLevel, timestamp, connectionName, threadName, threadId, formattedMessage,
member, alertLevel, timestamp, connectionName, threadName, threadId,
formattedMessage,
stackTrace);
dm.putOutgoing(message);
}
} catch (ReenteredConnectException ignore) {
// OK. We can't send to this recipient because we're in the middle of
// trying to connect to it.
}
});
}));
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.mockito.stubbing.Answer;

import org.apache.geode.alerting.internal.spi.AlertLevel;
import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
Expand Down Expand Up @@ -111,6 +112,23 @@ public void sendAlertUsesExecutorService() {
verify(executor).submit(any(Runnable.class));
}

@Test
public void sendAlertUsesAlertingAction() {
ExecutorService executor = currentThreadExecutorService();
ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
ClusterAlertMessaging clusterAlertMessaging =
spyClusterAlertMessaging(distributionManager, executor);
when(distributionManager.putOutgoing(any())).thenAnswer(invocation -> {
assertThat(AlertingAction.isThreadAlerting()).isTrue();
return null;
});

clusterAlertMessaging.sendAlert(remoteMember, AlertLevel.WARNING, Instant.now(), "threadName",
Thread.currentThread().getId(), "formattedMessage", "stackTrace");

verify(distributionManager).putOutgoing(any());
}

@Test
public void processAlertListenerMessage_requires_ClusterDistributionManager() {
ClusterAlertMessaging clusterAlertMessaging = spy(new ClusterAlertMessaging(system,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void append(final LogEvent event) {
LOGGER.trace("Skipping append of {} because {} is alerting.", event, Thread.currentThread());
return;
}
AlertingAction.execute(() -> doAppend(event));
doAppend(event);
}

private void doAppend(final LogEvent event) {
Expand Down

0 comments on commit 36cb801

Please sign in to comment.