Skip to content

Commit

Permalink
More API refinement
Browse files Browse the repository at this point in the history
append2 is only on NewChunk now; MRTask2.map explicitly uses it.
Use 1Meg-element chunks for fluidvecs by default
Fix bugs parsing across chunk boundaries & invalid cols.
Change junit to use larger file.  Move WordCount into WordCountTest
file.
  • Loading branch information
cliffclick committed Jun 24, 2013
1 parent 8232acb commit 92d091e
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 210 deletions.
8 changes: 4 additions & 4 deletions src/main/java/water/MRTask2.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public abstract class MRTask2<T extends MRTask2> extends DTask implements Clonea

// Run some useful function over this <strong>local</strong> Chunk, and
// record the results in the <em>this<em> MRTask2.
public void map( Chunk bv ) { }
public void map( Chunk bv0, Chunk bv1 ) { }
public void map( Chunk bvs[] ) { }
public void map( Chunk bv ) { }
public void map( NewChunk bv0, Chunk bv1 ) { }
public void map( Chunk bvs[] ) { }

// Combine results from 'mrt' into 'this' MRTask2. Both 'this' and 'mrt' are
// guaranteed to either have map() run on them, or be the results of a prior
Expand Down Expand Up @@ -138,7 +138,7 @@ private final RPC<T> remote_compute( int lo, int hi ) {

// Call all the various map() calls that apply
if( _fr._vecs.length == 1 ) map(bvs[0]);
if( _fr._vecs.length == 2 ) map(bvs[0], bvs[1]);
if( _fr._vecs.length == 2 && bvs[0] instanceof NewChunk) map((NewChunk)bvs[0], bvs[1]);
if( true ) map(bvs );
_res = self(); // Save results since called map() at least once!

Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C0LChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public class C0LChunk extends Chunk {
}
@Override protected final long at8_impl( int i ) { return _con; }
@Override protected final double atd_impl( int i ) {return _con; }
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return l==_con; }
@Override boolean hasFloat() { return false; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C1Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class C1Chunk extends Chunk {
long res = 0xFF&_mem[i+OFF];
return (res == _NA)?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean hasFloat() { return false; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
@Override public C1Chunk read(AutoBuffer bb) {
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C1SChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class C1SChunk extends Chunk {
long res = 0xFF&_mem[i+OFF];
return (res == _NA)?_vec._fNA:(res+_bias)*_scale;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return _scale < 1.0; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C2Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class C2Chunk extends Chunk {
int res = UDP.get2(_mem,(i<<1)+OFF);
return res == _NA?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return false; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C2SChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class C2SChunk extends Chunk {
long res = UDP.get2(_mem,(i<<1)+OFF);
return (res == _NA)?_vec._fNA:(res + _bias)*_scale;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return _scale < 1.0; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C4Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public class C4Chunk extends Chunk {
long res = UDP.get4(_mem,i<<2);
return res == _NA?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return false; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C4FChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class C4FChunk extends Chunk {
float res = UDP.get4f(_mem,i<<2);
return Float.isNaN(res)?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return true; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C8Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public class C8Chunk extends Chunk {
long res = UDP.get8(_mem,i<<3);
return res == _NA?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return false; }
@Override public AutoBuffer write(AutoBuffer bb) { return bb.putA1(_mem,_mem.length); }
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/fvec/C8DChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class C8DChunk extends Chunk {
double res = UDP.get8d(_mem,i<<3);
return Double.isNaN(res)?_vec._fNA:res;
}
@Override void append2 ( long l, int exp ) { throw H2O.fail(); }
@Override boolean set8_impl(int idx, long l) { return false; }
@Override boolean hasFloat() { return true; }
@Override public AutoBuffer write(AutoBuffer bb) {return bb.putA1(_mem,_mem.length);}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/water/fvec/Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class Chunk extends Iced implements Cloneable {
// range-check by the JIT as expected.
public final double at0( int i ) { return _chk.atd_impl(i); }
public final long at80( int i ) { return _chk.at8_impl(i); }
public final boolean isNA0( int i ) { return valueIsNA(_chk.at80(i)); }
public final boolean isNA0( int i ) { return valueIsNA(at80(i)); }

// Load a double or long value from the 1-entry chunk cache, or miss-out.
// This version uses absolute element numbers, but must convert them to
Expand Down Expand Up @@ -88,11 +88,11 @@ public void close( int cidx, Futures fs ) {
else DKV.put(_vec.chunkKey(cidx),_chk,fs); // Write updated chunk back into K/V
}

public int cidx() { return _vec.elem2ChunkIdx(_start); }

// Chunk-specific readers.
abstract protected double atd_impl(int idx);
abstract protected long at8_impl(int idx);
// Chunk-specific append of data
abstract void append2( long l, int exp );
// Chunk-specific writer. Returns false if the value does not fit in the
// current compression scheme.
abstract boolean set8_impl(int idx, long l);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/fvec/NFSFileVec.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private NFSFileVec(Key key, long len) {
Value val1 = DKV.get(dkey);// Check for an existing one... will fetch data as needed
if( val1 != null ) return val1; // Found an existing one?
// Lazily create a DVec for this chunk
int len = (int)(cidx < nchk-1 ? ValueArray.CHUNK_SZ : (_len-chunk2StartElem(cidx)));
int len = (int)(cidx < nchk-1 ? Vec.CHUNK_SZ : (_len-chunk2StartElem(cidx)));
// DVec is just the raw file data with a null-compression scheme
Value val2 = new Value(dkey,len,null,TypeMap.C1CHUNK,Value.NFS);
val2.setdsk(); // It is already on disk.
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/water/fvec/NewChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ protected final boolean isNA(int idx){
return _ls[idx] == 0 && _xs[idx] == Integer.MIN_VALUE;
}
// Fast-path append long data
@Override void append2( long l, int x ) {
void append2( long l, int x ) {
if( _len >= _ls.length ) append2slow();
_ls[_len] = l;
_xs[_len] = x;
_len++;
}
// Slow-path append data
void append2slow( ) {
if( _len > ValueArray.CHUNK_SZ )
if( _len > Vec.CHUNK_SZ )
throw new ArrayIndexOutOfBoundsException(_len);
_ls = Arrays.copyOf(_ls,_len<<1);
_xs = Arrays.copyOf(_xs,_len<<1);
}
void invalid() { append2(0,Integer.MIN_VALUE); }

// Do any final actions on a completed NewVector. Mostly: compress it, and
// do a DKV put on an appropriate Key. The original NewVector goes dead
Expand All @@ -58,7 +59,7 @@ Chunk compress() {
boolean overflow=false;
boolean floatOverflow = false;
for( int i=0; i<_len; i++ ) {
if(isNA(i))continue;
if( isNA(i) ) continue;
long l = _ls[i];
int x = _xs[i];
// Compute per-chunk min/sum/max
Expand Down Expand Up @@ -141,12 +142,12 @@ Chunk compress() {
private byte[] bufX( long bias, int scale, int off, int log ) {
byte[] bs = new byte[(_len<<log)+off];
for( int i=0; i<_len; i++ ) {
if(isNA(i)){
if( isNA(i) ) {
switch( log ) {
case 0: bs [i +off] = (byte)(0xFF&C1Chunk._NA); break;
case 0: bs [i +off] = (byte)(C1Chunk._NA); break;
case 1: UDP.set2(bs,(i<<1)+off, (short)C2Chunk._NA); break;
case 2: UDP.set4(bs,(i<<2)+off, (int)C4Chunk._NA); break;
case 3: UDP.set8(bs,(i<<3)+off, C8Chunk._NA); break;
case 2: UDP.set4(bs,(i<<2)+off, (int)C4Chunk._NA); break;
case 3: UDP.set8(bs,(i<<3)+off, C8Chunk._NA); break;
default: H2O.fail();
}
} else {
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/water/fvec/ParseDataset2.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,14 @@ private class DParse extends MRTask2<DParse> {
final Chunk in = bvs[bvs.length-1];
final NewChunk[] nvs = new NewChunk[bvs.length-1];
for( int i=0; i<nvs.length; i++ ) nvs[i] = (NewChunk)bvs[i];
final int cidx = in.cidx();

// The Parser
CsvParser parser = new CsvParser(_setup,false) {
private byte[] _mem2; // Chunk following this one
private int _col=0; // Column #
@Override public byte[] getChunkData( int cidx ) {
if( cidx==0 ) return in._mem;
@Override public byte[] getChunkData( int cidx0 ) {
if( cidx0==cidx ) return in._mem;
if( _mem2 != null ) return _mem2;
Chunk in2 = in._vec.nextBV(in);
return in2 == null ? null : (_mem2=in2._mem);
Expand All @@ -322,15 +323,14 @@ private class DParse extends MRTask2<DParse> {
@Override public void newLine() {
if( _col > 0 )
while( _col < _cols.length )
addInvalidCol(_col++);
addInvalidCol(_col);
_col=0;
}

// Handle a new number column in the parser
@Override public void addNumCol(int colIdx, long number, int exp) {
assert colIdx==_col;
nvs[colIdx].append2(number,exp);
_col++; // Next column filled in
nvs[_col++].append2(number,exp);
}

@Override public void addStrCol(int colIdx, ValueString str) {
Expand All @@ -339,11 +339,11 @@ private class DParse extends MRTask2<DParse> {
_col++; // Next column filled in
throw H2O.unimpl();
}
@Override public void addInvalidCol(int colIdx) { throw H2O.unimpl(); }
@Override public void addInvalidCol(int colIdx) { assert colIdx==_col : "colIdx="+colIdx+" _col="+_col; nvs[_col++].invalid(); }
@Override public void rollbackLine() { }
@Override public boolean isString(int colIdx) { return false; }
};
parser.parse(0);
parser.parse(cidx);
}
}

Expand Down Expand Up @@ -395,7 +395,7 @@ public static CsvParser.Setup csvGuessValue( ByteVec vec, byte separator, Compre
break;
off += len;
if( off == bs.length ) { // Dataset is uncompressing alot! Need more space...
if( bs.length >= ValueArray.CHUNK_SZ )
if( bs.length >= Vec.CHUNK_SZ )
break; // Already got enough
bs = Arrays.copyOf(bs, bs.length * 2);
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/water/fvec/Vec.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
// Vec Key format is: Key. VEC - byte, 0 - byte, 0 - int, normal Key bytes.
// DVec Key format is: Key.DVEC - byte, 0 - byte, chunk# - int, normal Key bytes.
public class Vec extends Iced {
public static final int LOG_CHK = 20; // Chunks are 1<<20, or 1Meg
public static final long CHUNK_SZ = 1L << LOG_CHK;

final public Key _key; // Top-level key
// Element-start per chunk. Always zero for chunk 0. One more entry than
// chunks, so the last entry is the total number of rows. This field is
Expand Down
Loading

0 comments on commit 92d091e

Please sign in to comment.