import time, os, json, signal, tempfile, shutil, datetime, inspect, threading, getpass
import requests, psutil, argparse, sys, unittest, glob
import h2o_browse as h2b, h2o_perf, h2o_util
import re, webbrowser, random
# used in the shutil.rmtree permission hack for windows
import errno
# used to unencode the urls sent to h2o
import urlparse
import logging
# for log_download
import zipfile, StringIO
# For checking ports in use, using netstat through a subprocess.
from subprocess import Popen, PIPE
# The cloud is uniquely named per user (only).
# The flatfile is also uniquely identified by username.
# Both use the user that runs the test. The config might have a different username on the
# remote machine (0xdiag, say, or hduser).
def flatfile_name():
    return('pytest_flatfile-%s' % getpass.getuser())
def cloud_name():
return('pytest-%s-%s' % (getpass.getuser(), os.getpid()))
# return('pytest-%s' % getpass.getuser())
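# Example (sketch): for user 'kevin' running a test with pid 1234, these give
#   flatfile_name() -> 'pytest_flatfile-kevin'
#   cloud_name()    -> 'pytest-kevin-1234'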
def __drain(src, dst):
    # dst is either an os-level file descriptor (int) or a file object
    for l in src:
        if isinstance(dst, int):
            os.write(dst, l)
        else:
            dst.write(l)
            dst.flush()
    src.close()
    if isinstance(dst, int):
        os.close(dst)
def drain(src, dst):
t = threading.Thread(target=__drain, args=(src,dst))
t.daemon = True
t.start()
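# Example (a minimal sketch, not from the test suite): wire a subprocess's stdout
# into a sandbox log file in the background. 'some_cmd' is a placeholder name.
# drain() returns immediately; the copy happens on a daemon thread.
#   p = Popen(['some_cmd'], stdout=PIPE)
#   (outfd, outpath) = tmp_file('some_cmd.stdout.', '.log')
#   drain(p.stdout, outfd)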
def unit_main():
global python_test_name
python_test_name = inspect.stack()[1][1]
print "\nRunning: python", python_test_name
# moved clean_sandbox out of here, because nosetests doesn't execute h2o.unit_main in our tests.
    # UPDATE: ..is that really true? I'm seeing the above print in the console output when running
# jenkins with nosetests
parse_our_args()
unittest.main()
# Global disable. Used to prevent browsing when running nosetests, or when given the -bd arg.
# Defaults to True. If user is jenkins, h2o.unit_main isn't executed, so parse_our_args isn't either.
# Since nosetests doesn't execute h2o.unit_main, it should have the browser disabled.
browse_disable = True
browse_json = False
verbose = False
ipaddr = None
config_json = None
debugger = False
random_udp_drop = False
# jenkins gets this assignment, but not the one in unit_main?
python_test_name = inspect.stack()[1][1]
def parse_our_args():
parser = argparse.ArgumentParser()
# can add more here
    parser.add_argument('-bd', '--browse_disable', help="Disable any web browser stuff. Needed for batch. nosetests and jenkins already disable the browser through other means, so they don't need this", action='store_true')
parser.add_argument('-b', '--browse_json', help='Pops a browser to selected json equivalent urls. Selective. Also keeps test alive (and H2O alive) till you ctrl-c. Then should do clean exit', action='store_true')
parser.add_argument('-v', '--verbose', help='increased output', action='store_true')
parser.add_argument('-ip', '--ip', type=str, help='IP address to use for single host H2O with psutil control')
parser.add_argument('-cj', '--config_json', help='Use this json format file to provide multi-host defaults. Overrides the default file pytest_config-<username>.json. These are used only if you do build_cloud_with_hosts()')
parser.add_argument('-dbg', '--debugger', help='Launch java processes with java debug attach mechanisms', action='store_true')
parser.add_argument('-rud', '--random_udp_drop', help='Drop 20 pct. of the UDP packets at the receive side', action='store_true')
parser.add_argument('unittest_args', nargs='*')
args = parser.parse_args()
global browse_disable, browse_json, verbose, ipaddr, config_json, debugger, random_udp_drop
browse_disable = args.browse_disable or getpass.getuser()=='jenkins'
browse_json = args.browse_json
verbose = args.verbose
ipaddr = args.ip
config_json = args.config_json
debugger = args.debugger
random_udp_drop = args.random_udp_drop
    # Set sys.argv to the unittest args (leave sys.argv[0] as is)
    # FIX! this isn't working to grab the args we don't care about
    # Pass "--failfast" to unittest, to stop on the first error. and -v
    # won't get this for jenkins, since it doesn't do parse_our_args
sys.argv[1:] = ['-v', "--failfast"] + args.unittest_args
# sys.argv[1:] = args.unittest_args
def verboseprint(*args, **kwargs):
if verbose:
for x in args: # so you don't have to create a single string
print x,
for x in kwargs: # so you don't have to create a single string
print x,
print
# so we can see problems when hung?
sys.stdout.flush()
def find_dataset(f):
(head, tail) = os.path.split(os.path.abspath('datasets'))
verboseprint("find_dataset looking upwards from", head, "for", tail)
# don't spin forever
levels = 0
while not (os.path.exists(os.path.join(head, tail))):
head = os.path.split(head)[0]
levels += 1
if (levels==10):
raise Exception("unable to find datasets. Did you git it?")
return os.path.join(head, tail, f)
def find_file(base):
f = base
if not os.path.exists(f): f = '../' + base
if not os.path.exists(f): f = '../../' + base
if not os.path.exists(f): f = 'py/' + base
if not os.path.exists(f):
raise Exception("unable to find file %s" % base)
return f
# Return file size.
def get_file_size(f):
return os.path.getsize(f)
# Splits a file into chunks of the given size and returns an iterator over the chunks.
def iter_chunked_file(fileobj, chunk_size=2048):
    return iter(lambda: fileobj.read(chunk_size), '')
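# Example (sketch): stream a large file in 2KB chunks instead of reading it whole.
#   with open('some_big_file.csv', 'rb') as f:
#       for chunk in iter_chunked_file(f):
#           pass  # hand each chunk to an uploader, hasher, etc.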
# shutil.rmtree doesn't work on windows if the files are read only.
# On unix the parent dir has to not be read-only too.
# There may still be issues if the owner is different, e.g. if 'system' is the one running?
# Apparently this escape function on errors is how shutil.rmtree can
# handle the permission issue (do the chmod here).
# But we shouldn't have read-only files, so don't try to handle that case.
def handleRemoveError(func, path, exc):
# If there was an error, it could be due to windows holding onto files.
# Wait a bit before retrying. Ignore errors on the retry. Just leave files.
# Ex. if we're in the looping cloud test deleting sandbox.
excvalue = exc[1]
print "Retrying shutil.rmtree of sandbox (2 sec delay). Will ignore errors. Exception was", excvalue.errno
time.sleep(2)
try:
func(path)
except OSError:
pass
LOG_DIR = 'sandbox'
def clean_sandbox():
if os.path.exists(LOG_DIR):
# shutil.rmtree fails to delete very long filenames on Windoze
#shutil.rmtree(LOG_DIR)
# was this on 3/5/13. This seems reliable on windows+cygwin
### os.system("rm -rf "+LOG_DIR)
shutil.rmtree(LOG_DIR, ignore_errors=False, onerror=handleRemoveError)
# it should have been removed, but on error it might still be there
if not os.path.exists(LOG_DIR):
os.mkdir(LOG_DIR)
# who knows if this one is ok with windows...doesn't rm dir, just
# the stdout/stderr files
def clean_sandbox_stdout_stderr():
if os.path.exists(LOG_DIR):
files = []
        # glob.glob returns a list
for f in glob.glob(LOG_DIR + '/*stdout*'):
verboseprint("cleaning", f)
os.remove(f)
for f in glob.glob(LOG_DIR + '/*stderr*'):
verboseprint("cleaning", f)
os.remove(f)
def tmp_file(prefix='', suffix=''):
return tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=LOG_DIR)
def tmp_dir(prefix='', suffix=''):
return tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=LOG_DIR)
def log(cmd, comment=None):
with open(LOG_DIR + '/commands.log', 'a') as f:
f.write(str(datetime.datetime.now()) + ' -- ')
# what got sent to h2o
# f.write(cmd)
# let's try saving the unencoded url instead..human readable
f.write(urlparse.unquote(cmd))
if comment:
f.write(' #')
f.write(comment)
f.write("\n")
def make_syn_dir():
SYNDATASETS_DIR = './syn_datasets'
if os.path.exists(SYNDATASETS_DIR):
shutil.rmtree(SYNDATASETS_DIR)
os.mkdir(SYNDATASETS_DIR)
return SYNDATASETS_DIR
def dump_json(j):
return json.dumps(j, sort_keys=True, indent=2)
# Hackery: find the ip address that gets you to Google's DNS.
# Tricky because you might have multiple IP addresses (Virtualbox), or be on Windows.
# we used to not like giving ip 127.0.0.1 to h2o?
def get_ip_address():
if ipaddr:
verboseprint("get_ip case 1:", ipaddr)
return ipaddr
import socket
ip = '127.0.0.1'
# this method doesn't work if vpn is enabled..it gets the vpn ip
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8',0))
ip = s.getsockname()[0]
verboseprint("get_ip case 2:", ip)
except:
pass
if ip.startswith('127'):
ip = socket.getaddrinfo(socket.gethostname(), None)[0][4][0]
verboseprint("get_ip case 3:", ip)
ipa = None
for ips in socket.gethostbyname_ex(socket.gethostname())[2]:
# only take the first
if ipa is None and not ips.startswith("127."):
ipa = ips[:]
verboseprint("get_ip case 4:", ipa)
    if ipa is not None and ip != ipa:
print "\nAssuming", ip, "is the ip address h2o will use but", ipa, "is probably the real ip?"
print "You might have a vpn active. Best to use '-ip "+ipa+"' to get python and h2o the same."
verboseprint("get_ip_address:", ip)
return ip
def spawn_cmd(name, args, capture_output=True):
if capture_output:
outfd,outpath = tmp_file(name + '.stdout.', '.log')
errfd,errpath = tmp_file(name + '.stderr.', '.log')
        # make args a single string, so we can set a ulimit and execute with a shell
        if (1==0):  # disabled path, kept for reference
argsStr = 'ulimit -u 1024;' + ' '.join(args)
ps = psutil.Popen(argsStr, stdin=None, stdout=outfd, stderr=errfd, shell=True)
else:
ps = psutil.Popen(args, stdin=None, stdout=outfd, stderr=errfd)
else:
outpath = '<stdout>'
errpath = '<stderr>'
ps = psutil.Popen(args)
comment = 'PID %d, stdout %s, stderr %s' % (
ps.pid, os.path.basename(outpath), os.path.basename(errpath))
log(' '.join(args), comment=comment)
return (ps, outpath, errpath)
def spawn_cmd_and_wait(name, args, timeout=None):
(ps, stdout, stderr) = spawn_cmd(name, args)
rc = ps.wait(timeout)
out = file(stdout).read()
err = file(stderr).read()
if rc is None:
ps.terminate()
raise Exception("%s %s timed out after %d\nstdout:\n%s\n\nstderr:\n%s" %
(name, args, timeout or 0, out, err))
elif rc != 0:
raise Exception("%s %s failed.\nstdout:\n%s\n\nstderr:\n%s" % (name, args, out, err))
# used to get a browser pointing to the last RFview
global json_url_history
json_url_history = []
global nodes
nodes = []
# I suppose we could shuffle the flatfile order!
# but it uses hosts, so if that got shuffled, we've got it covered?
# the i in range part is not shuffled. maybe create the list first, for a possible random shuffle.
# FIX! default to rand_shuffle for now..then switch to not.
def write_flatfile(node_count=2, base_port=54321, hosts=None, rand_shuffle=True):
# always create the flatfile.
ports_per_node = 2
pff = open(flatfile_name(), "w+")
# doing this list outside the loops so we can shuffle for better test variation
hostPortList = []
if hosts is None:
ip = get_ip_address()
for i in range(node_count):
hostPortList.append("/" + ip + ":" + str(base_port + ports_per_node*i))
else:
for h in hosts:
for i in range(node_count):
hostPortList.append("/" + h.addr + ":" + str(base_port + ports_per_node*i))
# note we want to shuffle the full list of host+port
if rand_shuffle:
random.shuffle(hostPortList)
for hp in hostPortList:
pff.write(hp + "\n")
pff.close()
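# Example (sketch): write_flatfile(node_count=2, base_port=54321) with no hosts
# writes two lines like these (IP from get_ip_address(), ports stepping by
# ports_per_node=2), possibly shuffled:
#   /192.168.0.10:54321
#   /192.168.0.10:54323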
def check_port_group(base_port):
    # for now, only check for jenkins, kevin, or michal
if (1==1):
username = getpass.getuser()
if username=='jenkins' or username=='kevin' or username=='michal':
# assumes you want to know about 3 ports starting at base_port
command1Split = ['netstat', '-anp']
command2Split = ['egrep']
            # space after each port in the pattern so we don't submatch longer port numbers
            command2Split.append("(%s | %s)" % (base_port, base_port+1) )
            command3Split = ['wc','-l']  # built but currently unused
print "Checking 2 ports starting at ", base_port
print ' '.join(command2Split)
# use netstat thru subprocess
p1 = Popen(command1Split, stdout=PIPE)
p2 = Popen(command2Split, stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
def default_hosts_file():
return 'pytest_config-{0}.json'.format(getpass.getuser())
def decide_if_localhost():
# First, look for local hosts file
hostsFile = default_hosts_file()
if config_json:
print "* Using config JSON you passed as -cj argument:", config_json
return False
if os.path.exists(hostsFile):
print "* Using matching username config JSON file discovered in this directory: {0}.".format(hostsFile)
return False
if 'hosts' in os.getcwd():
print "Since you're in a *hosts* directory, we're using a config json"
print "* Expecting default username's config json here. Better exist!"
return False
print "No config json used. Launching local cloud..."
return True
# node_count is per host if hosts is specified.
def build_cloud(node_count=2, base_port=54321, hosts=None,
timeoutSecs=30, retryDelaySecs=1, cleanup=True, rand_shuffle=True, **kwargs):
# moved to here from unit_main. so will run with nosetests too!
clean_sandbox()
# keep this param in kwargs, because we pass to the H2O node build, so state
# is created that polling and other normal things can check, to decide to dump
# info to benchmark.log
if kwargs.setdefault('enable_benchmark_log', False):
# an object to keep stuff out of h2o.py
global cloudPerfH2O
cloudPerfH2O = h2o_perf.PerfH2O(python_test_name)
ports_per_node = 2
node_list = []
try:
# if no hosts list, use psutil method on local host.
totalNodes = 0
# doing this list outside the loops so we can shuffle for better test variation
# this jvm startup shuffle is independent from the flatfile shuffle
portList = [base_port + ports_per_node*i for i in range(node_count)]
if hosts is None:
# if use_flatfile, we should create it,
# because tests will just call build_cloud with use_flatfile=True
# best to just create it all the time..may or may not be used
write_flatfile(node_count=node_count, base_port=base_port)
hostCount = 1
if rand_shuffle: random.shuffle(portList)
for p in portList:
verboseprint("psutil starting node", i)
newNode = LocalH2O(port=p, node_id=totalNodes, **kwargs)
node_list.append(newNode)
totalNodes += 1
else:
# if hosts, the flatfile was created and uploaded to hosts already
# I guess don't recreate it, don't overwrite the one that was copied beforehand.
# we don't always use the flatfile (use_flatfile=False)
            # Suppose we could dispatch from the flatfile to match its contents
            # but sometimes we want to test with a bad/different flatfile when we invoke h2o?
hostCount = len(hosts)
hostPortList = []
for h in hosts:
for port in portList:
hostPortList.append( (h,port) )
if rand_shuffle: random.shuffle(hostPortList)
for (h,p) in hostPortList:
verboseprint('ssh starting node', totalNodes, 'via', h)
newNode = h.remote_h2o(port=p, node_id=totalNodes, **kwargs)
node_list.append(newNode)
totalNodes += 1
verboseprint("Attempting Cloud stabilize of", totalNodes, "nodes on", hostCount, "hosts")
start = time.time()
# UPDATE: best to stabilize on the last node!
stabilize_cloud(node_list[-1], len(node_list),
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs)
verboseprint(len(node_list), "Last added node stabilized in ", time.time()-start, " secs")
verboseprint("Built cloud: %d node_list, %d hosts, in %d s" % (len(node_list),
hostCount, (time.time() - start)))
# FIX! using "consensus" in node[-1] should mean this is unnecessary?
# maybe there's a bug. For now do this. long term: don't want?
# UPDATE: do it for all cases now 2/14/13
for n in node_list:
stabilize_cloud(n, len(node_list), timeoutSecs=timeoutSecs)
# best to check for any errors due to cloud building right away?
check_sandbox_for_errors()
except:
if cleanup:
for n in node_list: n.terminate()
else:
nodes[:] = node_list
check_sandbox_for_errors()
raise
# this is just in case they don't assign the return to the nodes global?
nodes[:] = node_list
print len(node_list), "total jvms in H2O cloud"
return node_list
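# Example (sketch of the typical test pattern; a kwarg like java_heap_GB is an
# assumption passed through to the node builder, not defined in this file):
#   build_cloud(node_count=3, java_heap_GB=1)
#   try:
#       pass  # run requests against nodes[0]
#   finally:
#       tear_down_cloud()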
def upload_jar_to_remote_hosts(hosts, slow_connection=False):
def prog(sofar, total):
# output is bad for jenkins.
username = getpass.getuser()
if username!='jenkins':
p = int(10.0 * sofar / total)
sys.stdout.write('\rUploading jar [%s%s] %02d%%' % ('#'*p, ' '*(10-p), 100*sofar/total))
sys.stdout.flush()
if not slow_connection:
for h in hosts:
f = find_file('target/h2o.jar')
h.upload_file(f, progress=prog)
# skipping progress indicator for the flatfile
h.upload_file(flatfile_name())
else:
f = find_file('target/h2o.jar')
hosts[0].upload_file(f, progress=prog)
hosts[0].push_file_to_remotes(f, hosts[1:])
f = find_file(flatfile_name())
hosts[0].upload_file(f, progress=prog)
hosts[0].push_file_to_remotes(f, hosts[1:])
def check_sandbox_for_errors(sandbox_ignore_errors=False):
if not os.path.exists(LOG_DIR):
return
    # don't have both tearDown and tearDownClass report the same found error
# only need the first
if nodes and nodes[0].sandbox_error_report():
return
# FIX! wait for h2o to flush to files? how?
# Dump any assertion or error line to the screen
# Both "passing" and failing tests??? I guess that's good.
# if you find a problem, just keep printing till the end, in that file.
# The stdout/stderr is shared for the entire cloud session?
# so don't want to dump it multiple times?
errLines = []
for filename in os.listdir(LOG_DIR):
if re.search('stdout|stderr',filename):
sandFile = open(LOG_DIR + "/" + filename, "r")
# just in case error/assert is lower or upper case
            # FIX! aren't we going to get the cloud building info failure messages?
            # oh well...if so, it's a bug! "killing" is temporary, to detect the jar mismatch error
regex1 = re.compile(
'found multiple|exception|error|ERRR|assert|killing|killed|required ports',
re.IGNORECASE)
regex2 = re.compile('Caused',re.IGNORECASE)
regex3 = re.compile('warn|info|TCP', re.IGNORECASE)
            # there are many hdfs/apache messages with 'error' in the text. treat them as warnings if they have '[WARN]'
            # i.e. they start with:
            # [WARN]
            # if we started printing due to a "warning" and then hit an exception, we don't want to stop;
            # we want that to act like a new beginning. Maybe just treat "warning" and "info" as
            # single-line events? that's better
printing = 0 # "printing" is per file.
lines = 0 # count per file! errLines accumulates for multiple files.
for line in sandFile:
# JIT reporting looks like this..don't detect that as an error
printSingleWarning = False
foundBad = False
if not ' bytes)' in line:
# no multiline FSM on this
# ignore the [WARN] from 'RestS3Service'
printSingleWarning = regex3.search(line) and not ('[Loaded ' in line) and not ('RestS3Service' in line)
# 13190 280 ### sun.nio.ch.DatagramChannelImpl::ensureOpen (16 bytes)
# FIX! temp to avoid the INFO in jan's latest logging
printSingleWarning = False
                    # don't detect these class loader info messages as errors
#[Loaded java.lang.Error from /usr/lib/jvm/java-7-oracle/jre/lib/rt.jar]
foundBad = regex1.search(line) and not (
('error rate' in line) or ('[Loaded ' in line) or ('[WARN]' in line))
if (printing==0 and foundBad):
printing = 1
lines = 1
elif (printing==1):
lines += 1
# if we've been printing, stop when you get to another error
# keep printing if the pattern match for the condition
# is on a line with "Caused" in it ("Caused by")
# only use caused for overriding an end condition
foundCaused = regex2.search(line)
# since the "at ..." lines may have the "bad words" in them, we also don't want
# to stop if a line has " *at " at the beginning.
                    # Update: Assertion can be followed by Exception.
                    # Make sure we keep printing for a minimum number of lines (10 here)
foundAt = re.match(r'[\t ]+at ',line)
if foundBad and (lines>10) and not (foundCaused or foundAt):
printing = 2
if (printing==1):
# to avoid extra newline from print. line already has one
errLines.append(line)
sys.stdout.write(line)
if (printSingleWarning):
# don't print this one
if not re.search("Unable to load native-hadoop library for your platform", line):
sys.stdout.write(line)
sandFile.close()
sys.stdout.flush()
# already has \n in each line
# doing this kludge to put multiple line message in the python traceback,
# so it will be reported by jenkins. The problem with printing it to stdout
# is that we're in the tearDown class, and jenkins won't have this captured until
# after it thinks the test is done (tearDown is separate from the test)
# we probably could have a tearDown with the test rather than the class, but we
# would have to update all tests.
if len(errLines)!=0:
# check if the lines all start with INFO: or have "apache" in them
justInfo = True
for e in errLines:
justInfo &= re.match("INFO:", e) or ("apache" in e)
if not justInfo:
emsg1 = " check_sandbox_for_errors: Errors in sandbox stdout or stderr.\n" + \
"Could have occurred at any prior time\n\n"
emsg2 = "".join(errLines)
if nodes:
nodes[0].sandbox_error_report(True)
            # we can build a cloud that ignores all sandbox things that would normally be fatal to the test.
            # kludge: a test will set this directly if it wants, rather than thru the build_cloud
            # parameter.
            # we need the sandbox_ignore_errors parameter for the test tear_down_cloud..the state
            # disappears!
if sandbox_ignore_errors or (nodes and nodes[0].sandbox_ignore_errors):
pass
else:
raise Exception(python_test_name + emsg1 + emsg2)
def tear_down_cloud(node_list=None, sandbox_ignore_errors=False):
if not node_list: node_list = nodes
try:
for n in node_list:
n.terminate()
verboseprint("tear_down_cloud n:", n)
finally:
check_sandbox_for_errors(sandbox_ignore_errors=sandbox_ignore_errors)
node_list[:] = []
# don't need any more?
# Used before to make sure cloud didn't go away between unittest defs
def touch_cloud(node_list=None):
if not node_list: node_list = nodes
for n in node_list:
n.is_alive()
def verify_cloud_size():
    expectedSize = len(nodes)
    cloudSizes = [n.get_cloud()['cloud_size'] for n in nodes]
    cloudConsensus = [n.get_cloud()['consensus'] for n in nodes]
    # build these once, outside the loop
    sizeStr = ",".join(map(str, cloudSizes))
    consensusStr = ",".join(map(str, cloudConsensus))
    for s in cloudSizes:
        if (s != expectedSize):
            raise Exception("Inconsistent cloud size. " +
                "nodes report size: %s consensus: %s instead of %d." % \
                (sizeStr, consensusStr, expectedSize))
    return (sizeStr, consensusStr, expectedSize)
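# Example (sketch): with a healthy 3-node cloud, each node's Cloud.json agrees,
# so (assuming consensus parses to Python booleans) this returns
#   verify_cloud_size() -> ('3,3,3', 'True,True,True', 3)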
def stabilize_cloud(node, node_count, timeoutSecs=14.0, retryDelaySecs=0.25):
node.wait_for_node_to_accept_connections(timeoutSecs)
# want node saying cloud = expected size, plus thinking everyone agrees with that.
def test(n, tries=None):
c = n.get_cloud()
# don't want to check everything. But this will check that the keys are returned!
consensus = c['consensus']
locked = c['locked']
cloud_size = c['cloud_size']
cloud_name = c['cloud_name']
node_name = c['node_name']
if 'nodes' not in c:
emsg = "\nH2O didn't include a list of nodes in get_cloud response after initial cloud build"
raise Exception(emsg)
if (cloud_size != node_count):
print "\nNodes in current cloud:"
            for ci in c['nodes']:  # don't shadow the cloud response dict 'c'
                print ci['name']
if (cloud_size > node_count):
emsg = (
"\n\nERROR: cloud_size: %d reported via json is bigger than we expect: %d" % (cloud_size, node_count) +
"\nYou likely have zombie(s) with the same cloud name on the network, that's forming up with you." +
"\nLook at the cloud IP's in 'grep Paxos sandbox/*stdout*' for some IP's you didn't expect." +
"\n\nYou probably don't have to do anything, as the cloud shutdown in this test should" +
"\nhave sent a Shutdown.json to all in that cloud (you'll see a kill -2 in the *stdout*)." +
"\nIf you try again, and it still fails, go to those IPs and kill the zombie h2o's." +
"\nIf you think you really have an intermittent cloud build, report it." +
"\n" +
"\nUPDATE: building cloud size of 2 with 127.0.0.1 may temporarily report 3 incorrectly, with no zombie?"
)
raise Exception(emsg)
a = (cloud_size==node_count) and consensus
if a:
verboseprint("\tLocked won't happen until after keys are written")
verboseprint("\nNodes in current cloud:")
            for ci in c['nodes']:  # don't shadow the cloud response dict 'c'
                verboseprint(ci['name'])
return a
node.stabilize(test, error=('A cloud of size %d' % node_count),
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs)
class H2O(object):
def __url(self, loc, port=None):
# always use the new api port
if port is None: port = self.port
u = 'http://%s:%d/%s' % (self.http_addr, port, loc)
return u
def __do_json_request(self, jsonRequest=None, fullUrl=None, timeout=10, params=None,
cmd='get', extraComment=None, ignoreH2oError=False, **kwargs):
        # if the url param is used, use it as the full url. otherwise create it from the jsonRequest
if fullUrl:
url = fullUrl
else:
url = self.__url(jsonRequest)
# remove any params that are 'None'
# need to copy dictionary, since can't delete while iterating
if params is not None:
params2 = params.copy()
for k in params2:
if params2[k] is None:
del params[k]
paramsStr = '?' + '&'.join(['%s=%s' % (k,v) for (k,v) in params.items()])
else:
paramsStr = ''
if extraComment:
log('Start ' + url + paramsStr + " # " + extraComment)
else:
log('Start ' + url + paramsStr)
        # files get passed through kwargs here
if cmd=='post':
r = requests.post(url, timeout=timeout, params=params, **kwargs)
else:
r = requests.get(url, timeout=timeout, params=params, **kwargs)
# fatal if no response
if not r:
raise Exception("Maybe bad url? no r in __do_json_request %s in %s:" % (e, inspect.stack()[1][3]))
# this is used to open a browser on results, or to redo the operation in the browser
        # we don't have that many urls flying around, so let's keep them all
json_url_history.append(r.url)
if not r.json():
raise Exception("Maybe bad url? no r.json in __do_json_request %s in %s:" % (e, inspect.stack()[1][3]))
rjson = r.json()
for e in ['error', 'Error', 'errors', 'Errors']:
if e in rjson:
verboseprint(dump_json(rjson))
emsg = 'rjson %s in %s: %s' % (e, inspect.stack()[1][3], rjson[e])
if ignoreH2oError:
# well, we print it..so not totally ignore. test can look at rjson returned
print emsg
else:
raise Exception(emsg)
for w in ['warning', 'Warning', 'warnings', 'Warnings']:
if w in rjson:
verboseprint(dump_json(rjson))
print 'rjson %s in %s: %s' % (w, inspect.stack()[1][3], rjson[w])
return rjson
def test_redirect(self):
return self.__do_json_request('TestRedirect.json')
def test_poll(self, args):
return self.__do_json_request('TestPoll.json', params=args)
def get_cloud(self):
a = self.__do_json_request('Cloud.json')
consensus = a['consensus']
locked = a['locked']
cloud_size = a['cloud_size']
cloud_name = a['cloud_name']
node_name = a['node_name']
node_id = self.node_id
verboseprint('%s%s %s%s %s%s %s%s' %(
"\tnode_id: ", node_id,
"\tcloud_size: ", cloud_size,
"\tconsensus: ", consensus,
"\tlocked: ", locked,
))
return a
def get_timeline(self):
return self.__do_json_request('Timeline.json')
    # The Shutdown url is like a reset button. It doesn't send a response before it kills stuff:
    # safer if random things are wedged, rather than requiring a response.
    # so the requests library might retry and get an exception. allow that.
def shutdown_all(self):
try:
self.__do_json_request('Shutdown.json')
except:
pass
time.sleep(1) # a little delay needed?
return(True)
def put_value(self, value, key=None, repl=None):
return self.__do_json_request(
'PutValue.json',
params={"value": value, "key": key, "replication_factor": repl},
extraComment = str(value) + "," + str(key) + "," + str(repl))
def put_file(self, f, key=None, timeoutSecs=60):
if key is None:
key = os.path.basename(f)
### print "putfile specifying this key:", key
resp = self.__do_json_request(
'PostFile.json',
cmd='post',
timeout=timeoutSecs,
params={"key": key},
files={"file": open(f, 'rb')},
extraComment = str(f))
verboseprint("\nput_file response: ", dump_json(resp))
return key
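    # Example (sketch; the csv path is a placeholder): upload a file, then parse it
    # (see parse() below). put_file returns the key it stored the file under.
    #   key = nodes[0].put_file(find_file('smalldata/iris.csv'))
    #   parseResult = nodes[0].parse(key, key + '.hex')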
def get_key(self, key):
return self.__do_json_request('Get.html', params={"key": key})
    # noise is a 2-tuple ("StoreView", None): a url plus args to hit during polling, to create noise.
    # so we can create noise with different urls!, and different params to that url
    # no noise if None
def poll_url(self, response,
timeoutSecs=10, retryDelaySecs=0.5, initialDelaySecs=None, pollTimeoutSecs=180,
noise=None, benchmarkLogging=None, noPoll=False):
### print "poll_url: pollTimeoutSecs", pollTimeoutSecs
verboseprint('poll_url input: response:', dump_json(response))
url = self.__url(response['redirect_request'])
params = response['redirect_request_args']
# no need to recreate the string for messaging, in the loop..
paramsStr = '&'.join(['%s=%s' % (k,v) for (k,v) in params.items()])
if noise is not None:
print "Using noise during poll_url:", noise
            # noise_json should be like "StoreView"
(noise_json, noiseParams) = noise
noiseUrl = self.__url(noise_json + ".json")
if noiseParams is None:
noiseParamsStr = ""
else:
noiseParamsStr = '&'.join(['%s=%s' % (k,v) for (k,v) in noiseParams.items()])
status = 'poll'
r = {} # response
start = time.time()
count = 0
if initialDelaySecs:
time.sleep(initialDelaySecs)
# can end with status = 'redirect' or 'done'
while status == 'poll':
# UPDATE: 1/24/13 change to always wait before the first poll..
time.sleep(retryDelaySecs)
# every other one?
create_noise = noise is not None and ((count%2)==0)
if create_noise:
urlUsed = noiseUrl
paramsUsed = noiseParams
paramsUsedStr = noiseParamsStr
msgUsed = "\nNoise during polling with"
else:
urlUsed = url
paramsUsed = params
paramsUsedStr = paramsStr
msgUsed = "\nPolling with"
r = self.__do_json_request(fullUrl=urlUsed, timeout=pollTimeoutSecs, params=paramsUsed)
if ((count%5)==0):
verboseprint(msgUsed, urlUsed, paramsUsedStr, "Response:", dump_json(r['response']))
# hey, check the sandbox if we've been waiting a long time...rather than wait for timeout
# to find the badness?
# if ((count%15)==0):
if ((count%6)==0):
check_sandbox_for_errors()
            if (create_noise):
                # keep status as 'poll' so the loop continues; we never let a
                # noise response be interpreted as the real result to return
                status = 'poll'
else:
status = r['response']['status']
if ((time.time()-start)>timeoutSecs):
# show what we're polling with
emsg = "Exceeded timeoutSecs: %d secs while polling." % timeoutSecs +\
"status: %s, url: %s?%s" % (status, urlUsed, paramsUsedStr)
raise Exception(emsg)
count += 1
if noPoll:
return r
if benchmarkLogging:
cloudPerfH2O.get_log_save(benchmarkLogging)
return r
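    # Example (sketch): poll a Progress redirect while generating StoreView noise
    # on every other poll (see the noise comment above poll_url):
    #   a = nodes[0].poll_url(a['response'], timeoutSecs=300, noise=('StoreView', None))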
def kmeans_apply(self, data_key, model_key, destination_key,
timeoutSecs=300, retryDelaySecs=0.2, initialDelaySecs=None, pollTimeoutSecs=180,
**kwargs):
# defaults
params_dict = {
'destination_key': destination_key,
'model_key': model_key,
'data_key': data_key,
}
browseAlso = kwargs.get('browseAlso', False)
params_dict.update(kwargs)
print "\nKMeansApply params list", params_dict
a = self.__do_json_request('KMeansApply.json', timeout=timeoutSecs, params=params_dict)
# Check that the response has the right Progress url it's going to steer us to.
if a['response']['redirect_request']!='Progress':
print dump_json(a)
raise Exception('H2O kmeans redirect is not Progress. KMeansApply json response precedes.')
a = self.poll_url(a['response'],
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs,
initialDelaySecs=initialDelaySecs, pollTimeoutSecs=pollTimeoutSecs)
verboseprint("\nKMeansApply result:", dump_json(a))
if (browseAlso | browse_json):
print "Redoing the KMeansApply through the browser, no results saved though"
h2b.browseJsonHistoryAsUrlLastMatch('KMeansApply')
time.sleep(5)
return a
# model_key
# key
def kmeans_score(self, key, model_key,
timeoutSecs=300, retryDelaySecs=0.2, initialDelaySecs=None, pollTimeoutSecs=180,
**kwargs):
# defaults
params_dict = {
'key': key,
'model_key': model_key,
}
browseAlso = kwargs.get('browseAlso', False)
params_dict.update(kwargs)
print "\nKMeansScore params list", params_dict
a = self.__do_json_request('KMeansScore.json', timeout=timeoutSecs, params=params_dict)
# kmeans_score doesn't need polling?
verboseprint("\nKMeansScore result:", dump_json(a))
if (browseAlso | browse_json):
print "Redoing the KMeansScore through the browser, no results saved though"
h2b.browseJsonHistoryAsUrlLastMatch('KMeansScore')
time.sleep(5)
return a
    # additional params include: cols=
    # don't need to include it in params_dict; it doesn't need a default
def kmeans(self, key, key2=None,
timeoutSecs=300, retryDelaySecs=0.2, initialDelaySecs=None, pollTimeoutSecs=180,
**kwargs):
# defaults
params_dict = {
'epsilon': 1e-6,
'k': 1,
'source_key': key,
'destination_key': None,
}
if key2 is not None: params_dict['destination_key'] = key2
browseAlso = kwargs.get('browseAlso', False)
params_dict.update(kwargs)
print "\nKMeans params list", params_dict
a = self.__do_json_request('KMeans.json', timeout=timeoutSecs, params=params_dict)
# Check that the response has the right Progress url it's going to steer us to.
if a['response']['redirect_request']!='Progress':
print dump_json(a)
raise Exception('H2O kmeans redirect is not Progress. KMeans json response precedes.')
a = self.poll_url(a['response'],
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs,
initialDelaySecs=initialDelaySecs, pollTimeoutSecs=pollTimeoutSecs)
verboseprint("\nKMeans result:", dump_json(a))
if (browseAlso | browse_json):
print "Redoing the KMeans through the browser, no results saved though"
h2b.browseJsonHistoryAsUrlLastMatch('KMeans')
time.sleep(5)
return a
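    # Example (sketch): extra kmeans params ride through **kwargs into params_dict,
    # so k can be overridden; 'k3_model' is a placeholder destination key.
    #   kmeansResult = nodes[0].kmeans(parsedKey, key2='k3_model', k=3, timeoutSecs=120)
    #   scoreResult = nodes[0].kmeans_score(parsedKey, model_key='k3_model')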
    # params:
    # header=1,
    # separator=1 (hex encoded?)
    # exclude=
    # noise is a 2-tuple: ("StoreView", params_dict)
def parse(self, key, key2=None,
timeoutSecs=300, retryDelaySecs=0.2, initialDelaySecs=None, pollTimeoutSecs=180,
noise=None, benchmarkLogging=None, noPoll=False, **kwargs):
browseAlso = kwargs.pop('browseAlso',False)
# this doesn't work. webforums indicate max_retries might be 0 already? (as of 3 months ago)
# requests.defaults({max_retries : 4})
# https://github.com/kennethreitz/requests/issues/719
# it was closed saying Requests doesn't do retries. (documentation implies otherwise)
verboseprint("\nParsing key:", key, "to key2:", key2, "(if None, means default)")
        # other h2o parse parameters, not in the defaults
# header
# exclude
params_dict = {
'source_key': key, # can be a regex
'destination_key': key2,
}
params_dict.update(kwargs)
if benchmarkLogging:
cloudPerfH2O.get_log_save(initOnly=True)
a = self.__do_json_request('Parse.json', timeout=timeoutSecs, params=params_dict)
# Check that the response has the right Progress url it's going to steer us to.
if a['response']['redirect_request']!='Progress':
print dump_json(a)
raise Exception('H2O parse redirect is not Progress. Parse json response precedes.')
if noPoll:
return a
        # noise is a 2-tuple ("StoreView", None): a url plus args to hit during polling, to create noise
# no noise if None
verboseprint('Parse.Json noise:', noise)
a = self.poll_url(a['response'],
timeoutSecs=timeoutSecs, retryDelaySecs=retryDelaySecs,
initialDelaySecs=initialDelaySecs, pollTimeoutSecs=pollTimeoutSecs,
noise=noise, benchmarkLogging=benchmarkLogging)
verboseprint("\nParse result:", dump_json(a))
return a
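    # Example (sketch): parse an uploaded key into a named .hex destination,
    # with StoreView noise generated during the polling:
    #   parseResult = nodes[0].parse(key, key2=key + '.hex', noise=('StoreView', None))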
def netstat(self):
return self.__do_json_request('Network.json')
def jstack(self):
return self.__do_json_request("JStack.json")
def iostatus(self):
return self.__do_json_request("IOStatus.json")
# &offset=
# &view=
def inspect(self, key, offset=None, view=None, ignoreH2oError=False):