Skip to content

Commit

Permalink
Revert "Revert "This patch makes Ansible reuse fork allocation betwee…
Browse files Browse the repository at this point in the history
…n seperate instantations of the runner API, therefore the overhead of recreating forks""

This reverts commit 6685b49.
  • Loading branch information
Michael DeHaan committed Feb 7, 2014
1 parent 60d3611 commit 73ca1a1
Showing 1 changed file with 21 additions and 47 deletions.
68 changes: 21 additions & 47 deletions lib/ansible/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

module_replacer = ModuleReplacer(strip_comments=False)

NEED_ATFORK=False
HAS_ATFORK=True
try:
from Crypto.Random import atfork
Expand All @@ -60,30 +61,28 @@
OUTPUT_LOCKFILE = tempfile.TemporaryFile()
PROCESS_LOCKFILE = tempfile.TemporaryFile()

from foon import Foon

FOON = Foon()

################################################

def _executor_hook(job_queue, result_queue, new_stdin):
class KeyboardInterruptError(Exception):
pass

def _executor_hook(params):

(host, my_stdin) = params

# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6
if HAS_ATFORK:
if HAS_ATFORK and NEED_ATFORK:
atfork()

signal.signal(signal.SIGINT, signal.SIG_IGN)
while not job_queue.empty():
try:
host = job_queue.get(block=False)
return_data = multiprocessing_runner._executor(host, new_stdin)
result_queue.put(return_data)

if 'LEGACY_TEMPLATE_WARNING' in return_data.flags:
# pass data back up across the multiprocessing fork boundary
template.Flags.LEGACY_TEMPLATE_WARNING = True

except Queue.Empty:
pass
except:
traceback.print_exc()
try:
return multiprocessing_runner._executor(host, my_stdin)
except KeyboardInterrupt:
raise KeyboardInterruptError()

class HostVars(dict):
''' A special view of setup_cache that adds values from the inventory when needed. '''
Expand Down Expand Up @@ -209,6 +208,9 @@ def __init__(self,
else:
self.transport = "ssh"

if self.transport == "paramiko":
global NEED_ATFORK
NEED_ATFORK=True

# misc housekeeping
if subset and self.inventory._subset is None:
Expand Down Expand Up @@ -1056,39 +1058,11 @@ def _configure_module(self, conn, module_name, module_args, inject, complex_args

# *****************************************************


def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required '''

manager = multiprocessing.Manager()
job_queue = manager.Queue()
for host in hosts:
job_queue.put(host)
result_queue = manager.Queue()

workers = []
for i in range(self.forks):
new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)

try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()

results = []
try:
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
FOON.set_size(self.forks)
return FOON.map(_executor_hook, hosts)

# *****************************************************

Expand Down

0 comments on commit 73ca1a1

Please sign in to comment.