Skip to content

Commit

Permalink
Merge pull request EpistasisLab#389 from weixuanfu2016/multi_process_fix
Browse files Browse the repository at this point in the history
Hot patch for multiprocessing function in TPOT
  • Loading branch information
rhiever authored Mar 24, 2017
2 parents 3e97e8e + f6fba82 commit e315a21
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 27 deletions.
5 changes: 4 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ def test_timeout_func():
return 100
except TimedOutExc:
return time.time() - start_time
ret_timeout = int(test_timeout_func())
try: # pass in Linux
ret_timeout = int(test_timeout_func())
except TimedOutExc: # windows exception
ret_timeout = 1
assert ret_timeout == 1

def test_init_default_scoring():
Expand Down
46 changes: 20 additions & 26 deletions tpot/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import time
from functools import partial
from datetime import datetime
from pathos.multiprocessing import ProcessPool

from pathos.multiprocessing import Pool, cpu_count

import numpy as np
import deap
Expand Down Expand Up @@ -216,6 +215,8 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,
'FunctionTransformer': FunctionTransformer
}



self._pbar = None

# Dictionary of individuals that have already been evaluated in previous generations
Expand Down Expand Up @@ -244,10 +245,12 @@ def __init__(self, generations=100, population_size=100, offspring_size=None,

self.cv = cv
# On Windows, force n_jobs to 1 because TPOT does not currently support parallelization on that platform
if sys.platform.startswith('win') and n_jobs > 1:
if sys.platform.startswith('win') and n_jobs != 1:
print('Warning: Parallelization is not currently supported in TPOT for Windows. ',
'Setting n_jobs to 1 during the TPOT optimization process.')
self.n_jobs = 1
elif n_jobs == -1:
self.n_jobs = cpu_count()
else:
self.n_jobs = n_jobs

Expand Down Expand Up @@ -722,30 +725,21 @@ def _wrapped_cross_val_score(sklearn_pipeline, features=features, classes=classe
return resulting_score

if not sys.platform.startswith('win'):
if self.n_jobs == -1:
pool = ProcessPool()
else:
pool = ProcessPool(nodes=self.n_jobs)
res_imap = pool.imap(_wrapped_cross_val_score, sklearn_pipeline_list)
if not self._pbar.disable:
ini_pbar_n = self._pbar.n
# Hacky way for pbar update by using imap in pathos.multiprocessing.ProcessPool
while not self._pbar.disable:
tmp_fitness = np.array(res_imap._items)
num_job_done = len(tmp_fitness)
if not self._pbar.disable and num_job_done:
timeout_index = list(np.where(tmp_fitness[:, 1] == 'Timeout')[0])
for idx in timeout_index:
if self.verbosity > 2 and self._pbar.n - ini_pbar_n <= idx:
pool = Pool(processes=self.n_jobs)
resulting_score_list = []
# Evaluate the pipelines in chunks of 2 * n_jobs so the progress bar can be updated after each chunk
for chunk_idx in range(0, len(sklearn_pipeline_list),self.n_jobs*2):
tmp_result_score = pool.map(_wrapped_cross_val_score, sklearn_pipeline_list[chunk_idx:chunk_idx+self.n_jobs*2])
for val in tmp_result_score:
if not self._pbar.disable:
self._pbar.update(1)
if val == 'Timeout':
if self.verbosity > 2:
self._pbar.write('Skipped pipeline #{0} due to time out. '
'Continuing to the next pipeline.'.format(ini_pbar_n + idx + 1))
self._pbar.update(ini_pbar_n + num_job_done - self._pbar.n)
if num_job_done >= len(sklearn_pipeline_list):
break
else:
time.sleep(0.2)
resulting_score_list = [-float('inf') if x == 'Timeout' else x for x in list(res_imap)]

'Continuing to the next pipeline.'.format(self._pbar.n))
resulting_score_list.append(-float('inf'))
else:
resulting_score_list.append(val)
else:
resulting_score_list = []
for sklearn_pipeline in sklearn_pipeline_list:
Expand Down

0 comments on commit e315a21

Please sign in to comment.