1
1
import threading
2
2
import time
3
3
import string
4
- import random , copy
4
+ import random , copy , subprocess
5
5
import json
6
6
from functools import wraps
7
7
@@ -48,10 +48,30 @@ def __lt__(self, other):
48
48
return self .priority < other .priority
49
49
50
50
def gen_ips_from_base (self ,base_ip ):
51
+ if self .task_base_ip == None :
52
+ return
51
53
self .ips = []
52
54
for i in range (task .max_size ):
53
55
self .ips .append (int_to_ip (base_ip + self .task_base_ip + i + 2 ))
54
56
57
+ def gen_hosts (self ):
58
+ username = self .taskinfo .username
59
+ taskid = self .taskinfo .taskid
60
+ logger .info ("Generate hosts for user(%s) task(%s) base_ip(%s)" % (username ,taskid ,str (self .task_base_ip )))
61
+ fspath = env .getenv ('FS_PREFIX' )
62
+ if not os .path .isdir ("%s/global/users/%s" % (fspath ,username )):
63
+ path = env .getenv ('DOCKLET_LIB' )
64
+ subprocess .call ([path + "/master/userinit.sh" , username ])
65
+ logger .info ("user %s directory not found, create it" % username )
66
+
67
+ hosts_file = open ("%s/global/users/%s/%s.hosts" % (fspath ,username ,"batch-" + taskid ),"w" )
68
+ hosts_file .write ("127.0.0.1 localhost\n " )
69
+ i = 0
70
+ for ip in self .ips :
71
+ hosts_file .write (ip + " batch-" + str (i )+ "\n " )
72
+ i += 1
73
+ hosts_file .close ()
74
+
55
75
def get_one_resources_need (self ):
56
76
return self .vnodeinfo .vnode .instance
57
77
@@ -225,18 +245,20 @@ def task_processor(self, task, vnodes_workers):
225
245
226
246
self .acquire_task_net (task )
227
247
task .gen_ips_from_base (self .base_ip )
248
+ task .gen_hosts ()
228
249
#need to create hosts
229
250
[success , gwip ] = self .setup_tasknet (task ,[w [1 ] for w in vnodes_workers ])
230
251
if not success :
252
+ self .release_task_ips (task )
231
253
return [False , gwip ]
232
-
233
254
token = '' .join (random .sample (string .ascii_letters + string .digits , 8 ))
234
255
placed_workers = []
235
256
236
257
# start vc
237
258
for vid , worker in vnodes_workers :
238
259
vnodeinfo = copy .copy (task .vnodeinfo )
239
260
vnodeinfo .vnodeid = vid
261
+ vnodeinfo .vnode .hostname = "batch-" + str (vid % task .max_size )
240
262
vnode = task .subtask_list [vid ]
241
263
vnode ['status' ] = RUNNING
242
264
vnode ['try_count' ] += 1
@@ -253,7 +275,7 @@ def task_processor(self, task, vnodes_workers):
253
275
ipaddr = task .ips [vid % task .max_size ]
254
276
brname = "docklet-batch-%s-%s" % (username , taskid )
255
277
networkinfo = Network (ipaddr = ipaddr , gateway = gwip , masterip = self .masterip , brname = brname )
256
- vnode .network = networkinfo
278
+ vnodeinfo . vnode .network = networkinfo
257
279
258
280
try :
259
281
self .logger .info ('[task_processor] starting vnode for task [%s] instance [%d]' % (task .vnodeinfo .id , vid ))
0 commit comments