Skip to content

Commit

Permalink
bpo-36668: FIX reuse semaphore tracker for child processes (python#5172)
Browse files Browse the repository at this point in the history
Fix the multiprocessing.semaphore_tracker so it is reused by child processes.
  • Loading branch information
tomMoral authored and pitrou committed Apr 24, 2019
1 parent 09d434c commit 004b93e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
36 changes: 26 additions & 10 deletions Lib/multiprocessing/semaphore_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ def ensure_running(self):
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
with self._lock:
if self._pid is not None:
if self._fd is not None:
# semaphore tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
# => dead, launch it again
os.close(self._fd)

# Clean-up to avoid dangling processes.
try:
pid, _ = os.waitpid(self._pid, os.WNOHANG)
# _pid can be None if this process is a child from another
# python process, which has started the semaphore_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The process terminated
# The semaphore_tracker has already been terminated.
pass
else:
if not pid:
# => still alive
return

# => dead, launch it again
os.close(self._fd)
self._fd = None
self._pid = None

Expand Down Expand Up @@ -99,6 +102,17 @@ def ensure_running(self):
finally:
os.close(r)

def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
os.write(self._fd, b'PROBE:0\n')
except OSError:
return False
else:
return True

def register(self, name):
'''Register name of semaphore with semaphore tracker.'''
self._send('REGISTER', name)
Expand Down Expand Up @@ -150,6 +164,8 @@ def main(fd):
cache.add(name)
elif cmd == b'UNREGISTER':
cache.remove(name)
elif cmd == b'PROBE':
pass
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4891,6 +4891,34 @@ def test_semaphore_tracker_sigkill(self):
# Uncatchable signal.
self.check_semaphore_tracker_death(signal.SIGKILL, True)

@staticmethod
def _is_semaphore_tracker_reused(conn, pid):
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
# The pid should be None in the child process, expect for the fork
# context. It should not be a new value.
reused = _semaphore_tracker._pid in (None, pid)
reused &= _semaphore_tracker._check_alive()
conn.send(reused)

def test_semaphore_tracker_reused(self):
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid

r, w = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
args=(w, pid))
p.start()
is_semaphore_tracker_reused = r.recv()

# Clean up
p.join()
w.close()
r.close()

self.assertTrue(is_semaphore_tracker_reused)


class TestSimpleQueue(unittest.TestCase):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the multiprocessing.semaphore_tracker so it is reused by child processes

0 comments on commit 004b93e

Please sign in to comment.