Merge branch 'master' of github.com:0xdata/h2o
mmalohlava committed Apr 16, 2013
2 parents bd57268 + 4cb9768 commit 8146a55
Showing 9 changed files with 232 additions and 30 deletions.
3 changes: 2 additions & 1 deletion py/ec2_cmd.py
@@ -25,7 +25,7 @@
'''
DEFAULT_EC2_INSTANCE_CONFIGS = {
'us-east-1':{
'image_id' : 'ami-b85cc4d1', # 'ami-cd9a11a4',
'image_id' : 'ami-30c6a059', #'ami-b85cc4d1', # 'ami-cd9a11a4',
'security_groups' : [ 'MrJenkinsTest' ],
'key_name' : 'mrjenkins_test',
'instance_type' : 'm1.xlarge',
@@ -132,6 +132,7 @@ def run_instances(count, ec2_config, region, waitForSSH=True, tags=None):
log('Reservation: {0}'.format(reservation.id))
log('Waiting for {0} EC2 instances {1} to come up, this can take 1-2 minutes.'.format(len(reservation.instances), reservation.instances))
start = time.time()
time.sleep(1)
for instance in reservation.instances:
while instance.update() == 'pending':
time.sleep(1)
2 changes: 1 addition & 1 deletion py/h2o_hosts.py
@@ -113,7 +113,7 @@ def build_cloud_with_hosts(node_count=None, **kwargs):
node_count=paramsToUse['h2o_per_host'],
base_port=paramsToUse['base_port'],
hosts=hosts,
rand_shuffle=True
rand_shuffle=paramsToUse['rand_shuffle']
)

if hosts is not None:
173 changes: 173 additions & 0 deletions py/testdir_hosts/test_flashgordon.py
@@ -0,0 +1,173 @@
import unittest, sys, random, time
sys.path.extend(['.','..','py'])
import h2o, h2o_cmd, h2o_browse as h2b, h2o_import as h2i, h2o_hosts
import h2o_jobs
import logging

class Basic(unittest.TestCase):
def tearDown(self):
h2o.check_sandbox_for_errors()

@classmethod
def setUpClass(cls):
print "Will build clouds with incrementing heap sizes and import folder/parse"

@classmethod
def tearDownClass(cls):
# the node state is gone when we tear down the cloud, so pass the ignore here also.
h2o.tear_down_cloud(sandbox_ignore_errors=True)

def test_flashgordon(self):
# typical size of the michal files
avgMichalSize = 116561140
avgSynSize = 4020000
csvFilenameList = [
("100.dat.gz", "dat_1", 1 * avgSynSize, 700),
("11[0-9].dat.gz", "dat_10", 10 * avgSynSize, 700),
("1[32][0-9].dat.gz", "dat_20", 20 * avgSynSize, 800),
("1[5-9][0-9].dat.gz", "dat_50", 50 * avgSynSize, 900),
# ("1[0-9][0-9].dat.gz", "dat_100", 100 * avgSynSize, 1200),
]

print "Using the -.gz files from s3"
# want just s3n://home-0xdiag-datasets/manyfiles-nflx-gz/file_1.dat.gz

USE_S3 = False
noPoll = True
benchmarkLogging = ['cpu','disk']
bucket = "home-0xdiag-datasets"
if USE_S3:
URI = "s3://flashgordon"
protocol = "s3"
else:
URI = "s3n://flashgordon"
protocol = "s3n/hdfs"

# split out the pattern match and the filename used for the hex
trialMax = 1
# use i to forward reference in the list, so we can do multiple outstanding parses below
for i, (csvFilepattern, csvFilename, totalBytes, timeoutSecs) in enumerate(csvFilenameList):
## for tryHeap in [54, 28]:
for tryHeap in [54]:

print "\n", tryHeap,"GB heap, 1 jvm per host, import", protocol, "then parse"
h2o_hosts.build_cloud_with_hosts(node_count=1, java_heap_GB=tryHeap,
enable_benchmark_log=True, timeoutSecs=120, retryDelaySecs=10,
# all hdfs info is done thru the hdfs_config michal's ec2 config sets up?
# this is for our amazon ec hdfs
# see https://github.com/0xdata/h2o/wiki/H2O-and-s3n
hdfs_name_node='10.78.14.235:9000',
hdfs_version='0.20.2')

# don't raise exception if we find something bad in h2o stdout/stderr?
h2o.nodes[0].sandbox_ignore_errors = True

for trial in range(trialMax):
# since we delete the key, we have to re-import every iteration, to get it again
# s3n URI thru HDFS is not typical.
if USE_S3:
importResult = h2o.nodes[0].import_s3(bucket)
else:
importResult = h2o.nodes[0].import_hdfs(URI)

s3nFullList = importResult['succeeded']
for k in s3nFullList:
key = k['key']
# just print the first tile
# if 'nflx' in key and 'file_1.dat.gz' in key:
if csvFilepattern in key:
# should be s3n://home-0xdiag-datasets/manyfiles-nflx-gz/file_1.dat.gz
print "example file we'll use:", key
break
else:
### print key
pass

### print "s3nFullList:", h2o.dump_json(s3nFullList)
# error if none?
self.assertGreater(len(s3nFullList),8,"Didn't see more than 8 files in s3n?")

s3nKey = URI + "/" + csvFilepattern
key2 = csvFilename + "_" + str(trial) + ".hex"
print "Loading", protocol, "key:", s3nKey, "to", key2
start = time.time()
parseKey = h2o.nodes[0].parse(s3nKey, key2,
timeoutSecs=timeoutSecs, retryDelaySecs=10, pollTimeoutSecs=60,
noPoll=noPoll,
benchmarkLogging=benchmarkLogging)

if noPoll:
time.sleep(1)
h2o.check_sandbox_for_errors()
(csvFilepattern, csvFilename, totalBytes2, timeoutSecs) = csvFilenameList[i+1]
s3nKey = URI + "/" + csvFilepattern
key2 = csvFilename + "_" + str(trial) + ".hex"
print "Loading", protocol, "key:", s3nKey, "to", key2
parse2Key = h2o.nodes[0].parse(s3nKey, key2,
timeoutSecs=timeoutSecs, retryDelaySecs=10, pollTimeoutSecs=60,
noPoll=noPoll,
benchmarkLogging=benchmarkLogging)

time.sleep(1)
h2o.check_sandbox_for_errors()
(csvFilepattern, csvFilename, totalBytes3, timeoutSecs) = csvFilenameList[i+2]
s3nKey = URI + "/" + csvFilepattern
key2 = csvFilename + "_" + str(trial) + ".hex"
print "Loading", protocol, "key:", s3nKey, "to", key2
parse3Key = h2o.nodes[0].parse(s3nKey, key2,
timeoutSecs=timeoutSecs, retryDelaySecs=10, pollTimeoutSecs=60,
noPoll=noPoll,
benchmarkLogging=benchmarkLogging)

elapsed = time.time() - start
print s3nKey, 'parse time:', parseKey['response']['time']
print "parse result:", parseKey['destination_key']
print "Parse #", trial, "completed in", "%6.2f" % elapsed, "seconds.", \
"%d pct. of timeout" % ((elapsed*100)/timeoutSecs)

# print stats on all three if noPoll
if noPoll:
# does it take a little while to show up in Jobs, from where we issued the parse?
time.sleep(2)
# FIX! use the last (biggest?) timeoutSecs? maybe should increase since parallel
h2o_jobs.pollWaitJobs(pattern=csvFilename,
timeoutSecs=timeoutSecs, benchmarkLogging=benchmarkLogging)
# for getting the MB/sec closer to 'right'
totalBytes += totalBytes2 + totalBytes3
elapsed = time.time() - start
h2o.check_sandbox_for_errors()

if totalBytes is not None:
fileMBS = (totalBytes/1e6)/elapsed
print "\nMB/sec (before uncompress)", "%6.2f" % fileMBS
h2o.cloudPerfH2O.message('{:d} jvms, {:d}GB heap, {:s} {:s} {:6.2f} MB/sec for {:6.2f} secs'.format(
len(h2o.nodes), tryHeap, csvFilepattern, csvFilename, fileMBS, elapsed))

# BUG here?
if not noPoll:
# We should be able to see the parse result?
inspect = h2o_cmd.runInspect(key=parseKey['destination_key'])

print "Deleting key in H2O so we get it from S3 (if ec2) or nfs again.", \
"Otherwise it would just parse the cached key."

storeView = h2o.nodes[0].store_view()
### print "storeView:", h2o.dump_json(storeView)
# "key": "s3n://home-0xdiag-datasets/manyfiles-nflx-gz/file_84.dat.gz"
# have to do the pattern match ourself, to figure out what keys to delete
# we're deleting the keys in the initial import. We leave the keys we created
# by the parse. We use unique dest keys for those, so no worries.
# Leaving them is good because things fill up! (spill)
for k in s3nFullList:
deleteKey = k['key']
if csvFilename in deleteKey and not ".hex" in deleteKey:
print "Removing", deleteKey
removeKeyResult = h2o.nodes[0].remove_key(key=deleteKey)
### print "removeKeyResult:", h2o.dump_json(removeKeyResult)

h2o.tear_down_cloud()
# sticky ports? wait a bit.
time.sleep(120)

if __name__ == '__main__':
h2o.unit_main()
2 changes: 1 addition & 1 deletion py/testdir_hosts/test_hosts_with_a_browser.py
@@ -18,7 +18,7 @@ def setUpClass(cls):
# do what my json says, but with my hdfs. hdfs_name_node from the json
# I'll set use_hdfs to False here, because H2O won't start if it can't talk to the hdfs
# h2o_hosts.build_cloud_with_hosts(use_hdfs=False)
h2o_hosts.build_cloud_with_hosts(use_hdfs=True)
h2o_hosts.build_cloud_with_hosts(use_hdfs=False)

@classmethod
def tearDownClass(cls):
16 changes: 14 additions & 2 deletions py/testdir_hosts/test_parse_nflx_loop_s3n_hdfs.py
@@ -20,10 +20,17 @@ def tearDownClass(cls):
def test_parse_nflx_loop_s3n_hdfs(self):
# typical size of the michal files
avgMichalSize = 116561140
avgSynSize = 4020000
csvFilenameList = [
# ("manyfiles-nflx-gz/file_1[0-9].dat.gz", "file_10.dat.gz"),
# 100 files takes too long on two machines?
# I use different files to avoid OS caching effects
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[0-9][0-9]", "syn_100.csv", 100 * avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_00000", "syn_1.csv", avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_0001[0-9]", "syn_10.csv", 10 * avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[23][0-9]", "syn_20.csv", 20 * avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[45678][0-9]", "syn_50.csv", 50 * avgSynSize, 700),

("manyfiles-nflx-gz/file_1.dat.gz", "file_1.dat.gz", 1 * avgMichalSize, 300),
("manyfiles-nflx-gz/file_[2][0-9].dat.gz", "file_10.dat.gz", 10 * avgMichalSize, 700),
("manyfiles-nflx-gz/file_[34][0-9].dat.gz", "file_20.dat.gz", 20 * avgMichalSize, 900),
@@ -36,7 +43,7 @@ def test_parse_nflx_loop_s3n_hdfs(self):
print "Using the -.gz files from s3"
# want just s3n://home-0xdiag-datasets/manyfiles-nflx-gz/file_1.dat.gz

USE_S3 = True
USE_S3 = False
noPoll = True
benchmarkLogging = ['cpu','disk']
bucket = "home-0xdiag-datasets"
@@ -78,9 +85,14 @@
for k in s3nFullList:
key = k['key']
# just print the first tile
if 'nflx' in key and 'file_1.dat.gz' in key:
# if 'nflx' in key and 'file_1.dat.gz' in key:
if csvFilepattern in key:
# should be s3n://home-0xdiag-datasets/manyfiles-nflx-gz/file_1.dat.gz
print "example file we'll use:", key
break
else:
### print key
pass

### print "s3nFullList:", h2o.dump_json(s3nFullList)
# error if none?
8 changes: 7 additions & 1 deletion py/testdir_multi_jvm/test_benchmark_import.py
@@ -58,6 +58,7 @@ def test_benchmark_import(self):
print "Using .gz'ed files in", importFolderPath
# all exactly the same prior to gzip!
avgMichalSize = 237270000
avgSynSize = 4020000
# could use this, but remember import folder -> import folder s3 for jenkins?
# how would it get it right?
# os.path.getsize(f)
@@ -66,13 +67,18 @@
# 100 files takes too long on two machines?
# ("covtype200x.data", "covtype200x.data", 15033863400, 700),
# I use different files to avoid OS caching effects
("covtype200x.data", "covtype200x.data", 15033863400, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[0-9][0-9]", "syn_100.csv", 100 * avgSynSize, 700),
("manyfiles-nflx-gz/file_1.dat.gz", "file_1.dat.gz", 1 * avgMichalSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_00000", "syn_1.csv", avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_0001[0-9]", "syn_10.csv", 10 * avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[23][0-9]", "syn_20.csv", 20 * avgSynSize, 700),
("syn_datasets/syn_7350063254201195578_10000x200.csv_000[45678][0-9]", "syn_50.csv", 50 * avgSynSize, 700),
# ("manyfiles-nflx-gz/file_10.dat.gz", "file_10_1.dat.gz", 1 * avgMichalSize, 700),
# ("manyfiles-nflx-gz/file_1[0-9].dat.gz", "file_10.dat.gz", 10 * avgMichalSize, 700),
("manyfiles-nflx-gz/file_[2][0-9].dat.gz", "file_10.dat.gz", 10 * avgMichalSize, 700),
("manyfiles-nflx-gz/file_[34][0-9].dat.gz", "file_20.dat.gz", 20 * avgMichalSize, 700),
("manyfiles-nflx-gz/file_[5-9][0-9].dat.gz", "file_50.dat.gz", 50 * avgMichalSize, 700),
# ("covtype200x.data", "covtype200x.data", 15033863400, 700),
# ("manyfiles-nflx-gz/file_*.dat.gz", "file_100.dat.gz", 100 * avgMichalSize, 700),

# do it twice
36 changes: 18 additions & 18 deletions src/main/java/water/TimeLine.java
@@ -20,22 +20,6 @@
public class TimeLine extends UDP {
private static final Unsafe _unsafe = UtilUnsafe.getUnsafe();

/**
* Only for debugging.
* Prints local timeline to stdout.
*
* To be used in case of an error when global timeline can not be relied upon as we might not be able to talk to other nodes.
*/
public static void printMyTimeLine(){
long [] s = TimeLine.snapshot();
System.out.println("===================================<TIMELINE>==============================================");
for(int i = 0; i < TimeLine.length(); ++i) {
if(!TimeLine.isEmpty(s, i) && ((TimeLine.l0(s, i) & 0xFF) == UDP.udp.exec.ordinal()))
System.out.println(TimeLine.ms(s, i) + ": " + (((TimeLine.ns(s, i) & 4) != 0)?"TCP":"UDP") + TimeLine.inet(s, i) + " | " + UDP.printx16(TimeLine.l0(s, i), TimeLine.l8(s, i)));
}
System.out.println("===========================================================================================");
}

// The TimeLine buffer.

// The TimeLine buffer is full of Events; each event has a timestamp and some
@@ -108,8 +92,8 @@ private static void record( AutoBuffer b, boolean tcp, int sr, int drop ) {
tl[idx*WORDS_PER_EVENT+2+1] = tmp;
tl[idx*WORDS_PER_EVENT+3+1] = b.get8(8);
}
public static void record_send( AutoBuffer b , boolean tcp) { record(b, tcp, 0,0); }
public static void record_recv( AutoBuffer b, boolean tcp, int drop ) { record(b,tcp, 1,drop); }
public static void record_send( AutoBuffer b, boolean tcp) { record(b,tcp,0, 0); }
public static void record_recv( AutoBuffer b, boolean tcp, int drop) { record(b,tcp,1,drop); }

// Accessors, for TimeLines that come from all over the system
public static int length( ) { return MAX_EVENTS; }
@@ -209,4 +193,20 @@ static void tcp_call( final AutoBuffer ab ) {
}

public String print16( AutoBuffer ab ) { return ""; } // no extra info in a timeline packet

/**
* Only for debugging.
* Prints local timeline to stdout.
*
* To be used in case of an error when global timeline can not be relied upon as we might not be able to talk to other nodes.
*/
public static void printMyTimeLine(){
long [] s = TimeLine.snapshot();
System.out.println("===================================<TIMELINE>==============================================");
for(int i = 0; i < TimeLine.length(); ++i) {
if(!TimeLine.isEmpty(s, i) && ((TimeLine.l0(s, i) & 0xFF) == UDP.udp.exec.ordinal()))
System.out.println(TimeLine.ms(s, i) + ": " + (((TimeLine.ns(s, i) & 4) != 0)?"TCP":"UDP") + TimeLine.inet(s, i) + " | " + UDP.printx16(TimeLine.l0(s, i), TimeLine.l8(s, i)));
}
System.out.println("===========================================================================================");
}
}
