Commit 9210ec3: Merge branch 'h2o-parsemanycols'

arnocandel committed Jan 13, 2015
2 parents: e1211d4 + d9a9919
Showing 12 changed files with 138 additions and 40 deletions.
49 changes: 49 additions & 0 deletions R/examples/manycols.R
@@ -0,0 +1,49 @@
# Install and Launch H2O R package
if ("package:h2o" %in% search()) { detach("package:h2o", unload=TRUE) }
if ("h2o" %in% rownames(installed.packages())) { remove.packages("h2o") }
install.packages("h2o", repos=(c("http://h2o-release.s3.amazonaws.com/h2o/h2o-parsemanycols/8/R", getOption("repos"))))
library(h2o)

# Connect to cluster (8 nodes with -Xmx 40g each)

# Launch H2O Cluster with YARN on HDP2.1
#wget http://h2o-release.s3.amazonaws.com/h2o/h2o-parsemanycols/8/h2o-2.9.0.8.zip
#unzip h2o-2.9.0.8.zip
#cd h2o-2.9.0.8/hadoop
#hadoop fs -rm -r myDir
#hadoop jar h2odriver_hdp2.1.jar water.hadoop.h2odriver -libjars ../h2o.jar -n 8 -mapperXmx 40g -output myDir -baseport 61111 -data_max_factor_levels 65000 -chunk_bits 24

h2oCluster <- h2o.init(ip="mr-0xd1", port=61111)

# Read data from HDFS
data.hex <- h2o.importFile(h2oCluster, "hdfs://mr-0xd6/datasets/15Mx2.2k.csv")

# Create 80/20 train/validation split
random <- h2o.runif(data.hex, seed = 123456789)
train <- h2o.assign(data.hex[random < .8,], "X15Mx2_2k_part0.hex")
valid <- h2o.assign(data.hex[random >= .8,], "X15Mx2_2k_part1.hex")

# Delete full training data and temporaries - only needed if memory is tight
h2o.rm(h2oCluster, "15Mx2_2k.hex") # optional
h2o.rm(h2oCluster, grep(pattern = "Last.value", x = h2o.ls(h2oCluster)$Key, value = TRUE))

response <- 2                   # response column (classes have a 1:1000 imbalance)
predictors <- 3:ncol(data.hex)  # all remaining columns are predictors

# Start modeling

# GLM
mdl.glm <- h2o.glm(x=predictors, y=response, data=train, lambda_search=T, family="binomial", max_predictors=100) #nfolds=5 is optional
mdl.glm

# compute validation error for GLM
pred.glm <- h2o.predict(mdl.glm, valid)
h2o.performance(pred.glm[,3], valid[,response], measure="F1")

# Gradient Boosted Trees
mdl.gbm <- h2o.gbm(x=predictors, y=response, data=train, validation=valid, importance=T, balance.classes = T, class.sampling.factors = c(1,250))
mdl.gbm

# Random Forest
mdl.rf <- h2o.randomForest(x=predictors, y=response, data=train, validation=valid, type="BigData", depth=15, importance=T, balance.classes = T, class.sampling.factors = c(1,250))
mdl.rf
@@ -57,7 +57,7 @@ check.deeplearning_autoencoder <- function(conn) {
cm <- h2o.confusionMatrix(test_preds[,1], test_labels)
cm

-checkTrue(cm[length(cm)] == 0.104) #10% test set error
+checkTrue(cm[length(cm)] == 0.1085) #10% test set error

testEnd()
}
18 changes: 18 additions & 0 deletions hadoop/src/main/java/water/hadoop/h2odriver.java
@@ -47,6 +47,8 @@ public class h2odriver extends Configured implements Tool {
static int cloudFormationTimeoutSeconds = DEFAULT_CLOUD_FORMATION_TIMEOUT_SECONDS;
static int nthreads = -1;
static int basePort = -1;
+  static int chunk_bits;
+  static int data_max_factor_levels;
static boolean beta = false;
static boolean enableRandomUdpDrop = false;
static boolean enableExceptions = false;
@@ -389,6 +391,8 @@ static void usage() {
" -n | -nodes <number of H2O nodes (i.e. mappers) to create>\n" +
" [-nthreads <maximum typical worker threads, i.e. cpus to use>]\n" +
" [-baseport <starting HTTP port for H2O nodes; default is 54321>]\n" +
" [-chunk_bits <bits per chunk (e.g., 22 for 4MB chunks)>]\n" +
" [-data_max_factor_levels <max. number of factors per column (e.g., 65000)>]\n" +
" [-ea]\n" +
" [-verbose:gc]\n" +
" [-XX:+PrintGCDetails]\n" +
@@ -543,6 +547,14 @@ else if (s.equals("-nthreads")) {
i++; if (i >= args.length) { usage(); }
nthreads = Integer.parseInt(args[i]);
}
+    else if (s.equals("-chunk_bits")) {
+      i++; if (i >= args.length) { usage(); }
+      chunk_bits = Integer.parseInt(args[i]);
+    }
+    else if (s.equals("-data_max_factor_levels")) {
+      i++; if (i >= args.length) { usage(); }
+      data_max_factor_levels = Integer.parseInt(args[i]);
+    }
else if (s.equals("-baseport")) {
i++; if (i >= args.length) { usage(); }
basePort = Integer.parseInt(args[i]);
@@ -912,6 +924,12 @@ private int run2(String[] args) throws Exception {
if (beta) {
conf.set(h2omapper.H2O_BETA_KEY, "-beta");
}
+    if (chunk_bits > 0) {
+      conf.set(h2omapper.H2O_CHUNKBITS_KEY, Integer.toString(chunk_bits));
+    }
+    if (data_max_factor_levels > 0) {
+      conf.set(h2omapper.H2O_DATAMAXFACTORLEVELS_KEY, Integer.toString(data_max_factor_levels));
+    }
if (enableRandomUdpDrop) {
conf.set(h2omapper.H2O_RANDOM_UDP_DROP_KEY, "-random_udp_drop");
}
18 changes: 18 additions & 0 deletions hadoop/src/main/java/water/hadoop/h2omapper.java
@@ -27,6 +27,8 @@ public class h2omapper extends Mapper<Text, Text, Text, Text> {
final static public String H2O_BETA_KEY = "h2o.beta";
final static public String H2O_RANDOM_UDP_DROP_KEY = "h2o.random.udp.drop";
final static public String H2O_NTHREADS_KEY = "h2o.nthreads";
+  final static public String H2O_CHUNKBITS_KEY = "h2o.chunk.bits";
+  final static public String H2O_DATAMAXFACTORLEVELS_KEY = "h2o.data.max.factor.levels";
final static public String H2O_BASE_PORT_KEY = "h2o.baseport";
final static public String H2O_LICENSE_DATA_KEY = "h2o.license.data";
final static public String H2O_HADOOP_VERSION = "h2o.hadoop.version";
@@ -360,6 +362,8 @@ private int run2(Context context) throws IOException, InterruptedException {
String driverIp = conf.get(H2O_DRIVER_IP_KEY);
String driverPortString = conf.get(H2O_DRIVER_PORT_KEY);
String network = conf.get(H2O_NETWORK_KEY);
+    String chunkBitsString = conf.get(H2O_CHUNKBITS_KEY);
+    String dataMaxFactorLevelsString = conf.get(H2O_DATAMAXFACTORLEVELS_KEY);
String nthreadsString = conf.get(H2O_NTHREADS_KEY);
String basePortString = conf.get(H2O_BASE_PORT_KEY);
String betaString = conf.get(H2O_BETA_KEY);
@@ -401,6 +405,20 @@
argsList.add(Integer.toString(basePort));
}
}
+    if (dataMaxFactorLevelsString != null) {
+      if (dataMaxFactorLevelsString.length() > 0) {
+        argsList.add("-data_max_factor_levels");
+        int dataMaxFactorLevels = Integer.parseInt(dataMaxFactorLevelsString);
+        argsList.add(Integer.toString(dataMaxFactorLevels));
+      }
+    }
+    if (chunkBitsString != null) {
+      if (chunkBitsString.length() > 0) {
+        argsList.add("-chunk_bits");
+        int chunkBits = Integer.parseInt(chunkBitsString);
+        argsList.add(Integer.toString(chunkBits));
+      }
+    }
if (betaString != null) {
if (betaString.length() > 0) {
argsList.add(betaString);
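Note: the driver and mapper changes above form a single hand-off: the driver stores each parsed flag in the Hadoop job configuration under a shared key, and the mapper reads it back and forwards it as an H2O command-line argument. A minimal, self-contained sketch of that round-trip (the conf key is the real one from h2omapper; the HashMap stands in for org.apache.hadoop.conf.Configuration, and the class name is hypothetical):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FlagHandoffSketch {
  static final String H2O_CHUNKBITS_KEY = "h2o.chunk.bits"; // real key from h2omapper

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>(); // stand-in for the Hadoop Configuration

    // Driver side: persist the parsed "-chunk_bits 24" into the job config.
    int chunk_bits = 24;
    if (chunk_bits > 0)
      conf.put(H2O_CHUNKBITS_KEY, Integer.toString(chunk_bits));

    // Mapper side: read it back and append it to the H2O node's argument list.
    List<String> argsList = new ArrayList<String>();
    String chunkBitsString = conf.get(H2O_CHUNKBITS_KEY);
    if (chunkBitsString != null && chunkBitsString.length() > 0) {
      argsList.add("-chunk_bits");
      argsList.add(chunkBitsString);
    }
    System.out.println(argsList); // prints [-chunk_bits, 24]
  }
}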
13 changes: 13 additions & 0 deletions src/main/java/water/H2O.java
@@ -46,6 +46,8 @@ public final class H2O {
// Max. number of factor levels per column (before flipping all to NAs)
public static int DATA_MAX_FACTOR_LEVELS = 65000;

+  public static int LOG_CHK = 24; // Chunks are 1<<24, or 16Meg

// The multicast discovery port
static MulticastSocket CLOUD_MULTICAST_SOCKET;
static NetworkInterface CLOUD_MULTICAST_IF;
@@ -713,6 +715,7 @@ public static class OptArgs extends Arguments.Opt {
public String version = null;
public String single_precision = null;
public int data_max_factor_levels;
+    public int chunk_bits;
public String beta = null;
public String mem_watchdog = null; // For developer debugging
public boolean md5skip = false;
@@ -765,6 +768,10 @@ public static void printHelp() {
" from double to single precision to save memory of numerical data.\n" +
" (The default is double precision.)\n" +
"\n" +
" -chunk_bits <integer>\n" +
" The number of bits per chunk.\n" +
" (The default is " + LOG_CHK + ", which is " + PrettyPrint.bytes(1<<LOG_CHK) + ".)\n" +
"\n" +
" -data_max_factor_levels <integer>\n" +
" The maximum number of factor levels for categorical columns.\n" +
" Columns with more than the specified number of factor levels\n" +
@@ -918,6 +925,12 @@ public static void main( String[] args ) {
Log.info("Max. number of factor levels per column: " + DATA_MAX_FACTOR_LEVELS);
}

+    if (OPT_ARGS.chunk_bits != 0) {
+      if (OPT_ARGS.chunk_bits > 0)
+        LOG_CHK = OPT_ARGS.chunk_bits;
+    }
+    Log.info("Chunk size: " + PrettyPrint.bytes(1<<LOG_CHK));

// Get ice path before loading Log or Persist class
String ice = DEFAULT_ICE_ROOT();
if( OPT_ARGS.ice_root != null ) ice = OPT_ARGS.ice_root.replace("\\", "/");
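Note: the new help text ties -chunk_bits to the chunk size as 1<<bits. A quick standalone check of that arithmetic (plain printf here; the real code formats with PrettyPrint.bytes):

public class ChunkBitsCheck {
  public static void main(String[] args) {
    for (int bits : new int[]{22, 24, 26}) {
      long bytes = 1L << bits; // chunk size in bytes
      System.out.printf("-chunk_bits %d -> %d MB chunks%n", bits, bytes >> 20);
    }
    // -chunk_bits 22 -> 4 MB chunks   (the old Vec.LOG_CHK default)
    // -chunk_bits 24 -> 16 MB chunks  (the new H2O.LOG_CHK default)
    // -chunk_bits 26 -> 64 MB chunks
  }
}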
2 changes: 1 addition & 1 deletion src/main/java/water/Value.java
@@ -31,7 +31,7 @@
// In any case, they will cause issues with both GC (giant pause times on
// many collectors) and I/O (long term blocking of TCP I/O channels to
// service a single request, causing starvation of other requests).
-  public static final int MAX = 20*1024*1024;
+  public static final int MAX = 80*1024*1024;

// ---
// Values are wads of bits; known small enough to 'chunk' politely on disk,
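Note: the commit does not say why MAX grows from 20 MB to 80 MB, but a reading consistent with the rest of the change is that a whole chunk must fit in one Value, so MAX bounds the usable -chunk_bits range. A small check under that assumption (class name illustrative):

public class ValueMaxCheck {
  static final int MAX = 80 * 1024 * 1024; // the new ceiling

  public static void main(String[] args) {
    // Assumption (not stated in the commit): a 1<<bits chunk must fit in one Value.
    for (int bits = 22; bits <= 27; bits++)
      System.out.println("-chunk_bits " + bits + " fits: " + ((1L << bits) <= MAX));
    // bits 22..26 (4 MB..64 MB) fit; 27 (128 MB) does not
  }
}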
10 changes: 5 additions & 5 deletions src/main/java/water/fvec/FileVec.java
@@ -5,7 +5,7 @@
public abstract class FileVec extends ByteVec {
long _len; // File length
final byte _be;
-  public static final int CHUNK_SZ = 1 << LOG_CHK;
+  public static final int CHUNK_SZ = 1 << H2O.LOG_CHK;


protected FileVec(Key key, long len, byte be) {
@@ -15,7 +15,7 @@ protected FileVec(Key key, long len, byte be) {
}

@Override public long length() { return _len; }
-  @Override public int nChunks() { return (int)Math.max(1,_len>>LOG_CHK); }
+  @Override public int nChunks() { return (int)Math.max(1,_len>>H2O.LOG_CHK); }
@Override public boolean writable() { return false; }

//NOTE: override ALL rollups-related methods or ALL files will be loaded after import.
@@ -46,7 +46,7 @@ protected FileVec(Key key, long len, byte be) {
@Override
public int elem2ChunkIdx(long i) {
assert 0 <= i && i <= _len : " "+i+" < "+_len;
-    int cidx = (int)(i>>LOG_CHK);
+    int cidx = (int)(i>>H2O.LOG_CHK);
int nc = nChunks();
if( i >= _len ) return nc;
if( cidx >= nc ) cidx=nc-1; // Last chunk is larger
@@ -56,9 +56,9 @@ public int elem2ChunkIdx(long i) {
// Convert a chunk-index into a starting row #. Constant sized chunks
// (except for the last, which might be a little larger), and size-1 rows so
// this is a little shift-n-add math.
-  @Override public long chunk2StartElem( int cidx ) { return (long)cidx <<LOG_CHK; }
+  @Override public long chunk2StartElem( int cidx ) { return (long)cidx <<H2O.LOG_CHK; }
// Convert a chunk-key to a file offset. Size 1 rows, so this is a direct conversion.
-  static public long chunkOffset ( Key ckey ) { return (long)chunkIdx(ckey)<<LOG_CHK; }
+  static public long chunkOffset ( Key ckey ) { return (long)chunkIdx(ckey)<<H2O.LOG_CHK; }
// Reverse: convert a chunk-key into a cidx
static public int chunkIdx(Key ckey) { assert ckey._kb[0]==Key.DVEC; return UDP.get4(ckey._kb,1+1+4); }

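Note: the shift arithmetic in FileVec is easiest to see with concrete numbers. A worked example assuming the new default H2O.LOG_CHK = 24 (16 MB chunks); the class name is illustrative:

public class FileVecMathDemo {
  static final int LOG_CHK = 24; // assumed default: 16 MB chunks

  public static void main(String[] args) {
    long len = 100L << 20;                            // a 100 MB file
    int nChunks = (int) Math.max(1, len >> LOG_CHK);  // 6 chunks; the last is a bit larger
    long i = 50L << 20;                               // byte offset 50 MB...
    int cidx = (int) (i >> LOG_CHK);                  // ...falls in chunk 3
    long start = (long) cidx << LOG_CHK;              // chunk 3 starts at byte 48 MB
    System.out.println(nChunks + ", " + cidx + ", " + start); // 6, 3, 50331648
  }
}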
9 changes: 6 additions & 3 deletions src/main/java/water/fvec/NewChunk.java
@@ -336,7 +336,8 @@ void append2( long l, int x ) {

// Slow-path append data
private void append2slowd() {
-    if( _sparseLen > Vec.CHUNK_SZ )
+    final int CHUNK_SZ = 1 << H2O.LOG_CHK;
+    if( _sparseLen > CHUNK_SZ )
throw new ArrayIndexOutOfBoundsException(_sparseLen);
assert _ls==null;
if(_ds != null && _ds.length > 0){
@@ -355,7 +356,8 @@ private void append2slowd() {
}
// Slow-path append data
private void append2slowUUID() {
-    if( _sparseLen > Vec.CHUNK_SZ )
+    final int CHUNK_SZ = 1 << H2O.LOG_CHK;
+    if( _sparseLen > CHUNK_SZ )
throw new ArrayIndexOutOfBoundsException(_sparseLen);
if( _ds==null && _ls!=null ) { // This can happen for columns with all NAs and then a UUID
_xs=null;
@@ -374,7 +376,8 @@ private void append2slowUUID() {
}
// Slow-path append data
private void append2slow( ) {
-    if( _sparseLen > Vec.CHUNK_SZ )
+    final int CHUNK_SZ = 1 << H2O.LOG_CHK;
+    if( _sparseLen > CHUNK_SZ )
throw new ArrayIndexOutOfBoundsException(_sparseLen);
assert _ds==null;
if(_ls != null && _ls.length > 0){
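Note: recomputing CHUNK_SZ locally in each slow path, rather than keeping the old static final Vec.CHUNK_SZ, matters because H2O.LOG_CHK is now a mutable field that -chunk_bits overwrites at startup. A static final snapshot would freeze whatever value was current when the class loaded; a sketch of that pitfall (all names hypothetical):

public class StaleConstantDemo {
  static int LOG_CHK = 22;                   // stand-in for H2O.LOG_CHK
  static final int FROZEN_SZ = 1 << LOG_CHK; // baked in at class-load time

  public static void main(String[] args) {
    LOG_CHK = 24;                            // what "-chunk_bits 24" does at startup
    int liveSz = 1 << LOG_CHK;               // recomputed at the point of use
    System.out.println(FROZEN_SZ + " vs " + liveSz); // 4194304 vs 16777216
  }
}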
2 changes: 1 addition & 1 deletion src/main/java/water/fvec/UploadFileVec.java
@@ -25,7 +25,7 @@ public void close(C1NChunk c, int cidx, Futures fs) {
assert _len==-1; // Not closed
c._vec = this; // Attach chunk to this vec.
DKV.put(chunkKey(cidx),c,fs); // Write updated chunk back into K/V
-    _len = ((_nchunks-1L)<<LOG_CHK)+c._len;
+    _len = ((_nchunks-1L)<<H2O.LOG_CHK)+c._len;
}

@Override public Value chunkIdx( int cidx ) {
10 changes: 3 additions & 7 deletions src/main/java/water/fvec/Vec.java
@@ -45,11 +45,6 @@
* @author Cliff Click
*/
public class Vec extends Iced {
-  /** Log-2 of Chunk size. */
-  public static final int LOG_CHK = 22; // Chunks are 1<<22, or 4Meg
-  /** Chunk size.  Bigger increases batch sizes, lowers overhead costs, lower
-   * increases fine-grained parallelism. */
-  public static final int CHUNK_SZ = 1 << LOG_CHK;

/** Key mapping a Value which holds this Vec. */
final public Key _key; // Top-level key
@@ -224,10 +219,11 @@ public void map(Chunk[] cs) {
}.doAll(makeConSeq(0, len)).vecs(0);
}
public static Vec makeConSeq(double x, long len) {
-    int chunks = (int)Math.ceil((double)len / Vec.CHUNK_SZ);
+    final int CHUNK_SZ = 1 << H2O.LOG_CHK;
+    int chunks = (int)Math.ceil((double)len / CHUNK_SZ);
long[] espc = new long[chunks+1];
for (int i = 1; i<=chunks; ++i)
-      espc[i] = Math.min(espc[i-1] + Vec.CHUNK_SZ, len);
+      espc[i] = Math.min(espc[i-1] + CHUNK_SZ, len);
return new Vec(VectorGroup.VG_LEN1.addVec(), espc).makeCon(x);
}

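Note: makeConSeq derives espc (the per-chunk element start offsets) from the now-runtime chunk size. A worked example for a length of 2.5 chunks, assuming the default LOG_CHK of 24:

import java.util.Arrays;

public class EspcDemo {
  public static void main(String[] args) {
    final int CHUNK_SZ = 1 << 24;                          // assumed default chunk size
    long len = 2L * CHUNK_SZ + CHUNK_SZ / 2;               // 2.5 chunks of elements
    int chunks = (int) Math.ceil((double) len / CHUNK_SZ); // -> 3 chunks
    long[] espc = new long[chunks + 1];
    for (int i = 1; i <= chunks; ++i)
      espc[i] = Math.min(espc[i - 1] + CHUNK_SZ, len);
    // The last chunk holds the remainder.
    System.out.println(Arrays.toString(espc)); // [0, 16777216, 33554432, 41943040]
  }
}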
2 changes: 1 addition & 1 deletion src/main/java/water/util/Utils.java
@@ -817,7 +817,7 @@ public static Compression guessCompressionMethod(byte [] bits){
break;
off += len;
if( off == bs.length ) { // Dataset is uncompressing a lot! Need more space...
-      if( bs.length >= water.fvec.Vec.CHUNK_SZ )
+      if( bs.length >= (1 << H2O.LOG_CHK))
break; // Already got enough
bs = Arrays.copyOf(bs, bs.length * 2);
}
43 changes: 22 additions & 21 deletions src/test/java/water/fvec/VecTest.java
@@ -16,6 +16,7 @@

/** This test tests stability of Vec API. */
public class VecTest extends TestUtil {
+  final int CHUNK_SZ = 1 << H2O.LOG_CHK;

/** Test toEnum call to return correct domain. */
@Test public void testToEnum() {
@@ -75,62 +76,62 @@ private static final void testChangeDomainImpl(){
@Test public void testMakeConSeq() {
Vec v;

-    v = makeConSeq(0xCAFE,Vec.CHUNK_SZ);
+    v = makeConSeq(0xCAFE,CHUNK_SZ);
assertTrue(v.at(234) == 0xCAFE);
assertTrue(v._espc.length == 2);
assertTrue(
v._espc[0] == 0 &&
-        v._espc[1] == Vec.CHUNK_SZ
+        v._espc[1] == CHUNK_SZ
);
v.remove(new Futures()).blockForPending();

-    v = makeConSeq(0xCAFE,2*Vec.CHUNK_SZ);
+    v = makeConSeq(0xCAFE,2*CHUNK_SZ);
assertTrue(v.at(234) == 0xCAFE);
-    assertTrue(v.at(2*Vec.CHUNK_SZ-1) == 0xCAFE);
+    assertTrue(v.at(2*CHUNK_SZ-1) == 0xCAFE);
assertTrue(v._espc.length == 3);
assertTrue(
v._espc[0] == 0 &&
-        v._espc[1] == Vec.CHUNK_SZ &&
-        v._espc[2] == Vec.CHUNK_SZ*2
+        v._espc[1] == CHUNK_SZ &&
+        v._espc[2] == CHUNK_SZ*2
);
v.remove(new Futures()).blockForPending();

-    v = makeConSeq(0xCAFE,2*Vec.CHUNK_SZ+1);
+    v = makeConSeq(0xCAFE,2*CHUNK_SZ+1);
assertTrue(v.at(234) == 0xCAFE);
-    assertTrue(v.at(2*Vec.CHUNK_SZ) == 0xCAFE);
+    assertTrue(v.at(2*CHUNK_SZ) == 0xCAFE);
assertTrue(v._espc.length == 4);
assertTrue(
v._espc[0] == 0 &&
-        v._espc[1] == Vec.CHUNK_SZ &&
-        v._espc[2] == Vec.CHUNK_SZ*2 &&
-        v._espc[3] == Vec.CHUNK_SZ*2+1
+        v._espc[1] == CHUNK_SZ &&
+        v._espc[2] == CHUNK_SZ*2 &&
+        v._espc[3] == CHUNK_SZ*2+1
);
v.remove(new Futures()).blockForPending();

-    v = makeConSeq(0xCAFE,3*Vec.CHUNK_SZ);
+    v = makeConSeq(0xCAFE,3*CHUNK_SZ);
assertTrue(v.at(234) == 0xCAFE);
-    assertTrue(v.at(3*Vec.CHUNK_SZ-1) == 0xCAFE);
+    assertTrue(v.at(3*CHUNK_SZ-1) == 0xCAFE);
assertTrue(v._espc.length == 4);
assertTrue(
v._espc[0] == 0 &&
-        v._espc[1] == Vec.CHUNK_SZ &&
-        v._espc[2] == Vec.CHUNK_SZ*2 &&
-        v._espc[3] == Vec.CHUNK_SZ*3
+        v._espc[1] == CHUNK_SZ &&
+        v._espc[2] == CHUNK_SZ*2 &&
+        v._espc[3] == CHUNK_SZ*3
);
v.remove(new Futures()).blockForPending();
}
// Test HEX-1819
@Test public void testMakeSeq() {
-    Vec v = makeSeq(3*Vec.CHUNK_SZ);
+    Vec v = makeSeq(3*CHUNK_SZ);
assertTrue(v.at(0) == 1);
assertTrue(v.at(234) == 235);
-    assertTrue(v.at(2*Vec.CHUNK_SZ) == 2*Vec.CHUNK_SZ+1);
+    assertTrue(v.at(2*CHUNK_SZ) == 2*CHUNK_SZ+1);
assertTrue(v._espc.length == 4);
assertTrue(
v._espc[0] == 0 &&
-        v._espc[1] == Vec.CHUNK_SZ &&
-        v._espc[2] == Vec.CHUNK_SZ * 2 &&
-        v._espc[3] == Vec.CHUNK_SZ * 3
+        v._espc[1] == CHUNK_SZ &&
+        v._espc[2] == CHUNK_SZ * 2 &&
+        v._espc[3] == CHUNK_SZ * 3
);
v.remove(new Futures()).blockForPending();
}
