forked from h2oai/h2o-2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathh2o_import.py
168 lines (147 loc) · 7.3 KB
/
h2o_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import h2o, h2o_cmd
import time, re, getpass
def setupImportS3(node=None, bucket='home-0xdiag-datasets', timeoutSecs=180):
    """Ask an H2O node to import the contents of an S3 bucket.

    Defaults to the first cloud node and the standard datasets bucket.
    Returns the raw import_s3 result dict.
    """
    if not bucket:
        raise Exception('No S3 bucket specified')
    if not node:
        node = h2o.nodes[0]
    importS3Result = node.import_s3(bucket, timeoutSecs=timeoutSecs)
    # too many files now to dump the whole result
    ### print h2o.dump_json(importS3Result)
    return importS3Result
# assumes you call setupImportS3 first
def parseImportS3File(node=None,
csvFilename='covtype.data', path='home-0xdiag-datasets', key2=None,
timeoutSecs=360, retryDelaySecs=2, initialDelaySecs=1, pollTimeoutSecs=180, noise=None,
benchmarkLogging=None, noPoll=False, **kwargs):
if not node: node = h2o.nodes[0]
if not csvFilename: raise Exception('parseImportS3File: No csvFilename')
s3Key= "s3://" + path + "/" + csvFilename
# We like the short parse key2 name.
# We don't drop anything from csvFilename, unlike H2O default
if key2 is None:
# don't rely on h2o default key name
myKey2 = csvFilename + '.hex'
else:
myKey2 = key2
print "Waiting for the slow parse of the file:", csvFilename
parseKey = node.parse(s3Key, myKey2,
timeoutSecs, retryDelaySecs, initialDelaySecs, pollTimeoutSecs, noise,
benchmarkLogging, noPoll, **kwargs)
# a hack so we know what the source_key was, bask at the caller
parseKey['source_key'] = s3Key
print "\nParse result:", parseKey
return parseKey
def setupImportFolder(node=None, path='/home/0xdiag/datasets', timeoutSecs=180):
# a little hack to redirect import folder tests to an s3 folder
# we don't have any "state" other than per node, so stuck this sort-of-global
# test config state (which gets set only from the config json use-case)
# on the nodes. The only globals we have are command line args..so lets keep that
# Really should have a class for global H2O cloud state? or test state?
if not node: node = h2o.nodes[0]
if node.redirect_import_folder_to_s3_path:
# FIX! make bucket vary depending on path
bucket = 'home-0xdiag-datasets'
importFolderResult = setupImportS3(node=node, bucket=bucket, timeoutSecs=timeoutSecs)
elif node.redirect_import_folder_to_s3n_path:
# FIX! make bucket vary depending on path
path = re.sub('/home/0xdiag/datasets', '/home-0xdiag-datasets', path)
importFolderResult = setupImportHdfs(node=node, path=path, schema="s3n",
timeoutSecs=timeoutSecs)
else:
if getpass.getuser()=='jenkins':
print "michal: Temp hack of /home/0xdiag/datasets/standard to /home/0xdiag/datasets till EC2 image is fixed"
path = re.sub('/home/0xdiag/datasets/standard', '/home/0xdiag/datasets', path)
importFolderResult = node.import_files(path, timeoutSecs=timeoutSecs)
### h2o.dump_json(importFolderResult)
return importFolderResult
# assumes you call setupImportFolder first
def parseImportFolderFile(node=None, csvFilename=None, path=None, key2=None,
timeoutSecs=30, retryDelaySecs=0.5, initialDelaySecs=1, pollTimeoutSecs=180, noise=None,
benchmarkLogging=None, noPoll=False, **kwargs):
if not node: node = h2o.nodes[0]
# a little hack to redirect import folder tests to an s3 folder
# TEMP hack: translate /home/0xdiag/datasets to /home-0xdiag-datasets
if not csvFilename: raise Exception('parseImportFolderFile: No csvFilename')
# We like the short parse key2 name.
# We don't drop anything from csvFilename, unlike H2O default
if key2 is None:
# don't rely on h2o default key name
myKey2 = csvFilename + '.hex'
else:
myKey2 = key2
print "Waiting for the slow parse of the file:", csvFilename
if node.redirect_import_folder_to_s3_path:
# why no leading / for s3 key here. only one / after s3:/ ?
path = re.sub('/home/0xdiag/datasets', 'home-0xdiag-datasets', path)
parseKey = parseImportS3File(node, csvFilename, path, myKey2,
timeoutSecs, retryDelaySecs, initialDelaySecs, pollTimeoutSecs, noise,
benchmarkLogging, noPoll)
elif node.redirect_import_folder_to_s3n_path:
path = re.sub('/home/0xdiag/datasets', '/home-0xdiag-datasets', path)
parseKey = parseImportHdfsFile(node, csvFilename, path, myKey2, "s3n",
timeoutSecs, retryDelaySecs, initialDelaySecs, pollTimeoutSecs, noise,
benchmarkLogging, noPoll)
else:
if getpass.getuser()=='jenkins':
print "michal: Temp hack of /home/0xdiag/datasets/standard to /home/0xdiag/datasets till EC2 image is fixed"
path = re.sub('/home/0xdiag/datasets/standard', '/home/0xdiag/datasets', path)
importKey = "nfs:/" + path + "/" + csvFilename
parseKey = node.parse(importKey, myKey2,
timeoutSecs, retryDelaySecs, initialDelaySecs, pollTimeoutSecs, noise,
benchmarkLogging, noPoll, **kwargs)
# a hack so we know what the source_key was, bask at the caller
parseKey['source_key'] = importKey
print "\nParse result:", parseKey
return parseKey
def setupImportHdfs(node=None, path=None, schema='hdfs', timeoutSecs=180):
if not node: node = h2o.nodes[0]
print "setupImportHdfs schema:", schema
# FIX! H2O has horrible inconsistencies between the URIs used for different filesystems
if schema == "maprfs":
hdfsPrefix = schema + "://"
elif schema == "s3n":
hdfsPrefix = schema + ":/"
elif schema == "hdfs":
hdfsPrefix = schema + "://" + node.hdfs_name_node
else:
raise Exception('Uknown schema: ' + schema + ' in setupImportHdfs')
if path is None:
URI = hdfsPrefix + '/datasets'
else:
URI = hdfsPrefix + path
print "URI:", URI
importHdfsResult = node.import_hdfs(URI, timeoutSecs=timeoutSecs)
h2o.verboseprint(h2o.dump_json(importHdfsResult))
return importHdfsResult
def parseImportHdfsFile(node=None, csvFilename=None, path=None, key2=None, schema='hdfs',
timeoutSecs=3600, retryDelaySecs=2, initialDelaySecs=1, pollTimeoutSecs=180, noise=None,
benchmarkLogging=None, noPoll=False, **kwargs):
if not csvFilename: raise Exception('No csvFilename parameter in parseImportHdfsFile')
if not node: node = h2o.nodes[0]
print "parseImportHdfsFile schema:", schema
if schema == "maprfs":
hdfsPrefix = schema + ":" # no ?? ? inconsistent with import
elif schema == "s3n":
hdfsPrefix = schema + ":/"
elif schema == "hdfs":
hdfsPrefix = schema + "://" + node.hdfs_name_node
else:
raise Exception('Uknown schema: ' + schema + ' in parseImportHdfsFile')
if path is None:
URI = hdfsPrefix + '/datasets'
else:
URI = hdfsPrefix + path
hdfsKey = URI + "/" + csvFilename
print "parseImportHdfsFile hdfsKey:", hdfsKey
inspect = h2o_cmd.runInspect(key=hdfsKey, timeoutSecs=180)
print "parseImportHdfsFile inspect of source:", inspect
if key2 is None:
myKey2 = csvFilename + ".hex"
else:
myKey2 = key2
parseKey = node.parse(hdfsKey, myKey2,
timeoutSecs, retryDelaySecs, initialDelaySecs, pollTimeoutSecs, noise,
benchmarkLogging, noPoll, **kwargs)
# a hack so we know what the source_key was, bask at the caller
parseKey['source_key'] = hdfsKey
print "parseImportHdfsFile:", parseKey
return parseKey