add a few more job cancelled checks in the adapter
spennihana committed Aug 12, 2014
1 parent 10f083c commit e51886c
Showing 3 changed files with 19 additions and 6 deletions.
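The commit wires the model's job key into the single-node random forest data adapter and repeats one cancellation guard at the expensive points of the inhale and tree-build paths, so a cancelled job stops doing work instead of running to completion. Below is a minimal sketch of that guard, assuming only the H2O classes already visible in the diff (water.Job, water.Key); the class and helper names are illustrative and not part of the change itself.

import water.Job;
import water.Key;

final class CancelCheck {
  // The guard this commit repeats: stop long-running work once the owning job is no longer running.
  static void throwIfCancelled(Key jobKey) {
    // A null key means the work is not tied to a job, so there is nothing to check.
    if (jobKey != null && !Job.isRunning(jobKey)) throw new Job.JobCancelledException();
  }
}

Inlining this test between chunk inhales, before shrink(), and around each split computation (as the diff does) keeps the cancellation latency close to one chunk or one split rather than a full tree build.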
src/main/java/hex/singlenoderf/DABuilder.java (8 changes: 7 additions & 1 deletion)
@@ -4,6 +4,7 @@
import hex.singlenoderf.SpeeDRF.DRFParams;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveAction;
import water.Job;
import water.Key;
import water.Timer;
import water.UKV;
@@ -120,22 +121,27 @@ protected DataAdapter inhaleData(Frame fr, boolean useNonLocal) {
Log.info(Log.Tag.Sys.RANDF,"Beginning Random Forest Inhale.");
ForkJoinTask.invokeAll(dataInhaleJobs);

if(dapt._jobKey != null && !Job.isRunning(dapt._jobKey)) throw new Job.JobCancelledException();

// Shrink data
dapt.shrink();
if(dapt._jobKey != null && !Job.isRunning(dapt._jobKey)) throw new Job.JobCancelledException();
Log.info(Log.Tag.Sys.RANDF,"Inhale done in " + t_inhale);
return dapt;
}

static RecursiveAction loadChunkAction(final DataAdapter dapt, final Frame fr, final int cidx, final boolean[] isByteCol, final long[] naCnts, boolean regression) {
return new RecursiveAction() {
@Override protected void compute() {
if(dapt._jobKey != null && !Job.isRunning(dapt._jobKey)) throw new Job.JobCancelledException();
try {
Chunk[] chks = new Chunk[fr.numCols()];
int ncolumns = chks.length;
for(int i = 0; i < chks.length; ++i) {
chks[i] = fr.vecs()[i].chunkForChunkIdx(cidx);
}
for (int j = 0; j < chks[0]._len; ++j) {
if(dapt._jobKey != null && !Job.isRunning(dapt._jobKey)) throw new Job.JobCancelledException();
int rowNum = (int)chks[0]._start + j;
boolean rowIsValid = false;
for(int c = 0; c < chks.length; ++c) {
@@ -161,7 +167,7 @@ static RecursiveAction loadChunkAction(final DataAdapter dapt, final Frame fr, f
if (!rowIsValid) dapt.markIgnoredRow(j);
}
} catch (Throwable t) {
t.printStackTrace();
//
}
}
};
src/main/java/hex/singlenoderf/DataAdapter.java (8 changes: 7 additions & 1 deletion)
@@ -1,6 +1,8 @@
package hex.singlenoderf;


import water.Job;
import water.Key;
import water.MemoryManager;
import water.fvec.Frame;
import water.fvec.Vec;
@@ -34,17 +36,20 @@ final class DataAdapter {
/** Use regression */
public final boolean _regression;

public Key _jobKey;

DataAdapter(Frame fr, SpeeDRFModel model, int[] modelDataMap, int rows,
long unique, long seed, int binLimit, double[] classWt) {
// assert model._dataKey == fr._key;
_seed = seed+(unique<<16); // This is important to preserve sampling selection!!!
/* Maximum arity for a column (not a hard limit) */
_numRows = rows;
_jobKey = model.jobKey;
_numClasses = model.regression ? 1 : model.classes();
_regression = model.regression;

_c = new Col[fr.numCols()];
for( int i = 0; i < _c.length; i++ ) {
if(model.jobKey != null && !Job.isRunning(model.jobKey)) throw new Job.JobCancelledException();
assert fr._names[modelDataMap[i]].equals(fr._names[i]);
Vec v = fr.vecs()[i];
if( isByteCol(v,rows, i == _c.length-1, _regression) ) // we do not bin for small values
@@ -107,6 +112,7 @@ public float getRawClassColumnValueFromBin(int row) {
}

public void shrink() {
if(_jobKey != null && !Job.isRunning(_jobKey)) throw new Job.JobCancelledException();
for ( Col c: _c) c.shrink();
}

src/main/java/hex/singlenoderf/Tree.java (9 changes: 5 additions & 4 deletions)
@@ -75,7 +75,7 @@ public Tree(final Key jobKey, final Key modelKey, final Data data, byte producer

// Oops, uncaught exception
@Override public boolean onExceptionalCompletion( Throwable ex, CountedCompleter cc) {
ex.printStackTrace();
// ex.printStackTrace();
return true;
}

@@ -151,6 +151,7 @@ private StringBuffer computeStatistics() {
}

_stats = null; // GC
if(_jobKey != null && !Job.isRunning(_jobKey)) throw new Job.JobCancelledException();

// Atomically improve the Model as well
Key tkey = toKey();
@@ -161,9 +162,7 @@ private StringBuffer computeStatistics() {
if (_verbose > 10) {
Log.info(Sys.RANDF, _tree.toString(sb, Integer.MAX_VALUE).toString());
}
} else {
if(_jobKey != null && !Job.isRunning(_jobKey)) throw new Job.JobCancelledException();
}
} else throw new Job.JobCancelledException();
// Wait for completion
tryComplete();
}
@@ -193,6 +192,7 @@ private class FJBuild extends RecursiveTask<INode> {
@Override public INode compute() {
hex.singlenoderf.Statistic left = getStatistic(0,_data, _seed + LTSS_INIT, _exclusiveSplitLimit); // first get the statistics
hex.singlenoderf.Statistic rite = getStatistic(1,_data, _seed + RTSS_INIT, _exclusiveSplitLimit);
if(_jobKey != null && !Job.isRunning(_jobKey)) throw new Job.JobCancelledException();
Data[] res = new Data[2]; // create the data, node and filter the data
int c = _split._column, s = _split._split;
assert c != _data.columns()-1; // Last column is the class column
@@ -203,6 +203,7 @@ private class FJBuild extends RecursiveTask<INode> {
FJBuild fj0 = null, fj1 = null;
hex.singlenoderf.Statistic.Split ls = left.split(res[0], _depth >= _maxDepth); // get the splits
hex.singlenoderf.Statistic.Split rs = rite.split(res[1], _depth >= _maxDepth);
if(_jobKey != null && !Job.isRunning(_jobKey)) throw new Job.JobCancelledException();
if (ls.isLeafNode() || ls.isImpossible()) {
if (_regression) {
float av = res[0].computeAverage();
