Skip to content

Commit

Permalink
Bind both TCP and UDP ports before clustering
Browse files Browse the repository at this point in the history
Was checking only that the UDP port was available. Could cluster up but
hang & complain that the TCP port was not available.
  • Loading branch information
bghill committed Feb 28, 2015
1 parent f17c7ee commit b9fd80f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/main/java/water/AutoBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public AutoBuffer( int len ) {
private static final AtomicInteger BBCACHE= new AtomicInteger(0);
private static final LinkedBlockingDeque<ByteBuffer> BBS = new LinkedBlockingDeque<ByteBuffer>();
static final int BBSIZE = 64*1024; // Bytebuffer "common big size"
public static int TCP_BUF_SIZ = BBSIZE;
private static void bbstats( AtomicInteger ai ) {
if( !DEBUG ) return;
if( (ai.incrementAndGet()&511)==511 ) {
Expand Down Expand Up @@ -804,7 +805,7 @@ AutoBuffer putUdp (UDP.udp type) {
assert _bb.position()==0;
putSp(1+2);
_bb.put ((byte)type.ordinal());
_bb.putChar((char)H2O.UDP_PORT ); // Outgoing port is always the sender's (me) port
_bb.putChar((char)H2O.H2O_PORT); // Outgoing port is always the sender's (me) port
assert _bb.position()==1+2;
return this;
}
Expand All @@ -815,7 +816,7 @@ AutoBuffer putTask(UDP.udp type, int tasknum) {
AutoBuffer putTask(int ctrl, int tasknum) {
assert _bb.position()==0;
putSp(1+2+4);
_bb.put((byte)ctrl).putChar((char)H2O.UDP_PORT).putInt(tasknum);
_bb.put((byte)ctrl).putChar((char)H2O.H2O_PORT).putInt(tasknum);
return this;
}

Expand Down
20 changes: 14 additions & 6 deletions src/main/java/water/H2O.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import water.util.*;
import water.util.Log.Tag.Sys;
import water.license.LicenseManager;
import java.nio.channels.ServerSocketChannel;

/**
* Start point for creating or joining an <code>H2O</code> Cloud.
Expand All @@ -37,7 +38,7 @@ public final class H2O {

// The default port for finding a Cloud
public static int DEFAULT_PORT = 54321;
public static int UDP_PORT; // Fast/small UDP transfers
public static int H2O_PORT; // Fast/small UDP transfers
public static int API_PORT; // RequestServer and the new API HTTP port

// Whether to toggle to single precision as upper limit for storing floating point numbers
Expand Down Expand Up @@ -1128,9 +1129,9 @@ static void initializeNetworkSockets( ) {
API_PORT = OPT_ARGS.port != 0 ? OPT_ARGS.port : DEFAULT_PORT;

while (true) {
UDP_PORT = API_PORT+1;
H2O_PORT = API_PORT+1;
if( API_PORT<0 || API_PORT>65534 ) // 65535 is max, implied for udp port
Log.die("Attempting to use system illegal port, either "+API_PORT+" or "+UDP_PORT);
Log.die("Attempting to use system illegal port, either "+API_PORT+" or "+ H2O_PORT);
try {
// kbn. seems like we need to set SO_REUSEADDR before binding?
// http://www.javadocexamples.com/java/net/java.net.ServerSocket.html#setReuseAddress:boolean
Expand All @@ -1149,16 +1150,23 @@ static void initializeNetworkSockets( ) {
? new ServerSocket(API_PORT)
: new ServerSocket(API_PORT, -1/*defaultBacklog*/, SELF_ADDRESS);
_apiSocket.setReuseAddress(true);

// Bind to the UDP socket
_udpSocket = DatagramChannel.open();
_udpSocket.socket().setReuseAddress(true);
_udpSocket.socket().bind(new InetSocketAddress(SELF_ADDRESS, UDP_PORT));
InetSocketAddress isa = new InetSocketAddress(H2O.SELF_ADDRESS, H2O_PORT);
_udpSocket.socket().bind(isa);
// Bind to the TCP socket also
TCPReceiverThread.SOCK = ServerSocketChannel.open();
TCPReceiverThread.SOCK.socket().setReceiveBufferSize(water.AutoBuffer.TCP_BUF_SIZ);
TCPReceiverThread.SOCK.socket().bind(isa);
break;
} catch (IOException e) {
try { if( _apiSocket != null ) _apiSocket.close(); } catch( IOException ohwell ) { Log.err(ohwell); }
Utils.close(_udpSocket);
if( TCPReceiverThread.SOCK != null ) try { TCPReceiverThread.SOCK.close(); } catch( IOException ie ) { }
_apiSocket = null;
_udpSocket = null;
TCPReceiverThread.SOCK = null;
if( OPT_ARGS.port != 0 )
Log.die("On " + SELF_ADDRESS +
" some of the required ports " + (OPT_ARGS.port+0) +
Expand All @@ -1168,7 +1176,7 @@ static void initializeNetworkSockets( ) {
API_PORT += 2;
}
SELF = H2ONode.self(SELF_ADDRESS);
Log.info("Internal communication uses port: ",UDP_PORT,"\nListening for HTTP and REST traffic on http://",SELF_ADDRESS.getHostAddress(),":"+_apiSocket.getLocalPort()+"/");
Log.info("Internal communication uses port: ", H2O_PORT,"\nListening for HTTP and REST traffic on http://",SELF_ADDRESS.getHostAddress(),":"+_apiSocket.getLocalPort()+"/");

String embeddedConfigFlatfile = null;
AbstractEmbeddedH2OConfig ec = getEmbeddedH2OConfig();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/H2ONode.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public H2ONode( ) { }
// Get a nice Node Name for this Node in the Cloud. Basically it's the
// InetAddress we use to communicate to this Node.
static H2ONode self(InetAddress local) {
assert H2O.UDP_PORT != 0;
assert H2O.H2O_PORT != 0;
try {
// Figure out which interface matches our IP address
List<NetworkInterface> matchingIfs = new ArrayList();
Expand Down Expand Up @@ -180,7 +180,7 @@ static H2ONode self(InetAddress local) {
} catch( Exception e ) {
throw Log.errRTExcept(e);
}
return intern(new H2Okey(local,H2O.UDP_PORT));
return intern(new H2Okey(local,H2O.H2O_PORT));
}

// Happy printable string
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/TCPReceiverThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void run() {
break; // Socket closed for shutdown
} catch( Exception e ) {
// On any error from anybody, close all sockets & re-open
Log.err("Retrying after IO error on TCP port "+H2O.UDP_PORT+": ",e);
Log.err("Retrying after IO error on TCP port "+H2O.H2O_PORT +": ",e);
saw_error = true;
errsock = SOCK ; SOCK = null; // Signal error recovery on the next loop
}
Expand Down Expand Up @@ -93,7 +93,7 @@ public void run() {
// On any error from anybody, close everything
System.err.println("IO error");
e.printStackTrace();
Log.err("IO error on TCP port "+H2O.UDP_PORT+": ",e);
Log.err("IO error on TCP port "+H2O.H2O_PORT +": ",e);
break;
}
// Reuse open sockets for the next task
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/UDPReceiverThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void run() {
break; // Socket closed for shutdown
} catch( Exception e ) {
// On any error from anybody, close all sockets & re-open
Log.err("UDP Receiver error on port "+H2O.UDP_PORT,e);
Log.err("UDP Receiver error on port "+H2O.H2O_PORT,e);
saw_error = true;
errsock = sock ; sock = null; // Signal error recovery on the next loop
}
Expand Down

0 comments on commit b9fd80f

Please sign in to comment.