forked from h2oai/h2o-2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathh2odriver.java
206 lines (180 loc) · 5.67 KB
/
h2odriver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package water.zookeeper;
import org.apache.zookeeper.*;
import water.zookeeper.nodes.ClusterPayload;
import water.zookeeper.nodes.MasterPayload;
import java.util.concurrent.TimeoutException;
public class h2odriver {
final static int CLOUD_FORMATION_SETTLE_DOWN_SECONDS = 2;
// Used by the running object.
private String _zk;
private String _zkroot;
private int _numNodes;
private int _cloudFormationTimeoutSeconds;
// Used by parseArgs, not by the running object.
public static String g_zk = "";
public static String g_zkroot = "";
public static int g_numNodes = -1;
public static int g_cloudFormationTimeoutSeconds = Constants.DEFAULT_CLOUD_FORMATION_TIMEOUT_SECONDS;
public static boolean g_start = false;
public static boolean g_wait = false;
public void setZk(String v) {
_zk = v;
}
public void setZkroot(String v) {
_zkroot = v;
}
public void setNumNodes(int v) {
_numNodes = v;
}
public void setCloudFormationTimeoutSeconds(int v) {
_cloudFormationTimeoutSeconds = v;
}
public void doStart() throws Exception {
ZooKeeper z = ZooKeeperFactory.makeZk(_zk);
ClusterPayload cp = new ClusterPayload();
cp.numNodes = _numNodes;
byte[] payload = cp.toPayload();
z.create(_zkroot, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
z.create(_zkroot + "/nodes", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
z.close();
}
public MasterPayload doWait() throws Exception {
ZooKeeper z = ZooKeeperFactory.makeZk(_zk);
byte[] payload;
payload = z.getData(_zkroot, null, null);
ClusterPayload cp = ClusterPayload.fromPayload(payload, ClusterPayload.class);
z.close();
assert (cp.numNodes > 0);
long startMillis = System.currentTimeMillis();
while (true) {
z = ZooKeeperFactory.makeZk(_zk);
if (z.exists(_zkroot, false) == null) {
z.close();
throw new Exception("ZooKeeper node does not exist: " + _zkroot);
}
try {
payload = z.getData(_zkroot + "/master", null, null);
MasterPayload mp = MasterPayload.fromPayload(payload, MasterPayload.class);
z.close();
Thread.sleep(CLOUD_FORMATION_SETTLE_DOWN_SECONDS);
return mp;
}
catch (KeeperException.NoNodeException e) {
// This is OK, do nothing
}
long now = System.currentTimeMillis();
if (Math.abs(now - startMillis) > (_cloudFormationTimeoutSeconds * 1000)) {
z.close();
throw new TimeoutException("Timed out waiting for cloud to form");
}
Thread.sleep(1000);
}
}
/**
* Print usage and exit 1.
*/
static void usage() {
System.err.printf("" +
"Step 1: Create a new Zookeeper h2o cloud hierarchy:\n" +
" java -cp h2o-zookeeper.jar water.zookeeper.h2odriver -zk a:b:c:d:e -zkroot /zk/path/h2o-uuid -n <numNodes> -start\n" +
"\n" +
"Step 2: Wait for an h2o cloud to come up:\n" +
" java -cp h2o-zookeeper.jar water.zookeeper.h2odriver -zk a:b:c:d:e -zkroot /zk/path/h2o-uuid -wait [-timeout sec]\n" +
"\n" +
"Exit value:\n" +
" 0 for success; nonzero otherwise.\n" +
"\n"
);
System.exit(1);
}
/**
* Print an error message, print usage, and exit 1.
* @param s Error message
*/
static void error(String s) {
System.err.printf("\nERROR: " + "%s\n\n", s);
usage();
}
/**
* Parse remaining arguments after the ToolRunner args have already been removed.
* @param args Argument list
*/
static void parseArgs(String[] args) {
int i = 0;
while (true) {
if (i >= args.length) {
break;
}
String s = args[i];
if (s.equals("-h") ||
s.equals("help") ||
s.equals("-help") ||
s.equals("--help")) {
usage();
}
else if (s.equals("-zk")) {
i++; if (i >= args.length) { usage(); }
g_zk = args[i];
}
else if (s.equals("-zkroot")) {
i++; if (i >= args.length) { usage(); }
g_zkroot = args[i];
}
else if (s.equals("-n") ||
s.equals("-nodes")) {
i++; if (i >= args.length) { usage(); }
g_numNodes = Integer.parseInt(args[i]);
}
else if (s.equals("-timeout")) {
i++; if (i >= args.length) { usage(); }
g_cloudFormationTimeoutSeconds = Integer.parseInt(args[i]);
}
else if (s.equals("-start")) {
g_start = true;
}
else if (s.equals("-wait")) {
g_wait = true;
}
else {
error("Unrecognized option " + s);
}
i++;
}
// Check for mandatory arguments.
if (g_start) {
if (g_numNodes < 1) {
error("Number of H2O nodes must be greater than 0 (must specify -n)");
}
}
if (g_wait) {
if (g_numNodes != -1) {
error("-nodes option may not be combined with -wait option");
}
}
if (!g_start && !g_wait) {
error("-start or -wait must be specified");
}
}
public static void main(String[] args) throws Exception {
parseArgs(args);
if (g_start) {
h2odriver d = new h2odriver();
d.setZk(g_zk);
d.setZkroot(g_zkroot);
d.setNumNodes(g_numNodes);
d.doStart();
}
else if (g_wait) {
h2odriver d = new h2odriver();
d.setZk(g_zk);
d.setZkroot(g_zkroot);
d.setCloudFormationTimeoutSeconds(g_cloudFormationTimeoutSeconds);
MasterPayload mp = d.doWait();
System.out.println(mp.ip + ":" + mp.port);
}
else {
System.out.println("bad path");
System.exit(1);
}
}
}