|
| 1 | +import atexit |
1 | 2 | import os
|
| 3 | +import time |
2 | 4 | import tempfile
|
3 | 5 | import importlib
|
4 | 6 | from boltons.fileutils import atomic_save
|
|
7 | 9 | from dataspin.utils.common import uuid_generator, marshal, format_timestring
|
8 | 10 | from dataspin.utils.schedule import add_schedule, run_scheduler
|
9 | 11 | from dataspin.functions import creat_function_with
|
10 |
| -from multiprocessing import Process, Pool |
11 | 12 | from basepy.log import logger
|
12 | 13 |
|
13 | 14 |
|
@@ -258,7 +259,10 @@ def _load(self):
|
258 | 259 | def start(self):
|
259 | 260 | if self.is_fetch_job and self._schedules:
|
260 | 261 | for schedule_str in self._schedules:
|
261 |
| - add_schedule(schedule_str, self.run) |
| 262 | + add_schedule(schedule_str, self.run_in_pool) |
| 263 | + |
| 264 | + def run_in_pool(self): |
| 265 | + self.engine.run_data_process(self.run) |
262 | 266 |
|
263 | 267 | def run(self):
|
264 | 268 | def append_or_extend(datafiles, newfile):
|
@@ -321,17 +325,17 @@ def task_list(self):
|
321 | 325 | class SpinEngine:
|
322 | 326 | def __init__(self, conf):
|
323 | 327 | self.conf = conf
|
324 |
| - # self.runner_pool = Pool(4) |
325 | 328 | self.config = {}
|
326 | 329 | self.sources = {}
|
327 | 330 | self.streams = {}
|
328 | 331 | self.storages = {}
|
329 | 332 | self.data_views = {}
|
330 | 333 | self.data_processes = {}
|
331 |
| - self.stop_scheduler_event = run_scheduler() |
| 334 | + self.stop_scheduler_event, self.scheduler_thread = run_scheduler() |
332 | 335 | self.load()
|
333 | 336 | self.uuid = 'project_' + uuid_generator()
|
334 | 337 | self.temp_dir_path = os.path.join(os.getcwd(), self.uuid)
|
| 338 | + atexit.register(self.join) |
335 | 339 |
|
336 | 340 | @property
|
337 | 341 | def working_dir(self):
|
@@ -363,16 +367,16 @@ def load(self):
|
363 | 367 |
|
364 | 368 | def run(self):
|
365 | 369 | for process_name, process in self.data_processes.items():
|
366 |
| - self.run_process(process) |
367 |
| - |
368 |
| - def run_process(self, process): |
369 |
| - process.run() |
370 |
| - # self.runner_pool.apply_async(process.run) |
| 370 | + process.run() |
371 | 371 |
|
372 | 372 | def start(self):
|
373 | 373 | for _, process in self.data_processes.items():
|
374 | 374 | process.start()
|
375 | 375 |
|
| 376 | + def run_data_process(self, process_fn): |
| 377 | + process_fn() |
| 378 | + |
376 | 379 | def join(self):
|
377 | 380 | self.stop_scheduler_event.set()
|
378 |
| - self.runner_pool.join() |
| 381 | + time.sleep(3) |
| 382 | + self.scheduler_thread.join() |
0 commit comments