Skip to content

Commit

Permalink
[GEODE-77] TCP check for health monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
jchen21 committed Oct 16, 2015
1 parent 5ca7c37 commit e267c88
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_RESPONSE;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -24,6 +31,7 @@

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
Expand All @@ -36,6 +44,7 @@
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;

/**
Expand Down Expand Up @@ -118,6 +127,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {

/** test hook */
boolean beingSick = false;

// For TCP check
private ExecutorService serverSocketExecutor;
private static final int PING = 0x01;
private static final int PONG = 0x02;
private InetAddress ip;
private int socketPort;
private ServerSocket serverSocket;
private Map<InternalDistributedMember, InetSocketAddress> socketInfo = new ConcurrentHashMap<InternalDistributedMember, InetSocketAddress>();

public GMSHealthMonitor() {

Expand Down Expand Up @@ -327,6 +345,61 @@ private boolean doCheckMember(InternalDistributedMember pingMember) {
return false;
}

/**
* During final check, establish TCP connection between current member and suspect member.
* And exchange PING/PONG message to see if the suspect member is still alive.
*
* @param suspectMember member that does not respond to CheckRequestMessage
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
private boolean doTCPCheckMember(InternalDistributedMember suspectMember) {
logger.trace("Checking member {} with TCP socket connection.", suspectMember);
Socket clientSocket = new Socket();
try {
// establish TCP connection
InetSocketAddress addr = socketInfo.get(suspectMember);
if (addr != null) {
logger.trace("Checking member {} with TCP socket connection {}:{}.", suspectMember, addr.getAddress(), addr.getPort());
clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
}
if (clientSocket.isConnected()) {
clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
InputStream in = clientSocket.getInputStream();
OutputStream out = clientSocket.getOutputStream();
out.write(PING);
out.flush();
clientSocket.shutdownOutput();
logger.trace("Send {} to member {} with TCP socket connection.", PING, suspectMember);
int b = in.read();
logger.trace("Received {} from member {} with TCP socket connection.", b, suspectMember);
if (b == PONG) {
CustomTimeStamp ts = memberVsLastMsgTS.get(suspectMember);
if (ts != null) {
ts.setTimeStamp(System.currentTimeMillis());
}
return true;
} else {
// TODO: Received something other than PONG. Is it possible?
return false;
}
} else {// cannot establish TCP connection with suspect member
return false;
}
} catch (IOException e) {
logger.trace("Unexpected exception", e);
} finally {
try {
if (clientSocket != null) {
clientSocket.close();
}
} catch (IOException e) {
logger.trace("Unexpected exception", e);
}
}

return false;
}

/*
* (non-Javadoc)
*
Expand Down Expand Up @@ -355,7 +428,7 @@ public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean in
}

public void start() {
{
{
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -396,6 +469,108 @@ public void process(List<SuspectRequest> requests) {
suspectRequestCollectorThread.setDaemon(true);
suspectRequestCollectorThread.start();
}

{
serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
AtomicInteger threadIdx = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
int id = threadIdx.getAndIncrement();
Thread th = new Thread(Services.getThreadGroup(), r, "TCP Check ServerSocket Thread " + id);
th.setDaemon(true);
return th;
}
});

serverSocketExecutor.execute(new Runnable() {
@Override
public void run() {
Socket socket = null;
try {
// start server socket for TCP check
localAddress = services.getMessenger().getMemberID();
ip = localAddress.getInetAddress();
int[] portRange = services.getConfig().getMembershipPortRange();
socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET);
if (socketPort == -1) {
throw new SystemConnectException("Unable to find a free port in the membership port range");
}
serverSocket = new ServerSocket(socketPort);
logger.debug("GMSHealthMonitor started server socket on {}:{}.", ip, socketPort);
socketInfo.put(localAddress, new InetSocketAddress(ip, socketPort));
while (!services.getCancelCriterion().isCancelInProgress()
&& !GMSHealthMonitor.this.isStopping && !GMSHealthMonitor.this.playingDead) {
try {
socket = serverSocket.accept();
if (GMSHealthMonitor.this.playingDead) {
continue;
}
socket.setSoTimeout((int) services.getConfig().getMemberTimeout());
new ClientSocketHandler(socket).start();
} catch (IOException e) {
logger.trace("Unexpected exception", e);
try {
if (socket != null) {
socket.close();
}
} catch (IOException ioe) {
logger.trace("Unexpected exception", ioe);
}
}
}
} catch (IOException e) {
logger.trace("Unexpected exception", e);
} finally {
// close the server socket
if (serverSocket != null) {
try {
serverSocket.close();
logger.debug("GMSHealthMonitor server socket closed.");
} catch (IOException e) {
logger.debug("Unexpected exception", e);
}
}
}
}
});
}
}

class ClientSocketHandler extends Thread {

private Socket socket;

public ClientSocketHandler(Socket socket) {
super(services.getThreadGroup(), "ClientSocketHandler");
this.socket = socket;
setDaemon(true);
}

public void run() {
try {
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
int b = in.read();
logger.debug("GMSHealthMonitor server socket received {}.", b);
if (b == PING) {
out.write(PONG);
out.flush();
socket.shutdownOutput();
logger.debug("GMSHealthMonitor server socket replied {}.", PONG);
}
} catch (IOException e) {
logger.trace("Unexpected exception", e);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
logger.info("Unexpected exception", e);
}
}
}
}
}

public synchronized void installView(NetView newView) {
Expand Down Expand Up @@ -519,6 +694,18 @@ private void stopServices() {
checkExecutor.shutdown();
}

if (serverSocketExecutor != null) {
if (serverSocket != null) {
try {
serverSocket.close();
}
catch (IOException e) {
logger.trace("Unexpected exception", e);
}
}
serverSocketExecutor.shutdownNow();
}

if (suspectRequestCollectorThread != null) {
suspectRequestCollectorThread.shutdown();
}
Expand All @@ -528,7 +715,7 @@ private void stopServices() {
* test method
*/
public boolean isShutdown() {
return scheduler.isShutdown() && checkExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive();
return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive();
}

@Override
Expand Down Expand Up @@ -748,7 +935,12 @@ public void run() {
memberVsLastMsgTS.put(mbr, ts);

logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
boolean pinged;
if (socketInfo.get(mbr) == null || socketInfo.get(mbr).getPort() < 0) {
pinged = GMSHealthMonitor.this.doCheckMember(mbr);
} else {
pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr);
}
if (!pinged && !isStopping) {
ts = memberVsLastMsgTS.get(mbr);
if (ts == null || ts.getTimeStamp() <= startTime) {
Expand Down Expand Up @@ -913,4 +1105,21 @@ public void memberShutdown(DistributedMember mbr, String reason) {
// TODO Auto-generated method stub

}

public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo() {
return this.socketInfo;
}

public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers) {
logger.info("members=" + members + " portsForMembers=" + portsForMembers);
logger.info("members.size()=" + members.size() + " portsForMembers.size()=" + portsForMembers.size());
for (int i = 0; i < members.size(); i++) {
if (portsForMembers.get(i) == -1) {
continue;
}
InetSocketAddress addr = new InetSocketAddress(members.get(i).getInetAddress(), portsForMembers.get(i));
socketInfo.put(members.get(i), addr);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;

Expand Down Expand Up @@ -34,4 +38,17 @@ public interface HealthMonitor extends Service {
* ShutdownMessage has been received from the given member
*/
public void memberShutdown(DistributedMember mbr, String reason);

/**
* Returns a map that describes the members and their server sockets
*/
public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo();

/**
* Update the information of the members and their server sockets
*
* @param members
* @param portsForMembers List of socket ports for each member
*/
public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers);
}
Loading

0 comments on commit e267c88

Please sign in to comment.