Skip to content

Commit

Permalink
GEODE-653: Add unit test for GMSHealthMonitor checkIfAvailable
Browse files Browse the repository at this point in the history
Removed unused code
Minor javadoc corrections
  • Loading branch information
jhuynh1 committed Dec 16, 2015
1 parent d40d8a7 commit 8c9af2a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@
* to remove that member from view.
*
* It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
* to initiate suspect processing for any member. First is checks whether member is
* responding or not. Then it informs to probable coordinators to remove that member from
* to initiate suspect processing for any member. First is checks whether the member is
* responding or not. Then it informs probable coordinators to remove that member from
* view.
*
* It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
* if that member is alive. Then based on removal flag it initiate the suspect processing
* if that member is alive. Then based on removal flag it initiates the suspect processing
* for that member.
*
* */
Expand Down Expand Up @@ -158,9 +158,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {

private ExecutorService checkExecutor;

// List<SuspectRequest> suspectRequests = new ArrayList<SuspectRequest>();
// private RequestCollector<SuspectRequest> suspectRequestCollectorThread;

/**
* to stop check scheduler
*/
Expand Down Expand Up @@ -463,17 +460,6 @@ private boolean doCheckMember(InternalDistributedMember member) {
return false;
}

/**
* Check for recent messaging activity from the given member
* @param suspectMember
* @return whether there has been activity within memberTimeout ms
*/
private boolean checkRecentActivity(InternalDistributedMember suspectMember) {
TimeStamp ts = memberTimeStamps.get(suspectMember);
return (ts != null && (System.currentTimeMillis() - ts.getTime()) <= memberTimeout);
}


/**
* During final check, establish TCP connection between current member and suspect member.
* And exchange PING/PONG message to see if the suspect member is still alive.
Expand Down Expand Up @@ -1238,100 +1224,6 @@ public int getFailureDetectionPort() {
return this.socketPort;
}

interface Callback<T> {
public void process(List<T> requests);
}

/***
* this thread will collect suspect message for some time interval
* then it send message to current coordinator first if its not in
* suspected list. if its in then it will send message to next probable
* coordinator. NOTE: this thread will not check-server for verification
* assuming many servers are going down and lets coordinator deals with it.
*
* Should we wait for ack from coordinator/probable coordinator that I got
* request to suspect these members.
*
*/
class RequestCollector<T> extends Thread {
volatile boolean shutdown = false;
final List<T> listToTrack;
final Callback<T> callback;
final long timeout;

public RequestCollector(String name, ThreadGroup tg, List<T> l, Callback<T> c, long t) {
super(tg, name);
listToTrack = l;
callback = c;
timeout = t;
}

void shutdown() {
shutdown = true;
synchronized (listToTrack) {
listToTrack.notify();
interrupt();
}
}

boolean isShutdown() {
return shutdown;
}

@Override
public void run() {
List<T> requests = null;
logger.debug("Suspect thread is starting");
long okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
try {
for (;;) {
synchronized (listToTrack) {
if (shutdown || services.getCancelCriterion().isCancelInProgress()) {
return;
}
if (listToTrack.isEmpty()) {
try {
logger.trace("Result collector is waiting");
listToTrack.wait();
} catch (InterruptedException e) {
return;
}
} else {
long now = System.currentTimeMillis();
if (now < okayToSendSuspectRequest) {
// sleep to let more suspect requests arrive
try {
sleep(okayToSendSuspectRequest - now);
continue;
} catch (InterruptedException e) {
return;
}
} else {
if (requests == null) {
requests = new ArrayList<T>(listToTrack);
} else {
requests.addAll(listToTrack);
}
listToTrack.clear();
okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
}
}
} // synchronized
if (requests != null && !requests.isEmpty()) {
if (logger != null && logger.isDebugEnabled()) {
logger.info("Health Monitor is sending {} member suspect requests to coordinator", requests.size());
}
callback.process(requests);
requests = null;
}
}
} finally {
shutdown = true;
logger.debug("Suspect thread is stopped");
}
}
}

private void sendSuspectRequest(final List<SuspectRequest> requests) {
// the background suspect-collector thread is currently disabled
// synchronized (suspectRequests) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -42,6 +46,8 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
Expand Down Expand Up @@ -407,25 +413,33 @@ public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() throws Exc
* validates HealthMonitor.CheckIfAvailable api
*/
@Test
public void testCheckIfAvailable() {

NetView v = new NetView(mockMembers.get(0), 2, mockMembers, new HashSet<InternalDistributedMember>(), new HashSet<InternalDistributedMember>());

// 3rd is current member
when(messenger.getMemberID()).thenReturn(mockMembers.get(3));

gmsHealthMonitor.installView(v);

public void testCheckIfAvailableNoHeartBeatDontRemoveMember() {
long startTime = System.currentTimeMillis();

boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false);

long timeTaken = System.currentTimeMillis() - startTime;

assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90);
assertTrue("CheckIfAvailable should have return false", !retVal);
assertTrue("This should have taken member ping timeout 100ms ", timeTaken >= gmsHealthMonitor.memberTimeout);
assertFalse("CheckIfAvailable should have return false", retVal);
}

@Test
public void testCheckIfAvailableWithSimulatedHeartBeat() {
InternalDistributedMember memberToCheck = mockMembers.get(1);
HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
fakeHeartbeat.setSender(memberToCheck);
when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
gmsHealthMonitor.processMessage(fakeHeartbeat);
return null;
}
});

boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
assertTrue("CheckIfAvailable should have return true", retVal);
}


@Test
public void testShutdown() {

Expand Down

0 comments on commit 8c9af2a

Please sign in to comment.