Skip to content

Commit

Permalink
track mastervecs in rbind
Browse files: browse the repository at this point in the history
  • Loading branch information
spennihana committed Jan 8, 2015
1 parent 432717a commit aa1d616
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
9 changes: 4 additions & 5 deletions R/h2o-package/R/Classes.R
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,11 @@ rbind.H2OParsedData <- function(..., deparse.level = 1) {
# l_dep <- sapply(substitute(placeholderFunction(...))[-1], deparse)
if(length(l) == 0) stop('rbind requires an H2O parsed dataset')

klass <- 'H2OParsedData'
# klass <- 'H2OParsedData'
h2o <- l[[1]]@h2o
nrows <- nrow(l[[1]])
m <- Map(function(elem){ inherits(elem, klass) & elem@h2o@ip == h2o@ip & elem@h2o@port == h2o@port & nrows == nrow(elem) }, l)
compatible <- Reduce(function(l,r) l & r, x=m, init=T)
if(!compatible){ stop(paste('rbind: all elements must be of type', klass, 'and in the same H2O instance'))}
# m <- Map(function(elem){ inherits(elem, klass) & elem@h2o@ip == h2o@ip & elem@h2o@port == h2o@port & nrows == nrow(elem) }, l)
# compatible <- Reduce(function(l,r) l & r, x=m, init=T)
# if(!compatible){ stop(paste('rbind: all elements must be of type', klass, 'and in the same H2O instance'))}

# If cbind(x,x), dupe colnames will automatically be renamed by H2O
if(is.null(names(l)))
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/api/Inspect2.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static Response redirect(Request req, String src_key) {
sb.append("<td><b>").append(btn).append("</b></td>");
continue;
}
if(src_key.vecs()[i] instanceof TransfVec) {
if(src_key.vecs()[i] instanceof TransfVec || (src_key.vecs()[i].isInt() && src_key.vecs()[i].isEnum() && src_key.vecs()[i].masterVec() == null)) {
String btn2 = "<span class='btn_custom'>\n";
btn2 += "<a href='ToInt2.html?src_key=" + src_key._key.toString() + "&column_index=" + (i+1) + "'>"
+ "<button type='submit' class='btn btn-custom'>As Integer</button>\n";
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/water/api/ToInt2.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ public class ToInt2 extends Request2 {
@Override
protected Response serve() {
try {
if (column_index <= 0) throw new IllegalArgumentException("Column index is 1 based. Please supply a valid column index in the range [1,"+ src_key.numCols()+"]");
if (column_index <= 0 || column_index > src_key.numCols()) throw new IllegalArgumentException("Column index is 1 based. Please supply a valid column index in the range [1,"+ src_key.numCols()+"]");
Log.info("Integerizing column " + column_index);
assert src_key.vecs()[column_index - 1].masterVec().isInt();
Vec nv = src_key.vecs()[column_index - 1].masterVec();
Vec nv;
if ((nv= src_key.vecs()[column_index-1].masterVec()) == null) {
assert src_key.vecs()[column_index-1].isInt();
nv = src_key.vecs()[column_index-1];
nv._domain = null;
} else {
assert src_key.vecs()[column_index - 1].masterVec().isInt();
nv = src_key.vecs()[column_index - 1].masterVec();
}
src_key.replace(column_index - 1, nv);

} catch( Throwable e ) {
Expand Down
46 changes: 28 additions & 18 deletions src/main/java/water/exec/ASTOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -1495,47 +1495,50 @@ private static Map<Integer, String> invert(Map<String, Integer> map) {

private static class ParallelRbinds extends H2O.H2OCountedCompleter{

private final Env _env;
private final Frame[] _f;
private final int _argcnt;
private final AtomicInteger _ctr;
private int _maxP = 100;

private long[] _espc;
private Vec[] _vecs;
ParallelRbinds(Env e, int argcnt) { _env = e; _argcnt = argcnt; _ctr = new AtomicInteger(_maxP-1); } //TODO pass maxP to constructor
ParallelRbinds(Frame[] f, int argcnt) { _f = f; _argcnt = argcnt; _ctr = new AtomicInteger(_maxP-1); } //TODO pass maxP to constructor

@Override public void compute2() {
addToPendingCount(_env.peekAry().numCols()-1);
addToPendingCount(_f[0].numCols()-1);
int nchks=0;
for (int i =0; i < _argcnt; ++i)
nchks+=_env.ary(-(i+1)).anyVec().nChunks();
nchks+=_f[i].anyVec().nChunks();

_espc = new long[nchks+1];
int coffset = _env.peekAry().anyVec().nChunks();
long[] first_espc = _env.peekAry().anyVec()._espc;
int coffset = _f[0].anyVec().nChunks();
long[] first_espc = _f[0].anyVec()._espc;
System.arraycopy(first_espc, 0, _espc, 0, first_espc.length);
for (int i=1; i< _argcnt; ++i) {
for (int i=1; i < _argcnt; ++i) {
long roffset = _espc[coffset];
long[] espc = _env.ary(-(i+1)).anyVec()._espc;
long[] espc = _f[i].anyVec()._espc;
int j = 1;
for (; j < espc.length; j++)
_espc[coffset + j] = roffset+ espc[j];
coffset += _env.ary(-(i+1)).anyVec().nChunks();
coffset += _f[i].anyVec().nChunks();
}

Key[] keys = _env.peekAry().anyVec().group().addVecs(_env.peekAry().numCols());
Key[] keys = _f[0].anyVec().group().addVecs(_f[0].numCols());
_vecs = new Vec[keys.length];
String type;
for (int i=0; i<_vecs.length; ++i)
_vecs[i] = new Vec( keys[i], _espc, null, (type=get_type(_env.peekAry().vec(i))).equals("UUID"), type.equals("time") ? (byte)3 : (byte)-1);
for (int i=0; i<_vecs.length; ++i) {
_vecs[i] = new Vec(keys[i], _espc, null, (type = get_type(_f[0].vec(i))).equals("UUID"), type.equals("time") ? (byte) 3 : (byte) -1);
}

for (int i=0; i < Math.min(_maxP, _vecs.length); ++i) forkVecTask(i);
}

private void forkVecTask(final int i) {
Vec[] vecs = new Vec[_argcnt];
for (int j= 0; j < _argcnt; ++j)
vecs[j] = _env.ary(-(j+1)).vec(i);
for (int j= 0; j < _argcnt; ++j) {
Vec vm, v = _f[j].vec(i);
vecs[j] = ((vm=v.masterVec())==null) ? v : vm;
}
new RbindTask(new Callback(), vecs, _vecs[i], _espc).fork();
}

Expand All @@ -1554,25 +1557,32 @@ private class Callback extends H2O.H2OCallback {
// quick check to make sure rbind is feasible
if (argcnt-1 == 1) { return; } // leave stack as is

Frame[] fs = new Frame[argcnt-1];
Frame f1 = env.peekAry();

int j = fs.length-1;
boolean[] wrapped = new boolean[f1.numCols()];
for (int c = 0; c<f1.numCols(); ++c) wrapped[c] = f1.vec(c).masterVec() != null;
fs[j--] = f1;
// do error checking and compute new offsets in tandem
for (int i = 0; i < argcnt-1; ++i) {
for (int i = 1; i < argcnt-1; ++i) {
Frame t = env.ary(-(i+1));

fs[j--]=t;
// check columns match
if (t.numCols() != f1.numCols())
throw new IllegalArgumentException("Column mismatch! Expected " + f1.numCols() + " but frame has " + t.numCols());

// check column types
for (int c = 0; c < f1.numCols(); ++c) {
wrapped[c] |= t.vec(c).masterVec() != null;
if (!get_type(f1.vec(c)).equals(get_type(t.vec(c))))
throw new IllegalArgumentException("Column type mismatch! Expected type " + get_type(f1.vec(c)) + " but vec has type " + get_type(t.vec(c)));
}
}

ParallelRbinds t;
H2O.submitTask(t =new ParallelRbinds(env, argcnt-1)).join();
H2O.submitTask(t = new ParallelRbinds(fs, argcnt-1)).join();
for (int i = 0; i < wrapped.length; ++i)
if (wrapped[i]) t._vecs[i] = t._vecs[i].toEnum();
Key m = Key.make();
env.poppush(argcnt, new Frame(m, f1.names(), t._vecs), m.toString());
}
Expand Down

0 comments on commit aa1d616

Please sign in to comment.