Skip to content

Commit

Permalink
GEODE-8521: detect if a p2p reader thread is hung (apache#5763)
Browse files Browse the repository at this point in the history
* AbstractExecutor can now be suspended and resumed.
This does nothing on the base class but on P2PReaderExecutorGroup
it turns monitoring on and off.
Also the monitor thread will now detect a zero startTime
and in that case set the startTime allowing threads being
monitored to not keep setting it themselves (which happens more often).

* the timeInterval and timeLimit that come from gemfire properties
will now always be the values configured on the DistributedSystem.
Previously the thread monitoring classes had their own instance of
DistributionConfigImpl which at least in some cases could be inconsistent
with the config on the DistributedSystem. Now these two config values
are passed in to the constructor.

* If an instance of ResourceManagerStats cannot be found then
it will now only prevent the monitor from updating the "numThreadsStuck"
stat. Before it prevented it from doing any detection and logging.
The lazy initialization of the ResourceManagerStats is now cleaner and does
not require the integration test to explicitly call run().

Co-authored-by: Darrel Schneider <[email protected]>
  • Loading branch information
dschneider-pivotal authored Dec 4, 2020
1 parent 4e206f0 commit e26d759
Show file tree
Hide file tree
Showing 28 changed files with 523 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
Expand Down Expand Up @@ -56,70 +55,126 @@ private void initInternalDistributedSystem() {
nonDefault = new Properties();
nonDefault.put(ConfigurationProperties.MCAST_PORT, "0");
nonDefault.put(ConfigurationProperties.LOCATORS, "");
nonDefault.put(ConfigurationProperties.THREAD_MONITOR_ENABLED, "true");
nonDefault.put(ConfigurationProperties.THREAD_MONITOR_TIME_LIMIT, "30000");

cache = (InternalCache) new CacheFactory(nonDefault).create();
}

/**
* Tests that the thread monitor flags a thread whose start time is older than the configured
* time limit, and that the stuck-thread stat follows entries added to and removed from the map
*/
@Test
public void testThreadsMonitoringWorkflow() {

ThreadsMonitoring threadMonitoring = null;

DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
threadMonitoring = distributionManager.getThreadMonitoring();
}
assertThat(distributionManager).isNotNull();
ThreadsMonitoring threadMonitoring = distributionManager.getThreadMonitoring();
assertThat(threadMonitoring).isNotNull();

assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);

DistributionConfigImpl distributionConfigImpl = new DistributionConfigImpl(nonDefault);
if (distributionConfigImpl.getThreadMonitorEnabled() && threadMonitoring != null) {
assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);
impl.getTimer().cancel();

impl.getTimer().cancel();
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should be false.")
.isFalse();

// to initiate ResourceManagerStats
impl.getThreadsMonitoringProcess().run();
threadMonitoring.startMonitor(ThreadsMonitoring.Mode.FunctionExecutor);

assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should be false.")
.isFalse();
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();

threadMonitoring.startMonitor(ThreadsMonitoring.Mode.FunctionExecutor);
AbstractExecutor abstractExecutorGroup =
impl.getMonitorMap().get(Thread.currentThread().getId());
abstractExecutorGroup.setStartTime(abstractExecutorGroup.getStartTime()
- cache.getInternalDistributedSystem().getConfig().getThreadMonitorTimeLimit() - 1);

assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should now be true.")
.isTrue();
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
.isEqualTo(1);

AbstractExecutor abstractExecutorGroup =
impl.getMonitorMap().get(Thread.currentThread().getId());
abstractExecutorGroup.setStartTime(abstractExecutorGroup.getStartTime()
- distributionConfigImpl.getThreadMonitorTimeLimit() - 1);
impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 1, abstractExecutorGroup);
impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 2, abstractExecutorGroup);
impl.getThreadsMonitoringProcess().mapValidation();

assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should now be true.")
.isTrue();
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
.isEqualTo(1);
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify three stuck threads.")
.isEqualTo(3);

impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 1, abstractExecutorGroup);
impl.getMonitorMap().put(abstractExecutorGroup.getThreadID() + 2, abstractExecutorGroup);
impl.getThreadsMonitoringProcess().mapValidation();
threadMonitoring.endMonitor();
impl.getThreadsMonitoringProcess().mapValidation();

assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify three stuck threads.")
.isEqualTo(3);
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify two stuck threads.")
.isEqualTo(2);
}

threadMonitoring.endMonitor();
impl.getThreadsMonitoringProcess().mapValidation();
@Test
public void verifySuspendResumeFunctionCorrectly() {

assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify two stuck threads.")
.isEqualTo(2);
}
DistributionManager distributionManager = cache.getDistributionManager();
assertThat(distributionManager).isNotNull();
ThreadsMonitoring threadMonitoring = distributionManager.getThreadMonitoring();
assertThat(threadMonitoring).isNotNull();
final int monitorTimeLimit =
cache.getInternalDistributedSystem().getConfig().getThreadMonitorTimeLimit();

assertThat(threadMonitoring).isInstanceOf(ThreadsMonitoringImpl.class);
ThreadsMonitoringImpl impl = ((ThreadsMonitoringImpl) threadMonitoring);

impl.getTimer().cancel();

assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should be false.")
.isFalse();

AbstractExecutor executor =
threadMonitoring.createAbstractExecutor(ThreadsMonitoring.Mode.P2PReaderExecutor);

assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();

threadMonitoring.register(executor);
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();
assertThat(executor.getStartTime()).isNotZero();

executor.setStartTime(executor.getStartTime() - monitorTimeLimit - 1);
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should now be true.")
.isTrue();
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
.isEqualTo(1);

executor.suspendMonitoring();
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();

executor.resumeMonitoring();
assertThat(executor.getStartTime()).isZero();
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();
assertThat(executor.getStartTime()).isNotZero();

executor.setStartTime(executor.getStartTime() - monitorTimeLimit - 1);
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should now be true.")
.isTrue();
assertThat((impl.getThreadsMonitoringProcess().getResourceManagerStats().getNumThreadStuck()))
.describedAs("ThreadMonitor monitoring process should identify one stuck thread.")
.isEqualTo(1);

impl.unregister(executor);
assertThat(impl.getThreadsMonitoringProcess().mapValidation())
.describedAs("ThreadMonitor monitoring process map validation should still be false.")
.isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public class ClusterOperationExecutors implements OperationExecutors {

DistributionConfig config = system.getConfig();

threadMonitor = config.getThreadMonitorEnabled() ? new ThreadsMonitoringImpl(system)
threadMonitor = config.getThreadMonitorEnabled() ? new ThreadsMonitoringImpl(system,
config.getThreadMonitorInterval(), config.getThreadMonitorTimeLimit())
: new ThreadsMonitoringImplDummy();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public LonerDistributionManager(InternalDistributedSystem system, InternalLogWri
DistributionConfig config = system.getConfig();

if (config.getThreadMonitorEnabled()) {
this.threadMonitor = new ThreadsMonitoringImpl(system);
this.threadMonitor = new ThreadsMonitoringImpl(system, config.getThreadMonitorInterval(),
config.getThreadMonitorTimeLimit());
logger.info("[ThreadsMonitor] New Monitor object and process were created.\n");
} else {
this.threadMonitor = new ThreadsMonitoringImplDummy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public enum Mode {
SerialQueuedExecutor,
OneTaskOnlyExecutor,
ScheduledThreadExecutor,
AGSExecutor
AGSExecutor,
P2PReaderExecutor
};

Map<Long, AbstractExecutor> getMonitorMap();
Expand All @@ -38,18 +39,45 @@ public enum Mode {
void close();

/**
* Starting to monitor a new executor object.
* Start monitoring the calling thread.
*
* @param mode the object executor group.
* @param mode describes the group the calling thread should be associated with.
* @return true - if succeeded , false - if failed.
*/
public boolean startMonitor(Mode mode);

/**
* Ending the monitoring of an executor object.
* Stops monitoring the calling thread if it is currently being monitored.
*/
public void endMonitor();

/**
* Creates a new executor that is associated with the calling thread.
* Creating the executor does not start monitoring it: callers need to pass the
* returned executor to {@link #register(AbstractExecutor)} for this executor to be
* monitored, and to {@link #unregister(AbstractExecutor)} to stop monitoring it.
*
* @param mode describes the group the calling thread should be associated with.
* @return the created {@link AbstractExecutor} instance.
* @see #register(AbstractExecutor)
* @see #unregister(AbstractExecutor)
*/
public AbstractExecutor createAbstractExecutor(Mode mode);

/**
* Call to cause this thread monitor to start monitoring
* the given executor. Executors are obtained from
* {@link #createAbstractExecutor(Mode)}.
*
* @param executor the executor to monitor.
* @return {@code true} if succeeded, {@code false} if failed.
* @see #unregister(AbstractExecutor)
*/
public boolean register(AbstractExecutor executor);

/**
* Call to cause this thread monitor to stop monitoring
* the given executor. This is the counterpart of
* {@link #register(AbstractExecutor)}.
*
* @param executor the executor to stop monitoring.
* @see #register(AbstractExecutor)
*/
public void unregister(AbstractExecutor executor);

/**
* A long-running thread that may appear stuck should periodically update its "alive"
* status by invoking this method
Expand Down
Loading

0 comments on commit e26d759

Please sign in to comment.