forked from h2oai/h2o-2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
h2o.py
3474 lines (2984 loc) · 141 KB
/
h2o.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import time, os, stat, json, signal, tempfile, shutil, datetime, inspect, threading, getpass
import requests, psutil, argparse, sys, unittest, glob
import h2o_browse as h2b, h2o_perf, h2o_util, h2o_cmd, h2o_os_util
import h2o_sandbox, h2o_print as h2p
import re, random
# used in shutil.rmtree permission hack for windows
import errno
# use to unencode the urls sent to h2o?
import urlparse
import logging
# for log_download
import requests, zipfile, StringIO
# For checking ports in use, using netstat thru a subprocess.
from subprocess import Popen, PIPE
import stat
class OutWrapper:
    """Wrap a stream so every newline written is followed by a timestamp tag.

    Installed over sys.stdout by build_cloud() (unless -dts/disable_time_stamp)
    so interleaved test output can be ordered after the fact.
    """
    def __init__(self, out):
        # the real underlying stream (normally sys.stdout)
        self._out = out

    def write(self, x):
        # every newline gets a '[<wall clock>] ' prefix on the next line
        stamped = x.replace('\n', '\n[{0}] '.format(str(datetime.datetime.now())))
        self._out.write(stamped)

    def flush(self):
        self._out.flush()
def check_params_update_kwargs(params_dict, kw, function, print_params):
# only update params_dict..don't add
# throw away anything else as it should come from the model (propagating what RF used)
for k in kw:
if k in params_dict:
params_dict[k] = kw[k]
else:
raise Exception("illegal parameter '%s' in %s" % (k, function))
if print_params:
print "%s parameters:" % function, params_dict
sys.stdout.flush()
# Print args only when the module-global 'verbose' flag (set via -v in
# parse_our_args) is on. NOTE: for kwargs only the KEY names are printed,
# since 'for x in kwargs' iterates keys -- presumably intentional shorthand.
def verboseprint(*args, **kwargs):
    if verbose:
        for x in args: # so you don't have to create a single string
            print x,
        for x in kwargs: # so you don't have to create a single string
            print x,
        print
        # flush so we can see problems when hung?
        sys.stdout.flush()
def sleep(secs):
    """Sleep for secs seconds; when run by jenkins, cap the sleep at 120s.

    Some old tests carry a leftover h2o.sleep(3600); the cap keeps jenkins
    jobs from stalling for an hour. BUGFIX: the original used max(secs, 120),
    which *raised* short sleeps to 2 minutes instead of capping long ones --
    min() matches the stated intent ("don't let it sleep more than 2 minutes").
    """
    if getpass.getuser() == 'jenkins':
        # if jenkins, don't let it sleep more than 2 minutes
        # due to left over h2o.sleep(3600)
        period = min(secs, 120)
    else:
        period = secs
    time.sleep(period)
# The cloud is uniquely named per user (only) and pid
# do the flatfile the same way
# Both are the user that runs the test. The config might have a different username on the
# remote machine (0xdiag, say, or hduser)
def flatfile_pathname():
    """Per-user flatfile path under the sandbox.

    The cloud (and flatfile) is uniquely named per the user running the test;
    the config might use a different username on remote machines (0xdiag, hduser).
    """
    suffix = '/pytest_flatfile-%s' % getpass.getuser()
    return LOG_DIR + suffix
# only usable after you've built a cloud (junit, watch out)
def cloud_name():
    """Name of the current cloud; only usable after build_cloud (junit, watch out)."""
    first_node = nodes[0]
    return first_node.cloud_name
# Pump lines from src (a file-like, e.g. a child process pipe) into dst until
# EOF, then close both. dst may be either a raw OS file descriptor (int) or a
# file-like object. Runs on a daemon thread -- see drain() below.
def __drain(src, dst):
    for l in src:
        if type(dst) == type(0):
            # got this with random data to parse.. why? it shows up in our stdout?
            # UnicodeEncodeError: 'ascii' codec can't encode character u'\x86' in position 60: ordinal not in range(128)
            # could we be getting unicode object?
            try:
                os.write(dst, l)
            except:
                # fallback: a unicode line hit the ascii codec; write it utf8-encoded
                # os.write(dst,"kbn: non-ascii char in the next line?")
                os.write(dst,l.encode('utf8'))
        else:
            # FIX! this case probably can have the same issue?
            dst.write(l)
            dst.flush()
    src.close()
    if type(dst) == type(0):
        os.close(dst)
def drain(src, dst):
    """Start a daemon thread that copies src into dst line-by-line (see __drain)."""
    pump = threading.Thread(target=__drain, args=(src, dst))
    pump.daemon = True
    pump.start()
# Hackery: find the ip address that gets you to Google's DNS
# Trickiness because you might have multiple IP addresses (Virtualbox), or Windows.
# we used to not like giving ip 127.0.0.1 to h2o?
def get_ip_address():
    # Find the ip address that gets you to Google's DNS -- hackery because a
    # machine may have multiple addresses (Virtualbox), or be Windows.
    # Honor an explicit -ip from the command line first (trust the user).
    if ipaddr_from_cmd_line:
        verboseprint("get_ip case 1:", ipaddr_from_cmd_line)
        return ipaddr_from_cmd_line

    import socket
    ip = '127.0.0.1'
    # short timeout so the probes below fail fast on broken networks
    socket.setdefaulttimeout(0.5)
    hostname = socket.gethostname()
    # this method doesn't work if vpn is enabled..it gets the vpn ip
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 0))
        ip = s.getsockname()[0]
        verboseprint("get_ip case 2:", ip)
    except:
        pass

    if ip.startswith('127'):
        # drills down into family
        ip = socket.getaddrinfo(hostname, None)[0][4][0]
        verboseprint("get_ip case 3:", ip)

    ipa = None
    badHosts = ['lg1', 'ch-0', 'ch-63']
    # hack for hosts that don't support this
    # the gethostbyname_ex can be slow. the timeout above will save us quickly
    if hostname not in badHosts:
        try:
            # Translate a host name to IPv4 address format, extended interface.
            # Return a triple (hostname, aliaslist, ipaddrlist)
            # where hostname is the primary host name responding to the given ip_address,
            # aliaslist is a (possibly empty) list of alternative host names for the same address, and
            # ipaddrlist is a list of IPv4 addresses for the same interface on the same host
            ghbx = socket.gethostbyname_ex(hostname)
            for ips in ghbx[2]:
                # only take the first
                if ipa is None and not ips.startswith("127."):
                    ipa = ips[:]
                    verboseprint("get_ip case 4:", ipa)
                    if ip != ipa:
                        print "\nAssuming", ip, "is the ip address h2o will use but", ipa, "is probably the real ip?"
                        print "You might have a vpn active. Best to use '-ip " + ipa + "' to get python and h2o the same."
        except:
            pass
            # print "Timeout during socket.gethostbyname_ex(hostname)"

    verboseprint("get_ip_address:", ip)
    # set it back to default higher timeout (None would be no timeout?)
    socket.setdefaulttimeout(5)
    return ip
# used to rename the sandbox when running multiple tests in same dir (in different shells)
def get_sandbox_name():
if os.environ.has_key("H2O_SANDBOX_NAME"):
a = os.environ["H2O_SANDBOX_NAME"]
print "H2O_SANDBOX_NAME", a
return a
else:
return "sandbox"
# used to shift ports when running multiple tests on same machine in parallel (in different shells)
def get_base_port(base_port):
    """Pick the base port for this run, honoring H2O_PORT / H2O_PORT_OFFSET.

    Used to shift ports when running multiple tests on the same machine in
    parallel (different shells). NOTE: the incoming base_port argument is
    always replaced by the env var / user default -- a long-standing quirk
    callers rely on. Raises on out-of-range env values.

    BUGFIXES vs original: the H2O_PORT_OFFSET exception message used
    '%% os' (a stray '%% o' octal conversion) instead of '%s', and the
    H2O_PORT verboseprint logged the offset 'a' instead of 'b'.
    """
    a = 0
    if "H2O_PORT_OFFSET" in os.environ:
        # int() will raise if it's not an integer
        a = int(os.environ["H2O_PORT_OFFSET"])
        # some of the tests select a number 54321, 54323, or 54327, so want to be at least 8 or so apart
        # for multiple test runs.
        # (54321, 54323, 54325 and 54327 are used in testdir_single_jvm)
        # if we're running multi-node with a config json, then obviously the gap needs to be cognizant
        # of the number of nodes
        verboseprint("H2O_PORT_OFFSET", a)
        if a < 8 or a > 500:
            raise Exception("H2O_PORT_OFFSET %s os env variable should be either not set, or between 8 and 500" % a)
    b = None
    if "H2O_PORT" in os.environ:
        # int() will raise if it's not an integer
        b = int(os.environ["H2O_PORT"])
        verboseprint("H2O_PORT", b)
        if b < 54321 or b > 54999:
            raise Exception("H2O_PORT %s os env variable should be either not set, or between 54321 and 54999." % b)
    if b:
        base_port = b
    else:
        # jenkins runs in a shifted range to avoid colliding with dev defaults
        if getpass.getuser() == 'jenkins':
            base_port = 54340
        else:
            base_port = 54321
    if a:
        base_port += a
    return base_port
def unit_main():
    # Entry point for tests run directly as 'python test_foo.py'.
    # Records test identity in module globals, parses framework args, then
    # hands off to unittest.main(). nosetests does NOT go through here.
    # NOTE(review): python_cmd_ip is declared global but never assigned in this
    # function -- presumably it keeps the module-scope value; confirm.
    global python_test_name, python_cmd_args, python_cmd_line, python_cmd_ip, python_username
    # if I remember correctly there was an issue with using sys.argv[0]
    # under nosetests?. yes, see above. We just duplicate it here although sys.argv[0] might be fine here
    python_test_name = inspect.stack()[1][1]
    python_cmd_args = " ".join(sys.argv[1:])
    python_cmd_line = "python %s %s" % (python_test_name, python_cmd_args)
    python_username = getpass.getuser()
    # if test was run with nosestests, it wouldn't execute unit_main() so we won't see this
    # so this is correct, for stuff run with 'python ..."
    print "\nTest: %s command line: %s" % (python_test_name, python_cmd_line)
    # moved clean_sandbox out of here, because nosetests doesn't execute h2o.unit_main in our tests.
    # UPDATE: ..is that really true? I'm seeing the above print in the console output runnning
    # jenkins with nosetests
    parse_our_args()
    unittest.main()
# Global disable. used to prevent browsing when running nosetests, or when given -bd arg
# Defaults to true, if user=jenkins, h2o.unit_main isn't executed, so parse_our_args isn't executed.
# Since nosetests doesn't execute h2o.unit_main, it should have the browser disabled.
browse_disable = True
browse_json = False
verbose = False
ipaddr_from_cmd_line = None
config_json = None
debugger = False
random_udp_drop = False
force_tcp = False
random_seed = None
beta_features = True
sleep_at_tear_down = False
abort_after_import = False
clone_cloud_json = None
disable_time_stamp = False
debug_rest = False
long_test_case = False
# jenkins gets this assign, but not the unit_main one?
python_test_name = inspect.stack()[1][1]
# trust what the user says!
if ipaddr_from_cmd_line:
python_cmd_ip = ipaddr_from_cmd_line
else:
python_cmd_ip = get_ip_address()
# no command line args if run with just nose
python_cmd_args = ""
# don't really know what it is if nosetests did some stuff. Should be just the test with no args
python_cmd_line = ""
python_username = getpass.getuser()
def parse_our_args():
    """Parse h2o test-framework flags into module globals, then hand the
    leftover args to unittest (forcing -v and --failfast).

    NOTE: argument declaration order determines --help output, so it is
    preserved as-is.
    """
    parser = argparse.ArgumentParser()
    # can add more here
    parser.add_argument('-bd', '--browse_disable',
        help="Disable any web browser stuff. Needed for batch. nosetests and jenkins disable browser through other means already, so don't need",
        action='store_true')
    parser.add_argument('-b', '--browse_json',
        help='Pops a browser to selected json equivalent urls. Selective. Also keeps test alive (and H2O alive) till you ctrl-c. Then should do clean exit',
        action='store_true')
    parser.add_argument('-v', '--verbose', help='increased output', action='store_true')
    parser.add_argument('-ip', '--ip', type=str, help='IP address to use for single host H2O with psutil control')
    parser.add_argument('-cj', '--config_json',
        help='Use this json format file to provide multi-host defaults. Overrides the default file pytest_config-<username>.json. These are used only if you do build_cloud_with_hosts()')
    parser.add_argument('-dbg', '--debugger', help='Launch java processes with java debug attach mechanisms',
        action='store_true')
    parser.add_argument('-rud', '--random_udp_drop', help='Drop 20 pct. of the UDP packets at the receive side',
        action='store_true')
    parser.add_argument('-s', '--random_seed', type=int, help='initialize SEED (64-bit integer) for random generators')
    parser.add_argument('-bf', '--beta_features', help='enable or switch to beta features (import2/parse2)',
        action='store_true')
    parser.add_argument('-slp', '--sleep_at_tear_down',
        help='open browser and time.sleep(3600) at tear_down_cloud() (typical test end/fail)',
        action='store_true')
    parser.add_argument('-aai', '--abort_after_import',
        help='abort the test after printing the full path to the first dataset used by import_parse/import_only',
        action='store_true')
    parser.add_argument('-ccj', '--clone_cloud_json', type=str,
        help='a h2o-nodes.json file can be passed (see build_cloud(create_json=True). This will create a cloned set of node objects, so any test that builds a cloud, can also be run on an existing cloud without changing the test')
    parser.add_argument('-dts', '--disable_time_stamp',
        help='Disable the timestamp on all stdout. Useful when trying to capture some stdout (like json prints) for use elsewhere',
        action='store_true')
    parser.add_argument('-debug_rest', '--debug_rest', help='Print REST API interactions to rest.log',
        action='store_true')
    parser.add_argument('-nc', '--nocolor', help="don't emit the chars that cause color printing", action='store_true')
    parser.add_argument('-long', '--long_test_case', help="some tests will vary behavior to more, longer cases", action='store_true')
    # everything unrecognized falls through to unittest
    parser.add_argument('unittest_args', nargs='*')
    args = parser.parse_args()
    # disable colors if we pipe this into a file to avoid extra chars
    if args.nocolor:
        h2p.disable_colors()
    global browse_disable, browse_json, verbose, ipaddr_from_cmd_line, config_json, debugger, random_udp_drop
    global random_seed, beta_features, sleep_at_tear_down, abort_after_import, clone_cloud_json, disable_time_stamp, debug_rest, long_test_case
    browse_disable = args.browse_disable or getpass.getuser() == 'jenkins'
    browse_json = args.browse_json
    verbose = args.verbose
    ipaddr_from_cmd_line = args.ip
    config_json = args.config_json
    debugger = args.debugger
    random_udp_drop = args.random_udp_drop
    random_seed = args.random_seed
    # beta_features is hardwired to True
    # beta_features = args.beta_features
    sleep_at_tear_down = args.sleep_at_tear_down
    abort_after_import = args.abort_after_import
    clone_cloud_json = args.clone_cloud_json
    disable_time_stamp = args.disable_time_stamp
    debug_rest = args.debug_rest
    long_test_case = args.long_test_case
    # Set sys.argv to the unittest args (leave sys.argv[0] as is)
    # FIX! this isn't working to grab the args we don't care about
    # Pass "--failfast" to stop on first error to unittest. and -v
    # won't get this for jenkins, since it doesn't do parse_our_args
    sys.argv[1:] = ['-v', "--failfast"] + args.unittest_args
    # sys.argv[1:] = args.unittest_args
def find_file(base):
    """Return the first existing path for 'base' among a few well-known
    relative locations; raise if none exist.

    The ../h2o variants cover running from a sibling h2o-perf checkout.
    """
    candidates = [
        base,
        '../' + base,
        '../../' + base,
        'py/' + base,
        # these 2 are for finding from h2o-perf
        '../h2o/' + base,
        '../../h2o/' + base,
    ]
    for f in candidates:
        if os.path.exists(f):
            return f
    raise Exception("unable to find file %s" % base)
# shutil.rmtree doesn't work on windows if the files are read only.
# On unix the parent dir has to not be readonly too.
# May still be issues with owner being different, like if 'system' is the guy running?
# Apparently this escape function on errors is the way shutil.rmtree can
# handle the permission issue. (do chmod here)
# But we shouldn't have read-only files. So don't try to handle that case.
def handleRemoveError(func, path, exc):
# If there was an error, it could be due to windows holding onto files.
# Wait a bit before retrying. Ignore errors on the retry. Just leave files.
# Ex. if we're in the looping cloud test deleting sandbox.
excvalue = exc[1]
print "Retrying shutil.rmtree of sandbox (2 sec delay). Will ignore errors. Exception was", excvalue.errno
time.sleep(2)
try:
func(path)
except OSError:
pass
# Sandbox (log) directory for this test run; overridable via the
# H2O_SANDBOX_NAME env var (see get_sandbox_name above).
LOG_DIR = get_sandbox_name()
def clean_sandbox():
IS_THIS_FASTER = True
if os.path.exists(LOG_DIR):
# shutil.rmtree hangs if symlinks in the dir? (in syn_datasets for multifile parse)
# use os.remove() first
for f in glob.glob(LOG_DIR + '/syn_datasets/*'):
verboseprint("cleaning", f)
os.remove(f)
# shutil.rmtree fails to delete very long filenames on Windoze
### shutil.rmtree(LOG_DIR)
# was this on 3/5/13. This seems reliable on windows+cygwin
# I guess I changed back to rmtree below with something to retry, then ignore, remove errors.
# is it okay now on windows+cygwin?
### os.system("rm -rf "+LOG_DIR)
print "Removing", LOG_DIR, "(if slow, might be old ice dir spill files)"
start = time.time()
if IS_THIS_FASTER:
try:
os.system("rm -rf "+LOG_DIR)
except OSError:
pass
else:
shutil.rmtree(LOG_DIR, ignore_errors=False, onerror=handleRemoveError)
elapsed = time.time() - start
print "Took %s secs to remove %s" % (elapsed, LOG_DIR)
# it should have been removed, but on error it might still be there
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)
# who knows if this one is ok with windows...doesn't rm dir, just
# the stdout/stderr files
def clean_sandbox_stdout_stderr():
    """Remove only the *stdout*/*stderr* logs from the sandbox, not the dir.

    (Who knows if rm'ing the whole dir is ok with windows, so this stays
    file-by-file.) Removed the dead 'files = []' local; also note glob.glob
    returns a list (iglob is the iterator variant), contrary to the old comment.
    """
    if os.path.exists(LOG_DIR):
        for pattern in ('/*stdout*', '/*stderr*'):
            for f in glob.glob(LOG_DIR + pattern):
                verboseprint("cleaning", f)
                os.remove(f)
def clean_sandbox_doneToLine():
    """Remove the *doneToLine* bookkeeping files from the sandbox.

    Removed the dead 'files = []' local present in the original.
    """
    if os.path.exists(LOG_DIR):
        # glob.glob returns a list (not an iterator)
        for f in glob.glob(LOG_DIR + '/*doneToLine*'):
            verboseprint("cleaning", f)
            os.remove(f)
def tmp_file(prefix='', suffix='', tmp_dir=None):
    """mkstemp wrapper: create a temp file (under LOG_DIR by default) and
    return (fd, path).

    The file is chmod'd world-readable so jenkins can archive it even when
    the test runs as 0xcustomer.
    """
    target_dir = tmp_dir if tmp_dir else LOG_DIR
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=target_dir)
    # rw for owner, read-only for group/other (0644)
    mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(path, mode)
    return (fd, path)
def tmp_dir(prefix='', suffix=''):
    """Create and return a fresh temp directory under the sandbox (LOG_DIR)."""
    path = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=LOG_DIR)
    return path
def log(cmd, comment=None):
    """Append a timestamped entry to sandbox/commands.log.

    cmd is logged url-unquoted (human readable, rather than the raw encoded
    url sent to h2o); comment is appended after ' #', or logged alone for
    comment-only entries. The file is left world-readable because jenkins
    (running as a different user than 0xcustomer) must archive it.
    """
    filename = LOG_DIR + '/commands.log'
    # build the whole entry first, then write it in one call
    entry = str(datetime.datetime.now()) + ' -- '
    if cmd:
        entry += urlparse.unquote(cmd)
        if comment:
            entry += ' #' + comment
        entry += "\n"
    elif comment: # for comment-only
        entry += comment + "\n"
    with open(filename, 'a') as f:
        f.write(entry)
    permissions = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(filename, permissions)
def make_syn_dir():
    """(Re)create the synthetic-dataset dir under the sandbox; return its path.

    LOG_DIR must already exist (it's created for commands.log before any
    datasets are generated).
    """
    syn_dir = LOG_DIR + '/syn_datasets'
    if os.path.exists(syn_dir):
        shutil.rmtree(syn_dir)
    os.mkdir(syn_dir)
    return syn_dir
def dump_json(j):
    """Pretty-print j as deterministic (key-sorted, 2-space indented) JSON text."""
    return json.dumps(j, indent=2, sort_keys=True)
# can't have a list of cmds, because cmd is a list
# cmdBefore gets executed first, and we wait for it to complete
def spawn_cmd(name, cmd, capture_output=True, **kwargs):
    """Launch cmd (a list) via psutil.Popen, logging the command to commands.log.

    With capture_output, stdout/stderr go to fresh sandbox tmp files (already
    world-readable -- see tmp_file). Returns (process, stdout_path, stderr_path).
    """
    if capture_output:
        outfd, outpath = tmp_file(name + '.stdout.', '.log')
        errfd, errpath = tmp_file(name + '.stderr.', '.log')
        child = psutil.Popen(cmd, stdin=None, stdout=outfd, stderr=errfd, **kwargs)
    else:
        outpath = '<stdout>'
        errpath = '<stderr>'
        child = psutil.Popen(cmd, **kwargs)
    note = 'PID %d, stdout %s, stderr %s' % (
        child.pid, os.path.basename(outpath), os.path.basename(errpath))
    log(' '.join(cmd), comment=note)
    return (child, outpath, errpath)
def spawn_wait(ps, stdout, stderr, capture_output=True, timeout=None):
    """Wait for a spawned process; raise (with captured output) on timeout or
    nonzero exit; return the (zero) return code.

    stdout/stderr are the log-file paths returned by spawn_cmd. Uses open()
    instead of the python2-only file() builtin -- identical behavior.
    """
    rc = ps.wait(timeout)
    if capture_output:
        out = open(stdout).read()
        err = open(stderr).read()
    else:
        out = 'stdout not captured'
        err = 'stderr not captured'
    # rc None means the process was still running when the wait timed out
    if rc is None:
        ps.terminate()
        raise Exception("%s %s timed out after %d\nstdout:\n%s\n\nstderr:\n%s" %
                        (ps.name, ps.cmdline, timeout or 0, out, err))
    elif rc != 0:
        raise Exception("%s %s failed.\nstdout:\n%s\n\nstderr:\n%s" %
                        (ps.name, ps.cmdline, out, err))
    return rc
def spawn_cmd_and_wait(name, cmd, capture_output=True, timeout=None, **kwargs):
    """Convenience wrapper: spawn_cmd then spawn_wait."""
    child, outpath, errpath = spawn_cmd(name, cmd, capture_output, **kwargs)
    spawn_wait(child, outpath, errpath, capture_output, timeout)
# used to get a browser pointing to the last RFview
global json_url_history
json_url_history = []
global nodes
nodes = []
# I suppose we could shuffle the flatfile order!
# but it uses hosts, so if that got shuffled, we got it covered?
# the i in xrange part is not shuffled. maybe create the list first, for possible random shuffle
# FIX! default to random_shuffle for now..then switch to not.
def write_flatfile(node_count=2, base_port=None, hosts=None, rand_shuffle=True):
    """Write the flatfile (one ip:port per line) used for cloud formation.

    node_count is per-host when hosts is given; ports step by 2 per node.
    The full host:port list is built first so it can be shuffled (default on)
    for better test variation. Uses a 'with' block so the file is closed even
    if a write fails (the original leaked the handle on error).
    """
    # too bad this must be in two places..here and build_cloud()..could do a global above?
    base_port = get_base_port(base_port)
    ports_per_node = 2
    # build this list outside the loops so we can shuffle it
    hostPortList = []
    if hosts is None:
        ip = python_cmd_ip
        for i in range(node_count):
            hostPortList.append(ip + ":" + str(base_port + ports_per_node * i))
    else:
        for h in hosts:
            for i in range(node_count):
                hostPortList.append(h.addr + ":" + str(base_port + ports_per_node * i))
    # note we want to shuffle the full list of host+port
    if rand_shuffle:
        random.shuffle(hostPortList)
    with open(flatfile_pathname(), "w+") as pff:
        for hp in hostPortList:
            pff.write(hp + "\n")
def check_h2o_version():
# assumes you want to know about 3 ports starting at base_port
command1Split = ['java', '-jar', find_file('target/h2o.jar'), '--version']
command2Split = ['egrep', '-v', '( Java | started)']
print "Running h2o to get java version"
p1 = Popen(command1Split, stdout=PIPE)
p2 = Popen(command2Split, stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
def default_hosts_file():
    """Path of the per-user multi-host config json; H2O_HOSTS_FILE overrides.

    Uses 'in' instead of the removed-in-python3 dict.has_key; same behavior.
    """
    if "H2O_HOSTS_FILE" in os.environ:
        return os.environ["H2O_HOSTS_FILE"]
    return 'pytest_config-{0}.json'.format(getpass.getuser())
# node_count is number of H2O instances per host if hosts is specified.
def decide_if_localhost():
# First, look for local hosts file
hostsFile = default_hosts_file()
if config_json:
print "* Using config JSON you passed as -cj argument:", config_json
return False
if os.path.exists(hostsFile):
print "* Using config JSON file discovered in this directory: {0}.".format(hostsFile)
return False
if 'hosts' in os.getcwd():
print "Since you're in a *hosts* directory, we're using a config json"
print "* Expecting default username's config json here. Better exist!"
return False
print "No config json used. Launching local cloud..."
return True
def setup_random_seed(seed=None):
if random_seed is not None:
SEED = random_seed
elif seed is not None:
SEED = seed
else:
SEED = random.randint(0, sys.maxint)
random.seed(SEED)
print "\nUsing random seed:", SEED
return SEED
# assume h2o_nodes_json file in the current directory
def build_cloud_with_json(h2o_nodes_json='h2o-nodes.json'):
    # Clone an already-running cloud from its h2o-nodes.json description
    # (written by build_cloud(create_json=True)); rebuilds the python-side
    # node objects (ExternalH2O) and returns the node list.
    log("#*********************************************************************")
    log("Starting new test: " + python_test_name + " at build_cloud_with_json()")
    log("#*********************************************************************")
    print "This only makes sense if h2o is running as defined by", h2o_nodes_json
    print "For now, assuming it's a cloud on this machine, and here's info on h2o processes running here"
    print "No output means no h2o here! Some other info about stuff on the system is printed first though."
    import h2o_os_util

    if not os.path.exists(h2o_nodes_json):
        raise Exception("build_cloud_with_json: Can't find " + h2o_nodes_json + " file")

    # h2o_os_util.show_h2o_processes()

    with open(h2o_nodes_json, 'rb') as f:
        cloneJson = json.load(f)

    # These are supposed to be in the file.
    # Just check the first one. if not there, the file must be wrong
    if not 'cloud_start' in cloneJson:
        raise Exception("Can't find 'cloud_start' in %s, wrong file? h2o-nodes.json?" % h2o_nodes_json)
    else:
        cs = cloneJson['cloud_start']
        print "Info on the how the cloud we're cloning was apparently started (info from %s)" % h2o_nodes_json
        # required/legal values in 'cloud_start'. A robust check is good for easy debug when we add stuff
        # for instance, if you didn't get the right/latest h2o-nodes.json! (we could check how old the cloud start is?)
        valList = ['time', 'cwd', 'python_test_name', 'python_cmd_line', 'config_json', 'username', 'ip']
        for v in valList:
            if v not in cs:
                raise Exception("Can't find %s in %s, wrong file or version change?" % (v, h2o_nodes_json))
            print "cloud_start['%s']: %s" % (v, cs[v])

    ### # write out something that shows how the cloud could be rebuilt, since it's a decoupled cloud build.
    ### build_cloud_rerun_sh = LOG_DIR + "/" + 'build_cloud_rerun.sh'
    ### with open(build_cloud_rerun_sh, 'w') as f:
    ###     f.write("echo << ! > ./temp_for_build_cloud_rerun.sh\n")
    ###     f.write("echo 'Rebuilding a cloud built with %s at %s by %s on %s in %s'\n" % \
    ###         (cs['python_test_name'], cs['time'], cs['username'], cs['ip'], cs['cwd']))
    ###     f.write("cd %s\n" % cs['cwd'])
    ###     if cs['config_json']:
    ###         f.write("%s -cj %s\n" % (cs['python_cmd_line'], cs['config_json']))
    ###     else:
    ###         f.write("%s\n" % cs['python_cmd_line'])
    ###     f.write("!\n")
    ###     f.write("ssh %s@%s < ./temp_for_build_cloud_rerun.sh\n" % (cs['username'], cs['ip']))
    ### # make it executable
    ### t = os.stat(build_cloud_rerun_sh)
    ### os.chmod(build_cloud_rerun_sh, t.st_mode | stat.S_IEXEC)

    # this is the internal node state for python..h2o.nodes rebuild
    nodeStateList = cloneJson['h2o_nodes']
    nodeList = []
    if not nodeStateList:
        raise Exception("nodeStateList is empty. %s file must be empty/corrupt" % h2o_nodes_json)
    for nodeState in nodeStateList:
        print "Cloning state for node", nodeState['node_id'], 'from', h2o_nodes_json
        newNode = ExternalH2O(nodeState)
        nodeList.append(newNode)
    print ""
    h2p.red_print("Ingested from json:", nodeList[0].java_heap_GB, "GB java heap(s) with", len(nodeList), "total nodes")
    print ""
    nodes[:] = nodeList
    # put the test start message in the h2o log, to create a marker
    nodes[0].h2o_log_msg()
    return nodeList
def setup_benchmark_log():
    # an object to keep stuff out of h2o.py
    # (created lazily, only when build_cloud gets enable_benchmark_log=True)
    global cloudPerfH2O
    cloudPerfH2O = h2o_perf.PerfH2O(python_test_name)
# node_count is per host if hosts is specified.
def build_cloud(node_count=1, base_port=None, hosts=None,
timeoutSecs=30, retryDelaySecs=1, cleanup=True, rand_shuffle=True,
conservative=False, create_json=False, clone_cloud=None, init_sandbox=True, **kwargs):
# redirect to build_cloud_with_json if a command line arg
# wants to force a test to ignore it's build_cloud/build_cloud_with_hosts
# (both come thru here)
# clone_cloud is just another way to get the effect (maybe ec2 config file thru
# build_cloud_with_hosts?
if not disable_time_stamp:
sys.stdout = OutWrapper(sys.stdout)
if clone_cloud_json or clone_cloud:
nodeList = build_cloud_with_json(
h2o_nodes_json=clone_cloud_json if clone_cloud_json else clone_cloud)
return nodeList
# moved to here from unit_main. so will run with nosetests too!
# Normally do this. Don't do it if build_cloud_with_hosts() did and put a flatfile in there already!
if init_sandbox:
clean_sandbox()
log("#*********************************************************************")
log("Starting new test: " + python_test_name + " at build_cloud()")
log("#*********************************************************************")
# start up h2o to report the java version (once). output to python stdout
# only do this for regression testing
if getpass.getuser() == 'jenkins':
check_h2o_version()
# keep this param in kwargs, because we pass it to the H2O node build, so state
# is created that polling and other normal things can check, to decide to dump
# info to benchmark.log
if kwargs.setdefault('enable_benchmark_log', False):
setup_benchmark_log()
ports_per_node = 2
nodeList = []
# see if we need to shift the port used to run groups of tests on the same machine at the same time
base_port = get_base_port(base_port)
try:
# if no hosts list, use psutil method on local host.
totalNodes = 0
# doing this list outside the loops so we can shuffle for better test variation
# this jvm startup shuffle is independent from the flatfile shuffle
portList = [base_port + ports_per_node * i for i in range(node_count)]
if hosts is None:
# if use_flatfile, we should create it,
# because tests will just call build_cloud with use_flatfile=True
# best to just create it all the time..may or may not be used
write_flatfile(node_count=node_count, base_port=base_port)
hostCount = 1
if rand_shuffle:
random.shuffle(portList)
for p in portList:
verboseprint("psutil starting node", i)
newNode = LocalH2O(port=p, node_id=totalNodes, **kwargs)
nodeList.append(newNode)
totalNodes += 1
else:
# if hosts, the flatfile was created and uploaded to hosts already
# I guess don't recreate it, don't overwrite the one that was copied beforehand.
# we don't always use the flatfile (use_flatfile=False)
# Suppose we could dispatch from the flatfile to match it's contents
# but sometimes we want to test with a bad/different flatfile then we invoke h2o?
hostCount = len(hosts)
hostPortList = []
for h in hosts:
for port in portList:
hostPortList.append((h, port))
if rand_shuffle: random.shuffle(hostPortList)
for (h, p) in hostPortList:
verboseprint('ssh starting node', totalNodes, 'via', h)
newNode = h.remote_h2o(port=p, node_id=totalNodes, **kwargs)
nodeList.append(newNode)
totalNodes += 1
verboseprint("Attempting Cloud stabilize of", totalNodes, "nodes on", hostCount, "hosts")
start = time.time()
# UPDATE: best to stabilize on the last node!
stabilize_cloud(nodeList[0], nodeList,
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs, noExtraErrorCheck=True)
verboseprint(len(nodeList), "Last added node stabilized in ", time.time() - start, " secs")
verboseprint("Built cloud: %d nodes on %d hosts, in %d s" % \
(len(nodeList), hostCount, (time.time() - start)))
h2p.red_print("Built cloud:", nodeList[0].java_heap_GB, "GB java heap(s) with", len(nodeList), "total nodes")
# FIX! using "consensus" in node[-1] should mean this is unnecessary?
# maybe there's a bug. For now do this. long term: don't want?
# UPDATE: do it for all cases now 2/14/13
if conservative: # still needed?
for n in nodeList:
stabilize_cloud(n, nodeList, timeoutSecs=timeoutSecs, noExtraErrorCheck=True)
# this does some extra checking now
# verifies cloud name too if param is not None
verify_cloud_size(nodeList, expectedCloudName=nodeList[0].cloud_name)
# best to check for any errors due to cloud building right away?
check_sandbox_for_errors(python_test_name=python_test_name)
except:
# nodeList might be empty in some exception cases?
# no shutdown issued first, though
if cleanup and nodeList:
for n in nodeList: n.terminate()
else:
nodes[:] = nodeList
check_sandbox_for_errors(python_test_name=python_test_name)
raise
# this is just in case they don't assign the return to the nodes global?
nodes[:] = nodeList
print len(nodeList), "total jvms in H2O cloud"
# put the test start message in the h2o log, to create a marker
nodes[0].h2o_log_msg()
if config_json:
# like cp -p. Save the config file, to sandbox
print "Saving the ", config_json, "we used to", LOG_DIR
shutil.copy(config_json, LOG_DIR + "/" + os.path.basename(config_json))
# Figure out some stuff about how this test was run
cs_time = str(datetime.datetime.now())
cs_cwd = os.getcwd()
cs_python_cmd_line = "python %s %s" % (python_test_name, python_cmd_args)
cs_python_test_name = python_test_name
if config_json:
cs_config_json = os.path.abspath(config_json)
else:
cs_config_json = None
cs_username = python_username
cs_ip = python_cmd_ip
### # write out something that shows how the test could be rerun (could be a cloud build, a mix, or test only)
### print "Writing the test_rerun.sh in", LOG_DIR
### test_rerun_sh = LOG_DIR + "/" + 'test_rerun.sh'
### with open(test_rerun_sh, 'w') as f:
### f.write("echo << ! > ./temp_for_test_rerun.sh\n")
### f.write("echo 'rerunning %s that originally ran at %s by %s on %s in %s'\n" % \
### (cs_python_test_name, cs_time, cs_username, cs_ip, cs_cwd))
### f.write("cd %s\n" % cs_cwd)
### if cs_config_json:
### f.write("%s -cj %s\n" % (cs_python_cmd_line, cs_config_json))
### else:
### f.write("%s\n" % cs_python_cmd_line)
### f.write("!\n")
### f.write("ssh %s@%s < temp_for_test_rerun.sh\n" % (cs_username, cs_ip))
###
### # make it executable
### t = os.stat(test_rerun_sh)
### os.chmod(test_rerun_sh, t.st_mode | stat.S_IEXEC)
# dump the h2o.nodes state to a json file # include enough extra info to have someone
# rebuild the cloud if a test fails that was using that cloud.
if create_json:
q = {
'cloud_start':
{
'time': cs_time,
'cwd': cs_cwd,
'python_test_name': cs_python_test_name,
'python_cmd_line': cs_python_cmd_line,
'config_json': cs_config_json,
'username': cs_username,
'ip': cs_ip,
},
'h2o_nodes': h2o_util.json_repr(nodes),
}
with open('h2o-nodes.json', 'w+') as f:
f.write(json.dumps(q, indent=4))
return nodeList
def upload_jar_to_remote_hosts(hosts, slow_connection=False):
    # Copy the built h2o.jar and the flatfile out to every remote host.
    # slow_connection=True uploads once to the first host, then pushes
    # host-to-host from there instead of uploading from here N times.
    def showProgress(sofar, total):
        # the \r progress bar pollutes jenkins logs, so suppress it there
        if getpass.getuser() != 'jenkins':
            hashes = int(10.0 * sofar / total)
            sys.stdout.write('\rUploading jar [%s%s] %02d%%' % ('#' * hashes, ' ' * (10 - hashes), 100 * sofar / total))
            sys.stdout.flush()

    if slow_connection:
        jar = find_file('target/h2o.jar')
        hosts[0].upload_file(jar, progress=showProgress)
        hosts[0].push_file_to_remotes(jar, hosts[1:])
        flatfile = find_file(flatfile_pathname())
        hosts[0].upload_file(flatfile, progress=showProgress)
        hosts[0].push_file_to_remotes(flatfile, hosts[1:])
    else:
        for host in hosts:
            host.upload_file(find_file('target/h2o.jar'), progress=showProgress)
            # no progress indicator for the (small) flatfile
            host.upload_file(flatfile_pathname())
def check_sandbox_for_errors(cloudShutdownIsError=False, sandboxIgnoreErrors=False, python_test_name=''):
    # Scan the sandbox logs for errors, reporting any find only once:
    # tearDown and tearDownClass can both call this, and the per-node
    # error-report flag keeps the second caller from re-reporting.
    if nodes and nodes[0].sandbox_error_report():  # gets current state
        return
    # A cloud can be built to ignore everything that would normally fatal a test.
    # Kludge: a test may also set this directly instead of via build_cloud,
    # since the state is needed for teardown_cloud after the cloud goes away.
    ignoreAll = sandboxIgnoreErrors or (nodes and nodes[0].sandbox_ignore_errors)
    found = h2o_sandbox.check_sandbox_for_errors(
        LOG_DIR=LOG_DIR,
        sandboxIgnoreErrors=ignoreAll,
        cloudShutdownIsError=cloudShutdownIsError,
        python_test_name=python_test_name)
    if found and nodes:
        # remember we reported, so later callers become no-ops
        nodes[0].sandbox_error_report(True)  # sets
def tear_down_cloud(nodeList=None, sandboxIgnoreErrors=False):
if sleep_at_tear_down:
print "Opening browser to cloud, and sleeping for 3600 secs, before cloud teardown (for debug)"
import h2o_browse
h2b.browseTheCloud()
sleep(3600)
if not nodeList: nodeList = nodes
# could the nodeList still be empty in some exception cases? Assume not for now
try:
# update: send a shutdown to all nodes. h2o maybe doesn't progagate well if sent to one node
# the api watchdog shouldn't complain about this?
for n in nodeList:
n.shutdown_all()
except:
pass
# ah subtle. we might get excepts in issuing the shutdown, don't abort out
# of trying the process kills if we get any shutdown exception (remember we go to all nodes)
# so we might? nodes are shutting down?
# FIX! should we wait a bit for a clean shutdown, before we process kill? It can take more than 1 sec though.
try:
time.sleep(2)
for n in nodeList:
n.terminate()
verboseprint("tear_down_cloud n:", n)
except:
pass
check_sandbox_for_errors(sandboxIgnoreErrors=sandboxIgnoreErrors, python_test_name=python_test_name)
# get rid of all those pesky line marker files. Unneeded now
clean_sandbox_doneToLine()
nodeList[:] = []
# don't need any more?
# Used before to make sure cloud didn't go away between unittest defs
def touch_cloud(nodeList=None):
    # Ping every node with is_alive() to confirm the cloud is still up.
    # nodeList defaults to the module-global 'nodes'.
    for node in (nodeList or nodes):
        node.is_alive()
# timeoutSecs is per individual node get_cloud()
# verify cloud name if cloudName provided
def verify_cloud_size(nodeList=None, expectedCloudName=None, verbose=False, timeoutSecs=10, ignoreHealth=False):
if not nodeList: nodeList = nodes
expectedSize = len(nodeList)
# cloud size and consensus have to reflect a single grab of information from a node.
cloudStatus = [n.get_cloud(timeoutSecs=timeoutSecs) for n in nodeList]
# get cloud_name from all
cloudSizes = [c['cloud_size'] for c in cloudStatus]
cloudConsensus = [c['consensus'] for c in cloudStatus]
cloudHealthy = [c['cloud_healthy'] for c in cloudStatus]
cloudName = [c['cloud_name'] for c in cloudStatus]
if not all(cloudHealthy):
msg = "Some node reported cloud_healthy not true: %s" % cloudHealthy
if not ignoreHealth:
raise Exception(msg)
# gather up all the node_healthy status too
for i, c in enumerate(cloudStatus):
nodesHealthy = [n['node_healthy'] for n in c['nodes']]
if not all(nodesHealthy):
print "node %s cloud status: %s" % (i, dump_json(c))
msg = "node %s says some node is not reporting node_healthy: %s" % (c['node_name'], nodesHealthy)
if not ignoreHealth:
raise Exception(msg)
if expectedSize == 0 or len(cloudSizes) == 0 or len(cloudConsensus) == 0:
print "\nexpectedSize:", expectedSize
print "cloudSizes:", cloudSizes
print "cloudConsensus:", cloudConsensus
raise Exception("Nothing in cloud. Can't verify size")
for s in cloudSizes:
consensusStr = (",".join(map(str, cloudConsensus)))
sizeStr = (",".join(map(str, cloudSizes)))
if (s != expectedSize):
raise Exception("Inconsistent cloud size." +
"nodeList report size: %s consensus: %s instead of %d." % \
(sizeStr, consensusStr, expectedSize))
# check that all cloud_names are right
if expectedCloudName:
for i, cn in enumerate(cloudName):
if cn != expectedCloudName:
# tear everyone down, in case of zombies. so we don't have to kill -9 manually
print "node %s has the wrong cloud name: %s expectedCloudName: %s."
# print "node %s cloud status: %s" % (i, dump_json(cloudStatus[i]))
print "tearing cloud down"
tear_down_cloud(nodeList=nodeList, sandboxIgnoreErrors=False)
raise Exception("node %s has the wrong cloud name: %s expectedCloudName: %s" % \
(i, cn, expectedCloudName))
return (sizeStr, consensusStr, expectedSize)
def stabilize_cloud(node, nodeList, timeoutSecs=14.0, retryDelaySecs=0.25, noExtraErrorCheck=False):
node_count = len(nodeList)