Skip to content

Commit

Permalink
Bug fixes from h2o-dev
Browse files Browse the repository at this point in the history
FetchAck priority level (so an RPC response containing a new type can
fetch the type & id from the leader even if its the leader sending the
RPC response).
AutoBuffer.close takes no args & is idempotent.
H2O priority queues, worker threads do not surf other low-priority
worker thread queues.
  • Loading branch information
cliffclick committed Aug 2, 2014
1 parent 1fa2b13 commit 375a460
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 123 deletions.
96 changes: 41 additions & 55 deletions src/main/java/water/AutoBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,37 @@ public class AutoBuffer {
// Where to send or receive data via TCP or UDP (choice made as we discover
// how big the message is); used to lazily create a Channel. If NULL, then
// _chan should be a pre-existing Channel, such as a FileChannel.
public final H2ONode _h2o;
final H2ONode _h2o;

// TRUE for read-mode. FALSE for write-mode. Can be flipped for rapid turnaround.
private boolean _read;

// TRUE if this AutoBuffer has never advanced past the first "page" of data.
// The UDP-flavor, port# and task fields are only valid until we read over
// them when flipping the ByteBuffer to the next chunk of data. Used in
// asserts all over the place.
// asserts all over the place.
private boolean _firstPage;

// Total size written out from 'new' to 'close'. Only updated when actually
// reading or writing data, or after close(). For profiling only.
public int _size, _zeros, _arys;
int _size, _zeros, _arys;
// More profiling: start->close msec, plus nano's spent in blocking I/O
// calls. The difference between (close-start) and i/o msec is the time the
// i/o thread spends doing other stuff (e.g. allocating Java objects or
// (de)serializing).
public long _time_start_ms, _time_close_ms, _time_io_ns;
long _time_start_ms, _time_close_ms, _time_io_ns;
// I/O persistence flavor: Value.ICE, NFS, HDFS, S3, TCP. Used to record I/O time.
final byte _persist;

// The assumed max UDP packetsize
public static final int MTU = 1500-8/*UDP packet header size*/;
static final int MTU = 1500-8/*UDP packet header size*/;

// Enable this to test random TCP fails on open or write
static final Random RANDOM_TCP_DROP = null; //new Random();

// Incoming UDP request. Make a read-mode AutoBuffer from the open Channel,
// figure the originating H2ONode from the first few bytes read.
public AutoBuffer( DatagramChannel sock ) throws IOException {
AutoBuffer( DatagramChannel sock ) throws IOException {
_chan = null;
_bb = bbMake();
_read = true; // Reading by default
Expand All @@ -102,7 +102,7 @@ public AutoBuffer( DatagramChannel sock ) throws IOException {

// Incoming TCP request. Make a read-mode AutoBuffer from the open Channel,
// figure the originating H2ONode from the first few bytes read.
public AutoBuffer( SocketChannel sock ) throws IOException {
AutoBuffer( SocketChannel sock ) throws IOException {
_chan = sock;
raisePriority(); // Make TCP priority high
_bb = bbMake();
Expand All @@ -120,7 +120,7 @@ public AutoBuffer( SocketChannel sock ) throws IOException {
// Make an AutoBuffer to write to an H2ONode. Requests for full buffer will
// open a TCP socket and roll through writing to the target. Smaller
// requests will send via UDP.
public AutoBuffer( H2ONode h2o ) {
AutoBuffer( H2ONode h2o ) {
_bb = bbMake();
_chan = null; // Channel made lazily only if we write alot
_h2o = h2o;
Expand All @@ -143,7 +143,7 @@ public AutoBuffer( FileChannel fc, boolean read, byte persist ) {
}

// Read from UDP multicast. Same as the byte[]-read variant, except there is an H2O.
public AutoBuffer( DatagramPacket pack ) {
AutoBuffer( DatagramPacket pack ) {
_size = pack.getLength();
_bb = ByteBuffer.wrap(pack.getData(), 0, pack.getLength()).order(ByteOrder.nativeOrder());
_bb.position(0);
Expand All @@ -157,7 +157,7 @@ public AutoBuffer( DatagramPacket pack ) {
/** Read from a fixed byte[]; should not be closed. */
public AutoBuffer( byte[] buf ) { this(buf,0); }
/** Read from a fixed byte[]; should not be closed. */
public AutoBuffer( byte[] buf, int off ) {
AutoBuffer( byte[] buf, int off ) {
assert buf != null : "null fed to ByteBuffer.wrap";
_bb = ByteBuffer.wrap(buf).order(ByteOrder.nativeOrder());
_bb.position(off);
Expand Down Expand Up @@ -192,8 +192,7 @@ public AutoBuffer( int len ) {
_persist = 0; // No persistance
}

@Override
public String toString() {
@Override public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[AB ").append(_read ? "read " : "write ");
sb.append(_firstPage?"first ":"2nd ").append(_h2o);
Expand All @@ -213,17 +212,17 @@ public String toString() {
private static final AtomicInteger BBFREE = new AtomicInteger(0);
private static final AtomicInteger BBCACHE= new AtomicInteger(0);
private static final LinkedBlockingDeque<ByteBuffer> BBS = new LinkedBlockingDeque<ByteBuffer>();
public static final int BBSIZE = 64*1024; // Bytebuffer "common big size"
static final int BBSIZE = 64*1024; // Bytebuffer "common big size"
private static void bbstats( AtomicInteger ai ) {
if( !DEBUG ) return;
if( (ai.incrementAndGet()&511)==511 ) {
Log.warn("BB make="+BBMAKE.get()+" free="+BBFREE.get()+" cache="+BBCACHE.get()+" size="+BBS.size());
}
}

private static final ByteBuffer bbMake() {
private static ByteBuffer bbMake() {
while( true ) { // Repeat loop for DBB OutOfMemory errors
ByteBuffer bb = null;
ByteBuffer bb;
try { bb = BBS.pollFirst(0,TimeUnit.SECONDS); }
catch( InterruptedException e ) { throw Log.errRTExcept(e); }
if( bb != null ) {
Expand All @@ -238,17 +237,17 @@ private static final ByteBuffer bbMake() {
// java.lang.OutOfMemoryError: Direct buffer memory
if( !"Direct buffer memory".equals(oome.getMessage()) ) throw oome;
System.out.println("Sleeping & retrying");
try { Thread.sleep(100); } catch( InterruptedException ie ) { }
try { Thread.sleep(100); } catch( InterruptedException ignore ) { }
}
}
}
private static final void bbFree(ByteBuffer bb) {
private static void bbFree(ByteBuffer bb) {
bbstats(BBFREE);
bb.clear();
BBS.offerFirst(bb);
}

private final int bbFree() {
private int bbFree() {
if( _bb != null && _bb.isDirect() ) bbFree(_bb);
_bb = null;
return 0; // Flow-coding
Expand All @@ -260,7 +259,7 @@ private final int bbFree() {
// ByteBuffer write. It *appears* that the reader is unaware that a writer
// was told "go ahead and write" by the TCP stack, so all these fails are
// only on the writer-side.
public static class AutoBufferException extends RuntimeException {
static class AutoBufferException extends RuntimeException {
final IOException _ioe;
AutoBufferException( IOException ioe ) { _ioe = ioe; }
}
Expand Down Expand Up @@ -346,7 +345,7 @@ private void tcpOpen() throws IOException {
// us before closing the channel, we can not read till the end. So we just
// close the channel and let the other side to deal with it and figure out
// the task has been cancelled (still sending ack ack back).
public void drainClose() {
void drainClose() {
try {
// Appears to work reasonably now; removing noisy printout
//try { Log.info("drainClose channel to " + ((SocketChannel) _chan).socket().getInetAddress()); }
Expand All @@ -369,27 +368,20 @@ public void drainClose() {
int zeros() { return _zeros; }

// Available bytes in this buffer to read
public int remaining() { return _bb.remaining(); }
public int position () { return _bb.position (); }
public void position(int pos) { _bb.position(pos); }
public int limit() { return _bb.limit(); }

public void positionWithResize(int value) {
putSp(value - position());
position(value);
}

// Return byte[] from a writable AutoBuffer
public final byte[] buf() {
assert _h2o==null && _chan==null && _read==false && !_bb.isDirect();
assert _h2o==null && _chan==null && !_read && !_bb.isDirect();
return MemoryManager.arrayCopyOfRange(_bb.array(), _bb.arrayOffset(), _bb.position());
}
public final byte[] bufClose() {
byte[] res = _bb.array();
bbFree();
return res;
}
public final boolean eof() {
final boolean eof() {
assert _h2o==null && _chan==null;
return _bb.position()==_bb.limit();
}
Expand Down Expand Up @@ -428,16 +420,16 @@ private int udpSend() throws IOException {
}

// Flip to write-mode
public AutoBuffer clearForWriting() {
assert _read == true;
AutoBuffer clearForWriting() {
assert _read;
_read = false;
_bb.clear();
_firstPage = true;
return this;
}
// Flip to read-mode
public AutoBuffer flipForReading() {
assert _read == false;
assert !_read;
_read = true;
_bb.flip();
_firstPage = true;
Expand Down Expand Up @@ -482,7 +474,7 @@ private ByteBuffer getImpl( int sz ) {
// Windows message for a reset-channel
if( e.getMessage().equals("An established connection was aborted by the software in your host machine") )
throw new AutoBufferException(e);
throw Log.errRTExcept(e);
throw Log.errRTExcept(e);
}
}
_time_io_ns += (System.nanoTime()-ns);
Expand Down Expand Up @@ -543,12 +535,6 @@ private ByteBuffer sendPartial() {
return _bb;
}

public int peek1() {
if (eof())
return 0;
getSp(1);
return get1(position());
}
public String getStr(int off, int len) {
return new String(_bb.array(), _bb.arrayOffset()+off, len);
}
Expand Down Expand Up @@ -619,13 +605,13 @@ public AutoBuffer put(Iced f) {
// Int will take 1+4 bytes, and bigger values 1+8 bytes. This compression is
// optimized for small integers (including -1 which is often used as a "array
// is null" flag when passing the array length).
public AutoBuffer putInt( int x ) {
AutoBuffer putInt( int x ) {
if( 0 <= (x+1)&& (x+1) <= 253 ) return put1(x+1);
if( Short.MIN_VALUE <= x && x <= Short.MAX_VALUE ) return put1(255).put2((short)x);
return put1(254).put4(x);
}
// Get a (compressed) integer. See above for the compression strategy and reasoning.
public int getInt( ) {
int getInt( ) {
int x = get1();
if( x <= 253 ) return x-1;
if( x==255 ) return (short)get2();
Expand All @@ -639,7 +625,7 @@ public int getInt( ) {
// putInt(# of leading nulls)
// putInt(# of non-nulls)
// If # of non-nulls is > 0, putInt( # of trailing nulls)
public long putZA( Object[] A ) {
long putZA( Object[] A ) {
if( A==null ) { putInt(-1); return 0; }
int x=0; for( ; x<A.length; x++ ) if( A[x ]!=null ) break;
int y=A.length; for( ; y>x; y-- ) if( A[y-1]!=null ) break;
Expand All @@ -653,7 +639,7 @@ public long putZA( Object[] A ) {
// Returns -1 if null.
// Returns a long of (leading zeros | middle non-zeros).
// If there are non-zeros, caller has to read the trailing zero-length.
public long getZA( ) {
long getZA( ) {
int x=getInt(); // Length of leading zeros
if( x == -1 ) return -1; // or a null
int nz=getInt(); // Non-zero in the middle
Expand Down Expand Up @@ -802,16 +788,16 @@ public int read( byte[] buf, int off, int len ) {
// -----------------------------------------------
// Utility functions to handle common UDP packet tasks.
// Get the 1st control byte
public int getCtrl( ) { return getSz(1).get(0)&0xFF; }
int getCtrl( ) { return getSz(1).get(0)&0xFF; }
// Get the port in next 2 bytes
public int getPort( ) { return getSz(1+2).getChar(1); }
int getPort( ) { return getSz(1+2).getChar(1); }
// Get the task# in the next 4 bytes
public int getTask( ) { return getSz(1+2+4).getInt(1+2); }
int getTask( ) { return getSz(1+2+4).getInt(1+2); }
// Get the flag in the next 1 byte
public int getFlag( ) { return getSz(1+2+4+1).get(1+2+4); }
int getFlag( ) { return getSz(1+2+4+1).get(1+2+4); }

// Set the ctrl, port, task. Ready to write more bytes afterwards
public AutoBuffer putUdp (UDP.udp type) {
AutoBuffer putUdp (UDP.udp type) {
assert _bb.position()==0;
putSp(1+2);
_bb.put ((byte)type.ordinal());
Expand All @@ -820,10 +806,10 @@ public AutoBuffer putUdp (UDP.udp type) {
return this;
}

public AutoBuffer putTask(UDP.udp type, int tasknum) {
AutoBuffer putTask(UDP.udp type, int tasknum) {
return putUdp(type).put4(tasknum);
}
public AutoBuffer putTask(int ctrl, int tasknum) {
AutoBuffer putTask(int ctrl, int tasknum) {
assert _bb.position()==0;
putSp(1+2+4);
_bb.put((byte)ctrl).putChar((char)H2O.UDP_PORT).putInt(tasknum);
Expand Down Expand Up @@ -908,7 +894,7 @@ public long[] getA8( ) {
case 2: for( int i=x; i<x+y; i++ ) buf[i] = (short)get2(); return buf;
case 4: for( int i=x; i<x+y; i++ ) buf[i] = get4(); return buf;
case 8: break;
default: H2O.fail();
default: throw H2O.fail();
}

int sofar = x;
Expand Down Expand Up @@ -1058,7 +1044,7 @@ public AutoBuffer putA1( byte[] ary, int sofar, int length ) {
}
return this;
}
public AutoBuffer putA2( short[] ary ) {
AutoBuffer putA2( short[] ary ) {
_arys++;
if( ary == null ) return putInt(-1);
putInt(ary.length);
Expand Down Expand Up @@ -1161,7 +1147,7 @@ public AutoBuffer putA8d( double[] ary ) {
return this;
}

public AutoBuffer putAA1( byte[][] ary ) {
AutoBuffer putAA1( byte[][] ary ) {
_arys++;
long xy = putZA(ary);
if( xy == -1 ) return this;
Expand All @@ -1170,7 +1156,7 @@ public AutoBuffer putAA1( byte[][] ary ) {
for( int i=x; i<x+y; i++ ) putA1(ary[i]);
return this;
}
public AutoBuffer putAA2( short[][] ary ) {
AutoBuffer putAA2( short[][] ary ) {
_arys++;
long xy = putZA(ary);
if( xy == -1 ) return this;
Expand Down Expand Up @@ -1215,7 +1201,7 @@ public AutoBuffer putAA8d( double[][] ary ) {
for( int i=x; i<x+y; i++ ) putA8d(ary[i]);
return this;
}
public AutoBuffer putAAA4( int[][][] ary ) {
AutoBuffer putAAA4( int[][][] ary ) {
_arys++;
long xy = putZA(ary);
if( xy == -1 ) return this;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/water/FJPacket.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package water;

import water.H2O.H2OCountedCompleter;

/**
Expand All @@ -9,7 +10,7 @@
* @author <a href="mailto:[email protected]"></a>
* @version 1.0
*/
public class FJPacket extends H2OCountedCompleter {
class FJPacket extends H2OCountedCompleter {
final AutoBuffer _ab;
final int _ctrl; // 1st byte of packet
FJPacket( AutoBuffer ab, int ctrl ) { _ab = ab; _ctrl = ctrl; }
Expand All @@ -29,6 +30,7 @@ public class FJPacket extends H2OCountedCompleter {
H2O.MAX_PRIORITY, // Rebooted
H2O.MAX_PRIORITY, // Timeline
H2O.ACK_ACK_PRIORITY,// Ack Ack
H2O.FETCH_ACK_PRIORITY, // Class/ID mapping ACK
H2O.ACK_PRIORITY, // Ack
H2O.DESERIAL_PRIORITY}; // Exec is very high, so we deserialize early
@Override public byte priority() { return UDP_PRIORITIES[_ctrl]; }
Expand Down
Loading

0 comments on commit 375a460

Please sign in to comment.