Skip to content

Commit

Permalink
Merge branch 'hadoop'
Browse files Browse the repository at this point in the history
  • Loading branch information
cypof committed May 7, 2013
2 parents f422a01 + c4551a1 commit d512de4
Show file tree
Hide file tree
Showing 22 changed files with 261 additions and 186 deletions.
6 changes: 0 additions & 6 deletions .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,11 @@
<classpathentry kind="lib" path="lib/jets3t/commons-httpclient-3.1.jar"/>
<classpathentry kind="lib" path="lib/jets3t/jets3t-0.6.1.jar"/>
<classpathentry kind="lib" path="lib/apache/commons-codec-1.3.jar"/>
<classpathentry kind="lib" path="lib/apache/commons-logging-1.1.1-src.zip"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/commons-cli-1.2.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/guava-r09-jarjar.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/jackson-core-asl-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/jackson-mapper-asl-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/log4j-1.2.15.jar" sourcepath="/home/cypof/.m2/repository/log4j/log4j/1.2.15/log4j-1.2.15-sources.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/hadoop-core-0.20.2-cdh3u6.jar" sourcepath="lib/hadoop/cdh3/hadoop-core-0.20.2-cdh3u6-sources.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/maprfs-0.1.jar">
<attributes>
<attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="/home/cypof/Downloads/mapr"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
6 changes: 6 additions & 0 deletions .project
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
Expand All @@ -13,5 +18,6 @@
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
Binary file removed lib/hadoop/cdh3/maprfs-0.1.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion py/h2o.py
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ def __init__(self,
use_debugger=None, classpath=None,
use_hdfs=False,
# hdfs_version="cdh4", hdfs_name_node="192.168.1.151",
hdfs_version="cdh3u5", hdfs_name_node="192.168.1.176",
hdfs_version="cdh3", hdfs_name_node="192.168.1.176",
hdfs_config=None,
aws_credentials=None,
use_flatfile=False, java_heap_GB=None, java_heap_MB=None, java_extra_args=None,
Expand Down
2 changes: 1 addition & 1 deletion py/testdir_hosts/test_parse_nflx_loop_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_import_nflx_parse_loop(self):
h2o_hosts.build_cloud_with_hosts(node_count=1, java_heap_GB=tryHeap,
use_hdfs=True,
hdfs_name_node='192.168.1.176',
hdfs_version='cdh3u5')
hdfs_version='cdh3')

# don't raise exception if we find something bad in h2o stdout/stderr?
h2o.nodes[0].sandbox_ignore_errors = True
Expand Down
2 changes: 1 addition & 1 deletion py/testdir_multi_jvm/test_hdfs_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def setUpClass(cls):
localhost = h2o.decide_if_localhost()
if (localhost):
h2o.build_cloud(3,
use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')
else:
h2o_hosts.build_cloud_with_hosts()

Expand Down
4 changes: 2 additions & 2 deletions py/testdir_single_jvm/test_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def setUpClass(cls):
global localhost
localhost = h2o.decide_if_localhost()
if (localhost):
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')
else:
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')

@classmethod
def tearDownClass(cls):
Expand Down
4 changes: 2 additions & 2 deletions py/testdir_single_jvm/test_hdfs_multi_bad_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def setUpClass(cls):
global localhost
localhost = h2o.decide_if_localhost()
if (localhost):
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')
else:
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')

@classmethod
def tearDownClass(cls):
Expand Down
4 changes: 2 additions & 2 deletions py/testdir_single_jvm/test_hdfs_multi_copies.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def setUpClass(cls):
global localhost
localhost = h2o.decide_if_localhost()
if (localhost):
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o.build_cloud(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')
else:
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node='192.168.1.176')
h2o_hosts.build_cloud_with_hosts(1, use_hdfs=True, hdfs_version='cdh3', hdfs_name_node='192.168.1.176')

@classmethod
def tearDownClass(cls):
Expand Down
4 changes: 2 additions & 2 deletions py/testdir_single_jvm/test_hdfs_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def setUpClass(cls):
global localhost
localhost = h2o.decide_if_localhost()
if (localhost):
h2o.build_cloud(1,use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node="192.168.1.176")
h2o.build_cloud(1,use_hdfs=True, hdfs_version='cdh3', hdfs_name_node="192.168.1.176")
else:
h2o_hosts.build_cloud_with_hosts(1,use_hdfs=True, hdfs_version='cdh3u5', hdfs_name_node="192.168.1.176")
h2o_hosts.build_cloud_with_hosts(1,use_hdfs=True, hdfs_version='cdh3', hdfs_name_node="192.168.1.176")

@classmethod
def tearDownClass(cls):
Expand Down
162 changes: 80 additions & 82 deletions src/main/java/water/AutoBuffer.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package water;

import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import water.util.Log;
import water.util.Log.Tag.Sys;

/**
* A ByteBuffer backed mixed Input/OutputStream class.
Expand Down Expand Up @@ -66,6 +68,9 @@ public final class AutoBuffer {
// The assumed max UDP packetsize
public static final int MTU = 1500-8/*UDP packet header size*/;

// Set this to a non-null Random (e.g. new Random()) to test random TCP fails on open or write
static final Random RANDOM_TCP_DROP = null; //new Random();

// Incoming UDP request. Make a read-mode AutoBuffer from the open Channel,
// figure the originating H2ONode from the first few bytes read.
public AutoBuffer( DatagramChannel sock ) throws IOException {
Expand Down Expand Up @@ -188,8 +193,8 @@ public String toString() {
sb.append("[AB ").append(_read ? "read " : "write ");
sb.append(_firstPage?"first ":"2nd ").append(_h2o);
sb.append(" ").append(Value.nameOfPersist(_persist));
sb.append(" 0 <= ").append(_bb.position()).append(" <= ").append(_bb.limit());
sb.append(" <= ").append(_bb.capacity());
if( _bb != null ) sb.append(" 0 <= ").append(_bb.position()).append(" <= ").append(_bb.limit());
if( _bb != null ) sb.append(" <= ").append(_bb.capacity());
return sb.append("]").toString();
}

Expand Down Expand Up @@ -234,47 +239,64 @@ private final int bbFree() {
return 0; // Flow-coding
}

// You thought TCP was a reliable protocol, right? WRONG! Fails 100% of the
// time under heavy network load. Connection-reset-by-peer & connection
// timeouts abound, even after a socket open and after a 1st successful
// ByteBuffer write. It *appears* that the reader is unaware that a writer
// was told "go ahead and write" by the TCP stack, so all these fails are
// only on the writer-side.
/**
 * Unchecked wrapper around an {@link IOException} raised by a TCP operation.
 * Being a {@link RuntimeException}, it does not have to be declared by the
 * callers it propagates through.
 *
 * The wrapped exception is available both through the {@code _ioe} field and
 * through {@link #getCause()}, so the original network failure shows up in
 * stack traces.
 */
public static class TCPIsUnreliableException extends RuntimeException {
  /** The underlying I/O failure that triggered this exception. */
  final IOException _ioe;

  /**
   * @param ioe the I/O failure being wrapped; also installed as the cause
   */
  TCPIsUnreliableException( IOException ioe ) {
    super(ioe);                 // Chain the cause so it is not lost in stack traces
    _ioe = ioe;
  }
}

// For reads, just assert all was read and close and release resources.
// (release ByteBuffer back to the common pool). For writes, force any final
// bytes out. If the write is to an H2ONode and is short, send via UDP.
// AutoBuffer close calls are ordered; i.e. a reader close() will block until the
// writer does a close().
public final int close() { return close(true); }
public final int close(boolean expect_tcp) {
public final int close() { return close(true,false); }
public final int close(boolean expect_tcp) { return close(expect_tcp,false); }
public final int close(boolean expect_tcp, boolean failed) {
assert _h2o != null || _chan != null; // Byte-array backed should not be closed
// Extra asserts on closing TCP channels: we should always know & expect
// TCP channels, and read them fully. If we close a TCP channel that is
// not fully read then the writer will assert and we will silently run on.
// Note: this assert is essentially redundant with the extra read/write of
// 0xab below; closing a TCP read-channel early will read 1 more byte -
// which probably will not be 0xab.
assert expect_tcp || !hasTCP();
assert expect_tcp || failed || !hasTCP();
try {
if( _chan == null ) { // No channel?
if( _read ) return bbFree();
// For small-packet write, send via UDP. Since nothing is sent until
// now, this close() call trivially orders - since the reader will not
// even start (much less close()) until this packet is sent.
if( _bb.position() < MTU ) return udpSend();
}
// Force AutoBuffer 'close' calls to order; i.e. block readers until
// writers do a 'close' - by writing 1 more byte in the close-call which
// the reader will have to wait for.
if( _h2o != null ) { // TCP connection?
if( _read ) { // Reader?
int x = get1(); // Read 1 more byte
assert x == 0xab : "AB.close instead of 0xab sentinel got "+x+", "+this;
} else { // Writer?
put1(0xab); // Write one-more byte
if( !failed ) { // Failed flag is to recover from a broken TCP write
if( _chan == null ) { // No channel?
if( _read ) return bbFree();
// For small-packet write, send via UDP. Since nothing is sent until
// now, this close() call trivially orders - since the reader will not
// even start (much less close()) until this packet is sent.
if( _bb.position() < MTU ) return udpSend();
}
// Force AutoBuffer 'close' calls to order; i.e. block readers until
// writers do a 'close' - by writing 1 more byte in the close-call which
// the reader will have to wait for.
if( _h2o != null ) { // TCP connection?
if( _read ) { // Reader?
int x = get1(); // Read 1 more byte
assert x == 0xab : "AB.close instead of 0xab sentinel got "+x+", "+this;
} else { // Writer?
put1(0xab); // Write one-more byte
}
}
if( !_read ) sendPartial(); // Finish partial writes
_chan.close();
_time_close_ms = System.currentTimeMillis();
TimeLine.record_IOclose(this,_persist); // Profile TCP connections
} else { // Failed?
if( _chan != null ) _chan.close(); // Just close, restore, exit
}
if( !_read ) sendPartial(); // Finish partial writes
_chan.close();
_time_close_ms = System.currentTimeMillis();
TimeLine.record_IOclose(this,_persist); // Profile TCP connections
} catch( IOException e ) { // Dunno how to handle so crash-n-burn
throw Log.errRTExcept(e);
// If already in failure-recovery mode, do not log an error on close...
// we already logged at the higher layer in the application.
if( !failed ) throw Log.errRTExcept(e);
} finally {
restorePriority(); // And if we raised priority, lower it back
if( _chan instanceof SocketChannel )
Expand All @@ -292,63 +314,28 @@ public void drainClose() {
TCPS.decrementAndGet();
bbFree();
} catch( IOException e ) { // Dunno how to handle so crash-n-burn
throw Log.errRTExcept(e);
throw Log.errRTExcept(e);
}
}

// Need a sock for a big read or write operation
private void tcpOpen() {
private void tcpOpen() throws IOException {
assert _firstPage && _bb.limit() >= 1+2+4; // At least something written
assert _chan == null;
assert _bb.position()==0;

SocketChannel sock=null;
while(true) { // Loop, in case we get socket open problems
IOException ex = null;
try {
// We expect the socket open to be fast, but if the receiver is very
// overwhelmed he might not respond for a long long time. In this
// case we simply keep retrying (after a sleep period to let the
// receiver catch up).
sock = SocketChannel.open();
assert sock.isBlocking();
sock.socket().setSendBufferSize(BBSIZE);
boolean res = sock.connect( _h2o._key );
assert res && !sock.isConnectionPending();
// Do an initial write: under heavy load we might get a bogus open
// followed by a reset/close AFTER writing... leading to a TCP
// connection-reset-by-peer error. The fix is to delay a tad (for the
// receiver load to drop) and try again.
_bb.position(0);
long ns = System.nanoTime();
while( _bb.hasRemaining() )
sock.write(_bb);
_time_io_ns += (System.nanoTime()-ns);
break;
} // Explicitly ignore the following exceptions but fail on the rest
catch (ConnectException e) { ex = e; }
catch (SocketTimeoutException e) { ex = e; }
catch (IOException e) { ex = e; }
finally {
if( ex != null ) {
try { if( sock != null ) sock.close(); } catch( IOException e ) { }
// Note: logging is dangerous here, as logging requires another TCP
// connection and this TCP connection just failed.
Log.info("Network congestion: TCP open/write "+ex.getMessage()+" talking to "+_h2o+", waiting and retrying...");
try { Thread.sleep(500); } catch (InterruptedException ie) {}
}
}
}

assert sock.isConnected(); // Supposed to be a blocking channel
assert sock.isOpen(); // Supposed to be an open channel
SocketChannel sock = SocketChannel.open();
if( RANDOM_TCP_DROP != null && RANDOM_TCP_DROP.nextInt(10) == 0 )
throw new IOException("Random TCP Open Fail");
sock.socket().setSendBufferSize(BBSIZE);
boolean res = sock.connect( _h2o._key );
assert res && !sock.isConnectionPending() && sock.isBlocking() && sock.isConnected() && sock.isOpen();
_chan = sock;
TCPS.incrementAndGet();
raisePriority();
}

// True if we opened a TCP channel, or will open one to close-and-send
boolean hasTCP() { return _chan != null || (_bb != null && _bb.position() >= MTU); }
boolean hasTCP() { return _chan instanceof SocketChannel || (_bb != null && _bb.position() >= MTU); }

// True if we are in read-mode
boolean readMode() { return _read; }
Expand Down Expand Up @@ -387,7 +374,7 @@ public final boolean eof() {
// over with.
private void raisePriority() {
if(_oldPrior == -1){
assert _chan instanceof SocketChannel && _oldPrior == -1;
assert _chan instanceof SocketChannel;
_oldPrior = Thread.currentThread().getPriority();
Thread.currentThread().setPriority(Thread.MAX_PRIORITY-1);
}
Expand Down Expand Up @@ -447,16 +434,19 @@ private ByteBuffer getSz(int sz) {

private ByteBuffer getImpl( int sz ) {
assert _read : "Reading from a buffer in write mode";
assert _chan != null : "Read to much data from a byte[] backed buffer";
assert _chan != null : "Read to much data from a byte[] backed buffer, AB="+this;
_bb.compact(); // Move remaining unread bytes to start of buffer; prep for reading
// Its got to fit or we asked for too much
assert _bb.position()+sz <= _bb.capacity() : "("+_bb.position()+"+"+sz+" <= "+_bb.capacity()+")";
long ns = System.nanoTime();
while( _bb.position() < sz ) { // Read until we got enuf
try {
int res = _chan.read(_bb); // Read more
// Readers are supposed to be strongly typed and read the exact expected bytes
if( res == -1 ) throw new RuntimeException("EOF while reading "+sz+" bytes, AB="+this);
// Readers are supposed to be strongly typed and read the exact expected bytes.
// However, if a TCP connection fails mid-read we'll get a short-read.
// This is indistinguishable from a mis-alignment between the writer and reader!
if( res == -1 )
throw new TCPIsUnreliableException(new EOFException("Reading "+sz+" bytes, AB="+this));
if( res == 0 ) throw new RuntimeException("Reading zero bytes - so no progress?");
} catch( IOException e ) { // Dunno how to handle so crash-n-burn
throw Log.errRTExcept(e);
Expand Down Expand Up @@ -494,16 +484,24 @@ private ByteBuffer sendPartial() {
if( _chan == null )
TimeLine.record_send(this,true);
_bb.flip(); // Prep for writing.
_bb.mark();
if( _chan == null)
tcpOpen(); // This is a big operation. Open a TCP socket as-needed.
try{
try {
if( _chan == null )
tcpOpen(); // This is a big operation. Open a TCP socket as-needed.
long ns = System.nanoTime();
while( _bb.hasRemaining() )
while( _bb.hasRemaining() ) {
_chan.write(_bb);
if( RANDOM_TCP_DROP != null &&_chan instanceof SocketChannel && RANDOM_TCP_DROP.nextInt(100) == 0 )
throw new IOException("Random TCP Write Fail");
}
_time_io_ns += (System.nanoTime()-ns);
} catch( IOException e ) { // Can't open the connection, try again later
throw new RuntimeException(Log.err("TCP Write, AB="+this+" failed with ",e));
} catch( IOException e ) { // Some kind of TCP fail?
// Change to an unchecked exception (so we don't have to annotate every
// frick'n put1/put2/put4/read/write call). Retry & recovery happens at
// a higher level. AutoBuffers are used for many things including e.g.
// disk i/o & UDP writes; this exception only happens on a failed TCP
// write - and we don't want to make the other AutoBuffer users have to
// declare (and then ignore) this exception.
throw new TCPIsUnreliableException(e);
}
if( _bb.capacity() < 16*1024 ) _bb = bbMake();
_firstPage = false;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/FJPacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class FJPacket extends H2OCountedCompleter {
if( _ctrl <= UDP.udp.ack.ordinal() )
UDP.udp.UDPS[_ctrl]._udp.call(_ab).close();
else
RPC.remote_exec(_ab).close();
RPC.remote_exec(_ab);
tryComplete();
}
// Run at max priority until we decrypt the packet enough to get priorities out
Expand Down
Loading

0 comments on commit d512de4

Please sign in to comment.