Skip to content

Commit

Permalink
Revert "GEODE-7710: Fix race condition in federation of MXBeans (apac…
Browse files Browse the repository at this point in the history
…he#4807)" (apache#4832)

* Revert "GEODE-7710: Fix race condition in sending JMX notifications (apache#4808)"

This reverts commit ff6b0c8.

* Revert "GEODE-7710: Fix race condition in federation of MXBeans (apache#4807)"

This reverts commit 683113a.
  • Loading branch information
nabarunnag authored Apr 2, 2020
1 parent ef6fdc1 commit c5c0b0b
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.dunit.VM.getVMId;
import static org.apache.geode.test.dunit.VM.toArray;
Expand All @@ -59,7 +61,6 @@

import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.alerting.internal.spi.AlertingIOException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.LocatorLauncher;
Expand All @@ -81,6 +82,7 @@
@SuppressWarnings("serial")
public class JMXMBeanReconnectDUnitTest implements Serializable {

private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();
private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class);
private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class);

Expand Down Expand Up @@ -155,7 +157,6 @@ public void setUp() throws Exception {
String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + regionName;
gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess();

addIgnoredException(AlertingIOException.class);
addIgnoredException(CacheClosedException.class);
addIgnoredException(CancelException.class);
addIgnoredException(DistributedSystemDisconnectedException.class);
Expand Down Expand Up @@ -201,14 +202,13 @@ public void setUp() throws Exception {

@After
public void tearDown() {
for (VM vm : asList(serverVM, locator2VM, locator1VM)) {
vm.invoke(() -> {
BEFORE.get().countDown();
AFTER.get().countDown();
SERVER.getAndSet(DUMMY_SERVER).stop();
LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
});
}
invokeInEveryVM(() -> {
BEFORE.get().countDown();
AFTER.get().countDown();
SERVER.getAndSet(DUMMY_SERVER).stop();
LOCATOR.getAndSet(DUMMY_LOCATOR).stop();
});
disconnectAllFromDS();
}

@Test
Expand Down Expand Up @@ -241,17 +241,15 @@ public void locatorHasMemberTypeMXBeansForBothLocators() {
await().untilAsserted(() -> {
assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
.as("GemFire mbeans on locator1")
.containsAll(expectedLocatorMXBeans(locator1Name))
.containsAll(expectedLocatorMXBeans(locator2Name));
.containsAll(expectedLocatorMXBeans(locator1Name));
});
});

locator2VM.invoke(() -> {
await().untilAsserted(() -> {
assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
.as("GemFire mbeans on locator2")
.containsAll(expectedLocatorMXBeans(locator2Name))
.containsAll(expectedLocatorMXBeans(locator1Name));
.containsAll(expectedLocatorMXBeans(locator2Name));
});
});
}
Expand Down Expand Up @@ -350,7 +348,7 @@ public void serverMXBeansOnLocatorAreRestoredAfterCrashedServerReturns() {
.isTrue();
});

system.waitUntilReconnected(getTimeout().getValueInMS(), MILLISECONDS);
system.waitUntilReconnected(TIMEOUT_MILLIS, MILLISECONDS);

await().untilAsserted(() -> {
assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
Expand Down Expand Up @@ -384,7 +382,7 @@ public void locatorMXBeansOnOtherLocatorAreRestoredAfterCrashedLocatorReturns()
@Override
public void reconnecting(InternalDistributedSystem oldSystem) {
try {
BEFORE.get().await(getTimeout().getValueInMS(), MILLISECONDS);
BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
} catch (InterruptedException e) {
errorCollector.addError(e);
}
Expand Down Expand Up @@ -445,7 +443,7 @@ public void serverMXBeansAreRestoredOnBothLocatorsAfterCrashedServerReturns() {
@Override
public void reconnecting(InternalDistributedSystem oldSystem) {
try {
BEFORE.get().await(getTimeout().getValueInMS(), MILLISECONDS);
BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS);
} catch (InterruptedException e) {
errorCollector.addError(e);
}
Expand Down Expand Up @@ -480,7 +478,7 @@ public void onReconnect(InternalDistributedSystem oldSystem,

serverVM.invoke(() -> {
BEFORE.get().countDown();
AFTER.get().await(getTimeout().getValueInMS(), MILLISECONDS);
AFTER.get().await(TIMEOUT_MILLIS, MILLISECONDS);

await().untilAsserted(() -> {
assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null))
Expand Down Expand Up @@ -509,7 +507,6 @@ public void onReconnect(InternalDistributedSystem oldSystem,
private static void startLocator(String name, File workingDirectory, int locatorPort, int jmxPort,
String locators) {
LOCATOR.set(new LocatorLauncher.Builder()
.setDeletePidFileOnStop(true)
.setMemberName(name)
.setPort(locatorPort)
.setWorkingDirectory(workingDirectory.getAbsolutePath())
Expand All @@ -535,7 +532,6 @@ private static void startLocator(String name, File workingDirectory, int locator

private static void startServer(String name, File workingDirectory, String locators) {
SERVER.set(new ServerLauncher.Builder()
.setDeletePidFileOnStop(true)
.setDisableDefaultServer(true)
.setMemberName(name)
.setWorkingDirectory(workingDirectory.getAbsolutePath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ public boolean hasOwnStats() {
return;
}
proxyFactory.createAllProxies(member, proxyMonitoringRegion);

managementCacheListener.markReady();
notifListener.markReady();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error During GII Proxy creation", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,47 @@
import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
* This listener is attached to the Monitoring Region to receive any addition or deletion of MBeans.
* It updates the last refreshed time of proxy once it gets the update request from the Managed
* Node.
* This listener is attached to the Monitoring Region to receive any addition or deletion of MBEans
*
* It updates the last refreshed time of proxy once it gets the update request from the Managed Node
*
*
*/
class ManagementCacheListener extends CacheListenerAdapter<String, Object> {
public class ManagementCacheListener extends CacheListenerAdapter<String, Object> {

private static final Logger logger = LogService.getLogger();

private final MBeanProxyFactory proxyHelper;
private MBeanProxyFactory proxyHelper;

ManagementCacheListener(MBeanProxyFactory proxyHelper) {
private volatile boolean readyForEvents;

public ManagementCacheListener(MBeanProxyFactory proxyHelper) {
this.proxyHelper = proxyHelper;
this.readyForEvents = false;
}

@Override
public void afterCreate(EntryEvent<String, Object> event) {
if (!readyForEvents) {
return;
}
ObjectName objectName = null;

try {
objectName = ObjectName.getInstance(event.getKey());
Object newObject = event.getNewValue();
DistributedMember distributedMember = event.getDistributedMember();
Region<String, Object> region = event.getRegion();
proxyHelper.createProxy(distributedMember, objectName, region,
proxyHelper.createProxy(event.getDistributedMember(), objectName, event.getRegion(),
newObject);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Proxy Create failed for {} with exception {}", objectName, e.getMessage(), e);
}
}

}

@Override
Expand All @@ -70,12 +76,16 @@ public void afterDestroy(EntryEvent<String, Object> event) {
e);
}
}

}

@Override
public void afterUpdate(EntryEvent<String, Object> event) {
ObjectName objectName = null;
try {
if (!readyForEvents) {
return;
}
objectName = ObjectName.getInstance(event.getKey());

ProxyInfo proxyInfo = proxyHelper.findProxyInfo(objectName);
Expand All @@ -94,6 +104,13 @@ public void afterUpdate(EntryEvent<String, Object> event) {
if (logger.isDebugEnabled()) {
logger.debug("Proxy Update failed for {} with exception {}", objectName, e.getMessage(), e);
}

}

}

void markReady() {
readyForEvents = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,101 @@
*/
package org.apache.geode.management.internal;


import javax.management.Notification;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.RegionEvent;

/**
* This listener will be attached to each notification region corresponding to a member
*
*/
class NotificationCacheListener extends CacheListenerAdapter<NotificationKey, Notification> {
public class NotificationCacheListener implements CacheListener<NotificationKey, Notification> {

private final NotificationHubClient notificationHubClient;
/**
* For the
*/
private NotificationHubClient notifClient;

NotificationCacheListener(MBeanProxyFactory mBeanProxyFactory) {
this(new NotificationHubClient(mBeanProxyFactory));
}
private volatile boolean readyForEvents;

public NotificationCacheListener(MBeanProxyFactory proxyHelper) {

notifClient = new NotificationHubClient(proxyHelper);
this.readyForEvents = false;

@VisibleForTesting
NotificationCacheListener(NotificationHubClient notificationHubClient) {
this.notificationHubClient = notificationHubClient;
}

@Override
public void afterCreate(EntryEvent<NotificationKey, Notification> event) {
notificationHubClient.sendNotification(event);
if (!readyForEvents) {
return;
}
notifClient.sendNotification(event);

}

@Override
public void afterDestroy(EntryEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterInvalidate(EntryEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterRegionClear(RegionEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterRegionCreate(RegionEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterRegionDestroy(RegionEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterRegionInvalidate(RegionEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterRegionLive(RegionEvent<NotificationKey, Notification> event) {
// TODO Auto-generated method stub

}

@Override
public void afterUpdate(EntryEvent<NotificationKey, Notification> event) {
notificationHubClient.sendNotification(event);
if (!readyForEvents) {
return;
}
notifClient.sendNotification(event);

}

@Override
public void close() {
// TODO Auto-generated method stub

}

public void markReady() {
readyForEvents = true;
}

}
Loading

0 comments on commit c5c0b0b

Please sign in to comment.