forked from h2oai/h2o-2
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change the spelling of the downloadCsv h2o URL;
add it to test_parse_fs_schmoo2.py; add csv_download to h2o.py
- Loading branch information
Kevin Normoyle
committed
Jun 27, 2013
1 parent
9421fc5
commit 2d8baff
Showing
6 changed files
with
264 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
import os, json, unittest, time, shutil, sys | ||
sys.path.extend(['.','..','py']) | ||
import h2o, h2o_cmd,h2o_hosts, h2o_browse as h2b, h2o_import as h2i, h2o_hosts, h2o_glm | ||
import h2o_exec as h2e, h2o_jobs | ||
import time, random, logging | ||
|
||
import h2o, h2o_cmd | ||
import h2o_browse as h2b | ||
import random | ||
import gzip | ||
|
||
def write_syn_dataset_gz(csvPathname, rowCount, headerData, rowData):
    """Write a gzipped csv: one header line, then rowCount copies of rowData.

    csvPathname -- path of the .gz file to create (overwritten if present)
    rowCount    -- number of identical data rows to emit after the header
    headerData  -- header line, without trailing newline
    rowData     -- data row, without trailing newline
    """
    # Context manager guarantees the gzip stream is closed (and its trailer
    # flushed) even if a write raises -- the original open/close pair leaked
    # the handle and could leave a truncated archive on error.
    with gzip.open(csvPathname, 'wb') as f:
        # Encode explicitly so the same bytes are written on Python 2 and 3.
        # Assumes ascii-only csv content, which is all this generator produces.
        f.write((headerData + "\n").encode('ascii'))
        for _ in range(rowCount):
            f.write((rowData + "\n").encode('ascii'))
|
||
def rand_rowData():
    """Return one synthetic csv row: 101 comma-separated random ints in [0,7].

    The first column is (possibly) used as the output/response column by the
    consumers of this row; the remaining 100 are features.

    The draws happen in the same order as the original concatenation loop
    (101 sequential randint calls), so for a fixed random seed the returned
    string is identical -- but join() avoids the quadratic repeated string
    concatenation of the original.
    """
    return ",".join(str(random.randint(0, 7)) for _ in range(101))
|
||
class Basic(unittest.TestCase): | ||
def tearDown(self): | ||
h2o.check_sandbox_for_errors() | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
pass | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
if not h2o.browse_disable: | ||
# time.sleep(500000) | ||
pass | ||
h2o.tear_down_cloud(h2o.nodes) | ||
|
||
def test_parse_1k_files(self): | ||
SEED = random.randint(0, sys.maxint) | ||
# if you have to force to redo a test | ||
# SEED = | ||
random.seed(SEED) | ||
print "\nUsing random seed:", SEED | ||
|
||
SYNDATASETS_DIR = h2o.make_syn_dir() | ||
csvFilename = "syn.csv.gz" | ||
headerData = "ID,CAPSULE,AGE,RACE,DPROS,DCAPS,PSA,VOL,GLEASON" | ||
totalRows = 10 | ||
maxFilenum = 10000 | ||
for filenum in range(maxFilenum): | ||
rowData = rand_rowData() | ||
filePrefix = "%04d" % filenum | ||
csvPathname = SYNDATASETS_DIR + '/' + filePrefix + "_" + csvFilename | ||
write_syn_dataset_gz(csvPathname, totalRows, headerData, rowData) | ||
|
||
avgFileSize = os.path.getsize(csvPathname) | ||
|
||
importFolderPath = os.path.abspath(SYNDATASETS_DIR) | ||
print "\nimportFolderPath:", importFolderPath | ||
csvFilenameList = [ | ||
("*_syn.csv.gz", "syn_all.csv", maxFilenum * avgFileSize, 1200), | ||
] | ||
|
||
trialMax = 1 | ||
base_port = 54321 | ||
tryHeap = 4 | ||
DO_GLM = True | ||
noPoll = False | ||
benchmarkLogging = ['cpu','disk', 'iostats', 'jstack'] | ||
benchmarkLogging = ['cpu','disk'] | ||
pollTimeoutSecs = 120 | ||
retryDelaySecs = 10 | ||
|
||
for i,(csvFilepattern, csvFilename, totalBytes, timeoutSecs) in enumerate(csvFilenameList): | ||
localhost = h2o.decide_if_localhost() | ||
if (localhost): | ||
h2o.build_cloud(3,java_heap_GB=tryHeap, base_port=base_port, | ||
enable_benchmark_log=True) | ||
else: | ||
h2o_hosts.build_cloud_with_hosts(1, java_heap_GB=tryHeap, base_port=base_port, | ||
enable_benchmark_log=True) | ||
h2b.browseTheCloud() | ||
|
||
for trial in range(trialMax): | ||
importFolderResult = h2i.setupImportFolder(None, importFolderPath) | ||
importFullList = importFolderResult['succeeded'] | ||
importFailList = importFolderResult['failed'] | ||
print "\n Problem if this is not empty: importFailList:", h2o.dump_json(importFailList) | ||
|
||
h2o.cloudPerfH2O.change_logfile(csvFilename) | ||
h2o.cloudPerfH2O.message("") | ||
h2o.cloudPerfH2O.message("Parse " + csvFilename + " Start--------------------------------") | ||
start = time.time() | ||
parseKey = h2i.parseImportFolderFile(None, csvFilepattern, importFolderPath, | ||
key2=csvFilename + ".hex", timeoutSecs=timeoutSecs, | ||
retryDelaySecs=retryDelaySecs, | ||
pollTimeoutSecs=pollTimeoutSecs, | ||
noPoll=noPoll, | ||
benchmarkLogging=benchmarkLogging) | ||
|
||
elapsed = time.time() - start | ||
print "Parse #", trial, "completed in", "%6.2f" % elapsed, "seconds.", \ | ||
"%d pct. of timeout" % ((elapsed*100)/timeoutSecs) | ||
|
||
if noPoll: | ||
# does it take a little while to show up in Jobs, from where we issued the parse? | ||
time.sleep(2) | ||
# FIX! use the last (biggest?) timeoutSecs? maybe should increase since parallel | ||
h2o_jobs.pollWaitJobs(pattern=csvFilename, | ||
timeoutSecs=timeoutSecs, benchmarkLogging=benchmarkLogging) | ||
totalBytes += totalBytes2 + totalBytes3 | ||
elapsed = time.time() - start | ||
h2o.check_sandbox_for_errors() | ||
|
||
|
||
if totalBytes is not None: | ||
fileMBS = (totalBytes/1e6)/elapsed | ||
l = '{!s} jvms, {!s}GB heap, {:s} {:s} {:6.2f} MB/sec for {:.2f} secs'.format( | ||
len(h2o.nodes), tryHeap, csvFilepattern, csvFilename, fileMBS, elapsed) | ||
print l | ||
h2o.cloudPerfH2O.message(l) | ||
|
||
print csvFilepattern, 'parse time:', parseKey['response']['time'] | ||
print "Parse result['destination_key']:", parseKey['destination_key'] | ||
|
||
# BUG here? | ||
if not noPoll: | ||
# We should be able to see the parse result? | ||
h2o_cmd.check_enums_from_inspect(parseKey) | ||
|
||
print "\n" + csvFilepattern | ||
|
||
#********************************************************************************** | ||
# Do GLM too | ||
# Argument case error: Value 0.0 is not between 12.0 and 9987.0 (inclusive) | ||
if DO_GLM: | ||
GLMkwargs = {'y': 0, 'case': 1, 'case_mode': '>', | ||
'max_iter': 10, 'n_folds': 1, 'alpha': 0.2, 'lambda': 1e-5} | ||
start = time.time() | ||
glm = h2o_cmd.runGLMOnly(parseKey=parseKey, timeoutSecs=timeoutSecs, **GLMkwargs) | ||
h2o_glm.simpleCheckGLM(self, glm, None, **GLMkwargs) | ||
elapsed = time.time() - start | ||
h2o.check_sandbox_for_errors() | ||
l = '{:d} jvms, {:d}GB heap, {:s} {:s} GLM: {:6.2f} secs'.format( | ||
len(h2o.nodes), tryHeap, csvFilepattern, csvFilename, elapsed) | ||
print l | ||
h2o.cloudPerfH2O.message(l) | ||
|
||
#********************************************************************************** | ||
|
||
h2o_cmd.check_key_distribution() | ||
h2o_cmd.delete_csv_key(csvFilename, importFullList) | ||
|
||
time.sleep(500000) | ||
|
||
h2o.tear_down_cloud() | ||
if not localhost: | ||
print "Waiting 30 secs before building cloud again (sticky ports?)" | ||
time.sleep(30) | ||
|
||
sys.stdout.write('.') | ||
sys.stdout.flush() | ||
|
||
# Allow running this test file directly; h2o.unit_main() wraps unittest.main()
# with the project's argument handling.
if __name__ == '__main__':
    h2o.unit_main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import os, json, unittest, time, shutil, sys | ||
sys.path.extend(['.','..','py']) | ||
|
||
import h2o, h2o_cmd, h2o_hosts | ||
import h2o_browse as h2b | ||
|
||
|
||
def write_syn_dataset(csvPathname, rowCount, headerData, rowData):
    """Write a plain csv: one header line, then rowCount copies of rowData.

    csvPathname -- path of the file to create (truncated if it exists)
    rowCount    -- number of identical data rows to emit after the header
    headerData  -- header line, without trailing newline
    rowData     -- data row, without trailing newline
    """
    # "with" closes the file even if a write raises -- the original
    # open/close pair leaked the handle on error.  Mode "w+" kept from the
    # original (truncate + read access), though only writing happens here.
    with open(csvPathname, "w+") as dsf:
        dsf.write(headerData + "\n")
        for _ in range(rowCount):
            dsf.write(rowData + "\n")
|
||
def append_syn_dataset(csvPathname, rowData):
    """Append a single data row (newline-terminated) to an existing csv."""
    out = open(csvPathname, "a")
    try:
        out.write("%s\n" % rowData)
    finally:
        # Explicit try/finally mirrors what the with-statement guarantees.
        out.close()
|
||
class Basic(unittest.TestCase): | ||
def tearDown(self): | ||
h2o.check_sandbox_for_errors() | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
# fails with 3 | ||
localhost = h2o.decide_if_localhost() | ||
if (localhost): | ||
h2o.build_cloud(3,java_heap_GB=4,use_flatfile=True) | ||
else: | ||
h2o_hosts.build_cloud_with_hosts() | ||
|
||
h2b.browseTheCloud() | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
h2o.tear_down_cloud(h2o.nodes) | ||
|
||
def test_parse_fs_schmoo2(self): | ||
SYNDATASETS_DIR = h2o.make_syn_dir() | ||
csvFilename = "syn_prostate.csv" | ||
csvPathname = SYNDATASETS_DIR + '/' + csvFilename | ||
|
||
headerData = "ID,CAPSULE,AGE,RACE,DPROS,DCAPS,PSA,VOL,GLEASON" | ||
rowData = "1,0,65,1,2,1,1.4,0,6" | ||
rowData = "1,0,65,1,2,1,1,0,6" | ||
|
||
totalRows = 99860 | ||
write_syn_dataset(csvPathname, totalRows, headerData, rowData) | ||
|
||
print "This is the same format/data file used by test_same_parse, but the non-gzed version" | ||
print "\nSchmoo the # of rows" | ||
print "Updating the key and key2 names for each trial" | ||
for trial in range (200): | ||
append_syn_dataset(csvPathname, rowData) | ||
totalRows += 1 | ||
|
||
start = time.time() | ||
key = csvFilename + "_" + str(trial) | ||
key2 = csvFilename + "_" + str(trial) + ".hex" | ||
key = h2o_cmd.parseFile(csvPathname=csvPathname, key=key, key2=key2) | ||
print "trial #", trial, "totalRows:", totalRows, "parse end on ", \ | ||
csvFilename, 'took', time.time() - start, 'seconds' | ||
|
||
h2o_cmd.runInspect(key=key2) | ||
# do a little testing of saving the key as a csv | ||
h2o.nodes[0].csv_download(key=key2) | ||
|
||
# only used this for debug to look at parse (red last row) on failure | ||
### h2b.browseJsonHistoryAsUrlLastMatch("Inspect") | ||
h2o.check_sandbox_for_errors() | ||
|
||
# Allow running this test file directly; h2o.unit_main() wraps unittest.main()
# with the project's argument handling.
if __name__ == '__main__':
    h2o.unit_main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters