Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/0xdata/h2o
Browse files Browse the repository at this point in the history
  • Loading branch information
cliffclick committed Jan 31, 2014
2 parents f6d4ba0 + 0676ab3 commit 63f2f33
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 62 deletions.
6 changes: 4 additions & 2 deletions R/tests/Utils/h2oR.R
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,15 @@ function() {
Log.info("Checking Package dependencies for this test.\n")
if (!"RUnit" %in% rownames(installed.packages())) install.packages("RUnit")
if (!"testthat" %in% rownames(installed.packages())) install.packages("testthat")

if (!"R.utils" %in% rownames(installed.packages())) install.packages("R.utils")

if (Sys.info()['sysname'] == "Windows")
options(RCurlOptions = list(cainfo = system.file("CurlSSL", "cacert.pem", package = "RCurl")))

Log.info("Loading RUnit and testthat\n")
Log.info("Loading RUnit and testthat and R.utils\n")
require(RUnit)
require(testthat)
require(R.utils)
}

#
Expand Down
2 changes: 1 addition & 1 deletion bench2/R/h2oPerf/epilogue.R
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function() {
function() {
r <- list(binomial_result =
list(auc = auc,
precision = precision[[1]],
precision_ = precision[[1]],
recall = recall[[1]],
error_rate = error_rate[[1]],
minority_error_rate = minority_error_rate[[1]]))
Expand Down
2 changes: 0 additions & 2 deletions bench2/py/h2oPerf/H2O.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ def __init__(self, cloud_num, nodes_per_cloud, node_num, cloud_name, h2o_jar, ip
self.cloud_name = cloud_name
self.h2o_jar = h2o_jar
self.ip = ip
print "H2O CLOUD NODE"
print ip
self.base_port = base_port
self.xmx = xmx
self.output_dir = output_dir
Expand Down
12 changes: 1 addition & 11 deletions bench2/py/h2oPerf/PerformanceRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,8 @@ def add_test(self, testDir):
parse_file = testDir + "_Parse.R"
model_file = testDir + "_Model.R"
predict_file = testDir + "_Predict.R"
#ip = self.cloud.get_ip()
#port = self.cloud.get_port()
test_dir = os.path.join(self.test_root_dir, testDir)
test_short_dir = testDir
print "ALLOOHAA"
#print ip
#print port
print test_dir
print test_short_dir
print "ALLLOOHAAA"

test = Test(-1, -1, test_dir, test_short_dir,
self.output_dir, parse_file, model_file, predict_file)
Expand Down Expand Up @@ -208,9 +200,7 @@ def run_tests(self):

ip = self.cloud.get_ip()
port = self.cloud.get_port()
print "MMMOOOOAOAOAOAOA"
print ip
print port

# Do _one_ test at a time
while len(self.tests_not_started) > 0:
test = self.tests_not_started.pop(0)
Expand Down
2 changes: 1 addition & 1 deletion bench2/py/h2oPerf/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def start(self, ip, port):
self.pid = self.child.pid

def scrape_phase(self):
scraper = Scraper(self.rtype, self.test_dir, self.test_short_dir, self.output_dir, self.output_file_name)
scraper = Scraper(self.rtype[0], self.test_dir, self.test_short_dir, self.output_dir, self.output_file_name)
return scraper.scrape()

def block(self):
Expand Down
43 changes: 27 additions & 16 deletions bench2/py/h2oPerf/Scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ def __scrape_phase_result__(self):
flag = True
return phase_r['phase_result']

def insert_phase_result(self):
print self.test_run_phase_result
print self.test_run_phase_result.row
print dir(self.test_run_phase_result)
with open(self.output_file_name, "r") as f:
self.test_run_phase_result.row['stdouterr'] = MySQLdb.escape_string(f.read().replace('\n', ''))
self.test_run_phase_result.row['contaminated'] = self.contaminated
self.test_run_phase_result.row['contamination_message'] = self.contamination_message
self.test_run_phase_result.row.update(self.__scrape_phase_result__())
self.test_run_phase_result.update()
@staticmethod
def insert_phase_result(object):
object.test_run_phase_result = TableRow("test_run_phase_result")
time.sleep(1)
with open(object.output_file_name, "r") as f:
object.test_run_phase_result.row['stdouterr'] = MySQLdb.escape_string(f.read().replace('\n', ''))
object.test_run_phase_result.row['contaminated'] = object.contaminated
object.test_run_phase_result.row['contamination_message'] = object.contamination_message
object.test_run_phase_result.row.update(object.__scrape_phase_result__())
object.test_run_phase_result.update()

class ParseScraper(Scraper):
"""
Expand All @@ -96,8 +96,7 @@ def __init__(self, object):
'train_dataset_url': '',
'test_dataset_url': ''}

self.test_run_phase_result = TableRow("test_run_phase_result")
self.insert_phase_result()
Scraper.insert_phase_result(self)

def invoke(self):
"""
Expand Down Expand Up @@ -147,7 +146,7 @@ def __init__(self, object):
"No contamination." if not self.contaminated else object.contamination_message
self.test_run_model_result = TableRow("test_run_model_result")
self.test_run_phase_result = TableRow("test_run_phase_result")
self.insert_phase_result()
Scraper.insert_phase_result(self)

def invoke(self):
"""
Expand All @@ -161,7 +160,8 @@ def invoke(self):
self.test_run_clustering_result.row.update(kmeans_result)
self.test_run_clustering_result.update()

self.test_run_model_result.row['model_json'] = str(self.__scrape_model_result__())
self.test_run_model_result.row['model_json'] = \
MySQLdb.escape_string(str(self.__scrape_model_result__()))
self.test_run_model_result.update()

def __scrape_kmeans_result__(self):
Expand Down Expand Up @@ -216,6 +216,9 @@ def __init__(self, object):
self.test_short_dir = object.test_short_dir
self.output_dir = object.output_dir
self.output_file_name = object.output_file_name
self.contaminated = object.contaminated
self.contamination_message = \
"No contamination." if not self.contaminated else object.contamination_message

self.test_run_binomial_classification_result = "" #\
#TableRow("test_run_binomial_classification_result")
Expand All @@ -225,7 +228,7 @@ def __init__(self, object):

self.test_run_multinomial_classification_result = "" #This is "" for now since we are using a dense format.
#That is 1 row for each class in the response.
self.insert_phase_result()
Scraper.insert_phase_result(self)

def invoke(self):
"""
Expand All @@ -241,14 +244,22 @@ def invoke(self):
flag = False
for line in f:
if flag:
predict_type = line.strip()
predict_type = self.__get_predict_type__(line.strip())[0]
flag = False
break
if "PREDICT TYPE" in line and "print" not in line:
flag = True
self.result_type = predict_type
self.__switch__()

def __get_predict_type__(self, type_candidate):
"""
Returns the type: 'parse', 'model', 'predict'
"""
types = ['binomial', 'regression', 'multinomial', 'cm']
rf = type_candidate.lower()
return [t for t in types if t in rf]

def __switch__(self):
"""
Overrides the __switch__ method of the parent class.
Expand Down
8 changes: 7 additions & 1 deletion bench2/py/h2oPerf/Table.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ def insert(self, table_row):
"""
Takes a TableRow object and writes to the db.
"""
#print "CURRENT TABLE ROW'S ROW:"
#print table_row.row
#print "------------------------"

sql = "INSERT INTO {} ({}) VALUES ({});".format(table_row.table_name,
','.join([str(t) for t in table_row.row.keys()]),
','.join(['"' + str(t) + '"' for t in table_row.row.values()]))
#print "CURRENT TABLE ROW'S INSERT SQL:"
#print sql
#print "------------------------"
self.cursor.execute(sql)
self.db.commit()

Expand Down Expand Up @@ -93,7 +99,7 @@ def __init__(self, table_name):
else:
self.pk = self.perfdb_connection.get_table_pk(table_name)
self.row['test_run_id'] = self.perfdb_connection.this_test_run_id
self.row[table_name + "_id"] = self.pk + 1
self.row[table_name + "_id"] = int(self.pk) + 1

def update(self):
"""
Expand Down
16 changes: 11 additions & 5 deletions py/h2o_glm.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,22 @@ def simpleCheckGLMScore(self, glmScore, family='gaussian', allowFailWarning=Fals
print "%15s %s" % ("auc:\t", validation['auc'])
print "%15s %s" % ("threshold:\t", validation['threshold'])

err = False
if family=="poisson" or family=="gaussian":
print "%15s %s" % ("aic:\t", validation['aic'])
if 'aic' not in validation:
print "aic is missing from the glm json response"
err = True

if math.isnan(validation['err']):
emsg = "Why is this err = 'nan'?? %6s %s" % ("err:\t", validation['err'])
raise Exception(emsg)
print "Why is this err = 'nan'?? %6s %s" % ("err:\t", validation['err'])
err = True

if math.isnan(validation['resDev']):
emsg = "Why is this resDev = 'nan'?? %6s %s" % ("resDev:\t", validation['resDev'])
raise Exception(emsg)
print "Why is this resDev = 'nan'?? %6s %s" % ("resDev:\t", validation['resDev'])
err = True

if err:
raise Exception ("How am I supposed to tell that any of these errors should be ignored?")

# legal?
if math.isnan(validation['nullDev']):
Expand Down
2 changes: 1 addition & 1 deletion py/testdir_release/c2/test_c2_rel.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def sub_c2_rel_long(self):
# remove the output too! (378)
ignore_x = []
# for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541]:
for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541]:
for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541,378]:
x.remove(i)
ignore_x.append(i)

Expand Down
5 changes: 3 additions & 2 deletions py/testdir_release/c3/test_c3_rel.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ def sub_c3_rel_long(self):
x = range(542) # don't include the output column
# remove the output too! (378)
ignore_x = []
for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541]:
for i in [3,4,5,6,7,8,9,10,11,14,16,17,18,19,20,424,425,426,540,541,378]:
x.remove(i)
ignore_x.append(i)

x = ",".join(map(lambda x: "C" + str(x), x))
# have to the zero-based offset by 1 (h2o is one-based now)
x = ",".join(map(lambda x: "C" + str(x+1), x))
ignore_x = ",".join(map(lambda x: "C" + str(x), ignore_x))

GLMkwargs = {
Expand Down
4 changes: 2 additions & 2 deletions py/testdir_release/c4/test_c4_four_billion_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def test_four_billion_rows(self):
'y': 'C' + str(1),
'n_folds': 0,
'family': 'binomial',
'case': '=',
'case_val': 1
'case_mode': '=',
'case': 1
}
# one coefficient is checked a little more
colX = 0
Expand Down
15 changes: 9 additions & 6 deletions py/testdir_release/c7/test_c7_rel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
print "to run h2o..i.e from the config json which builds the cloud and passes that info to the test"
print "via the cloned cloud mechanism (h2o-nodes.json)"


DO_GLM = False
class releaseTest(h2o_common.ReleaseCommon, unittest.TestCase):

def test_c7_rel(self):
Expand Down Expand Up @@ -62,13 +64,14 @@ def test_c7_rel(self):
}

timeoutSecs = 3600
start = time.time()
glm = h2o_cmd.runGLM(parseResult=parseResult, timeoutSecs=timeoutSecs, pollTimeoutSecs=60, **kwargs)
elapsed = time.time() - start
print "glm completed in", elapsed, "seconds.", \
"%d pct. of timeout" % ((elapsed*100)/timeoutSecs)

h2o_glm.simpleCheckGLM(self, glm, None, **kwargs)
if DO_GLM:
start = time.time()
glm = h2o_cmd.runGLM(parseResult=parseResult, timeoutSecs=timeoutSecs, pollTimeoutSecs=60, **kwargs)
elapsed = time.time() - start
print "glm completed in", elapsed, "seconds.", \
"%d pct. of timeout" % ((elapsed*100)/timeoutSecs)
h2o_glm.simpleCheckGLM(self, glm, None, **kwargs)

# do summary of the parsed dataset last, since we know it fails on this dataset
summaryResult = h2o_cmd.runSummary(key=parseResult['destination_key'])
Expand Down
1 change: 1 addition & 0 deletions py/testdir_release/c8/test_c8_rf_airlines_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
'ntree': 5,
'depth': 15,
'bin_limit': 100,
'use_non_local_data': False, # doesn't fit in single jvm
'ignore': 'AirTime, ArrDelay, DepDelay, CarrierDelay, IsArrDelayed',
'timeoutSecs': 14800,
'response_variable': 'IsDepDelayed'
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/hex/GridSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
import java.util.Comparator;

public class GridSearch extends Job {
Lockable _src;
public Job[] jobs;

public GridSearch(Lockable src){
_src = src;
}
@Override protected Status exec() {
UKV.put(destination_key, this);
if(_src != null)_src.read_lock(self());
int max = jobs[0].gridParallelism();
int head = 0, tail = 0;
while( head < jobs.length && isRunning(self()) ) {
Expand All @@ -33,6 +38,9 @@ public class GridSearch extends Job {
}
return Status.Done;
}
public void remove() {
if(_src != null)_src.unlock(self());
}

@Override protected void onCancelled() {
for( Job job : jobs )
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/hex/rf/DRF.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import water.*;
import water.H2O.H2OCountedCompleter;
import water.ValueArray.Column;
import water.api.Constants;
import water.util.Log;
import water.util.Log.Tag.Sys;

Expand Down Expand Up @@ -99,11 +100,15 @@ private void validateInputData(){
throw new IllegalArgumentException("Sampling rate must be in [0,1] but found "+ _params._sample);
if (_params._numSplitFeatures!=-1 && (_params._numSplitFeatures< 1 || _params._numSplitFeatures>cs.length-1))
throw new IllegalArgumentException("Number of split features exceeds available data. Should be in [1,"+(cs.length-1)+"]");
if (_params._useNonLocalData && !canLoadAll( (ValueArray) UKV.get(_rfmodel._dataKey) ))
throw new IllegalArgumentException("Cannot load all data from remote nodes. Please provide more memory for JVMs or un-check the option 'Use non local data' (however, it will affect resulting accuracy).");
ChunkAllocInfo cai = new ChunkAllocInfo();
if (_params._useNonLocalData && !canLoadAll( (ValueArray) UKV.get(_rfmodel._dataKey), cai ))
throw new IllegalArgumentException(
"Cannot load all data from remote nodes - " +
"the node " + cai.node + " requires " + PrettyPrint.bytes(cai.requiredMemory) + " to load all data and perform computation but there is only " + PrettyPrint.bytes(cai.availableMemory) + " of available memory. " +
"Please provide more memory for JVMs or disable the option '"+Constants.USE_NON_LOCAL_DATA+"' (however, it may affect resulting accuracy).");
}

private boolean canLoadAll(ValueArray ary) {
private boolean canLoadAll(final ValueArray ary, ChunkAllocInfo cai) {
long[] localChunks = new long[H2O.CLOUD.size()];
// Collect number of local chunks
for(int i=0; i<ary.chunks(); i++) {
Expand All @@ -117,11 +122,23 @@ private boolean canLoadAll(ValueArray ary) {
long nodeFreeMemory = (long)( (hb.get_max_mem()-(hb.get_tot_mem()-hb.get_free_mem())) * OVERHEAD_MAGIC);
Log.debug(Sys.RANDF, i + ": computed available mem: " + PrettyPrint.bytes(nodeFreeMemory));
Log.debug(Sys.RANDF, i + ": remote chunks require: " + PrettyPrint.bytes(memoryForChunks));
if (nodeFreeMemory - memoryForChunks <= 0) return false;
if (nodeFreeMemory - memoryForChunks <= 0) {
cai.node = H2O.CLOUD._memary[i];
cai.availableMemory = nodeFreeMemory;
cai.requiredMemory = memoryForChunks;
return false;
}
}
return true;
}

/** Helper POJO to store required chunk allocation. */
private static class ChunkAllocInfo {
H2ONode node;
long availableMemory;
long requiredMemory;
}

/**Inhale the data, build a DataAdapter and kick-off the computation.
* */
@Override public final void lcompute() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public synchronized RPC<V> call() {
try {
ab.putTask(UDP.udp.exec,_tasknum).put1(CLIENT_UDP_SEND).put(_dt);
boolean t = ab.hasTCP();
ab.close(t,false);
assert sz_check(ab) : "Resend of "+_dt.getClass()+" changes size from "+_size+" to "+ab.size()+" for task#"+_tasknum;
ab.close(t,false);
_sentTcp = t; // Set after close (and any other possible fail)
break; // Break out of retry loop
} catch( AutoBuffer.TCPIsUnreliableException e ) {
Expand Down Expand Up @@ -318,8 +318,8 @@ public final void resend_ack() {
AutoBuffer rab = new AutoBuffer(_client).putTask(UDP.udp.ack,_tsknum);
if( dt._repliedTcp ) rab.put1(RPC.SERVER_TCP_SEND) ; // Reply sent via TCP
else dt.write(rab.put1(RPC.SERVER_UDP_SEND)); // Reply sent via UDP
rab.close(dt._repliedTcp,false);
assert sz_check(rab) : "Resend of "+_dt.getClass()+" changes size from "+_size+" to "+rab.size();
rab.close(dt._repliedTcp,false);
// Double retry until we exceed existing age. This is the time to delay
// until we try again. Note that we come here immediately on creation,
// so the first doubling happens before anybody does any waiting. Also
Expand Down
Loading

0 comments on commit 63f2f33

Please sign in to comment.