forked from javasoze/sensei
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload-index
executable file
·73 lines (61 loc) · 2.54 KB
/
load-index
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python
import json, logging, os, re, sys, threading, urllib2
from optparse import OptionParser
class Client:
def __init__(self, endpoint):
self.endpoint = endpoint
def request(self, data):
req = urllib2.Request(self.endpoint)
req.add_header('Content-Type', 'text/json')
res = urllib2.urlopen(req, json.dumps(data))
doc = res.read()
return json.loads(doc)
def main(argv):
usage = "usage: %prog [options] <index uri>"
parser = OptionParser(usage=usage)
parser.add_option("-s", "--server", type="string", dest="host",
default="localhost", help="The host sensei is running on.")
parser.add_option("-p", "--port", type="int", dest="port",
default=8080, help="The port sensei broker is running on.")
(options, args) = parser.parse_args()
if len(args) != 1:
parser.error("Please give me a index uri (hdfs://host:port/path for example).")
uri = args[0]
client = Client('http://%s:%d/sensei/sysinfo' % (options.host, options.port))
sysinfo = client.request(None)
class RequestThread(threading.Thread):
def __init__(self, node, part, *args, **kwargs):
super(RequestThread, self).__init__(*args, **kwargs)
self.node = node
self.part = part
def run(self):
client = Client(os.path.join(self.node['adminlink'], 'admin/jmx'))
res = client.request({
"type" : "exec",
"mbean" : "com.senseidb:zoie-name=pair-admin-%s-%s" % (self.node['id'], self.part),
"operation" : "loadIndex",
"arguments" : [os.path.join(uri, str(self.part))]
})
if not res.get('value'):
if res.get('status') == 404:
print 'Loading index failed for node "%s" partition "%s": index copier not defined, please put for example "sensei.indexer.copier=hdfs" inside your sensei.properties' % (self.node['adminlink'], self.part)
else:
print 'Loading index failed for node "%s" partition "%s": %s' % (self.node['adminlink'], self.part, json.dumps(res))
stacktrace = res.get('stacktrace')
if stacktrace:
print stacktrace
else:
print 'Successfully loaded index for node "%s" partition "%s": %s' % (self.node['adminlink'], self.part, json.dumps(res))
threads = []
for node in sysinfo['clusterinfo']:
for part in node['partitions']:
t = RequestThread(node, part)
t.setDaemon(True)
threads.append(t)
t.start()
for t in threads:
t.join()
def target(*args):
return main, None
if __name__ == '__main__':
main(sys.argv)