Skip to content

Commit

Permalink
Increase robustness of ProcessManager.terminate() in the face of zomb…
Browse files Browse the repository at this point in the history
…ies.

- Implement zombie detection in ProcessManager.is_alive()
- Implement stale pid detection in ProcessManager.is_alive() and enforce for NailgunExecutor
- Misc refactoring + improved usage of psutil.Process attributes in support of the above
- Tests

Testing Done:
pantsbuild#1848 + setup a centos6 vm, created a zombie process via a test harness and attempted to kill it via the ng-killall code path (which relies on ProcessManager.terminate() & is_alive()). verified this change successfully prevents the exception case on linux when trying to kill a zombie'd process.

Reviewed at https://rbcommons.com/s/twitter/r/2513/
  • Loading branch information
kwlzn authored and stuhood committed Jul 23, 2015
1 parent 5eb41d9 commit eca2c71
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 27 deletions.
5 changes: 3 additions & 2 deletions src/python/pants/java/nailgun_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self):

def _iter_nailgun_instances(self, everywhere=False):
def predicate(proc):
if proc.name == b'java':
if proc.name == NailgunExecutor._PROCESS_NAME:
if not everywhere:
return NailgunExecutor._PANTS_NG_ARG in proc.cmdline
else:
Expand Down Expand Up @@ -77,11 +77,12 @@ class NailgunExecutor(Executor, ProcessManager):

_NAILGUN_SPAWN_LOCK = threading.Lock()
_SELECT_WAIT = 1
_PROCESS_NAME = b'java'

def __init__(self, identity, workdir, nailgun_classpath, distribution=None, ins=None,
connect_timeout=10, connect_attempts=5):
Executor.__init__(self, distribution=distribution)
ProcessManager.__init__(self, name=identity)
ProcessManager.__init__(self, name=identity, process_name=self._PROCESS_NAME)

if not isinstance(workdir, string_types):
raise ValueError('Workdir must be a path string, not: {workdir}'.format(workdir=workdir))
Expand Down
45 changes: 32 additions & 13 deletions src/python/pants/pantsd/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _swallow_psutil_exceptions(self):

def _instance_from_process(self, process):
"""Default converter from psutil.Process to process instance classes for subclassing."""
return ProcessManager(name=process.cmdline[0], pid=process.pid, process_name=process.cmdline[0])
return ProcessManager(name=process.name, pid=process.pid, process_name=process.name)

def iter_processes(self, proc_filter=None):
proc_filter = proc_filter or (lambda x: True)
Expand Down Expand Up @@ -74,25 +74,32 @@ def __init__(self, name, pid=None, socket=None, process_name=None, socket_type=N

@property
def name(self):
"""The logical name/label of the process."""
return self._name

@property
def process_name(self):
"""The logical process name. If defined, this is compared to exe_name for stale pid checking."""
return self._process_name

@property
def exe(self):
try:
return self.as_process().cmdline[0]
except AssertionError:
return None
"""The full path of the process executable e.g. '/opt/java/jdk1.7.0/Contents/Home/bin/java'."""
return getattr(self.as_process(), 'exe', None)

@property
def process_name(self):
return self._process_name
def exe_name(self):
"""The basename of the process executable e.g. 'java'."""
return getattr(self.as_process(), 'name', None)

@property
def pid(self):
"""The running processes pid (or None)."""
return self._pid or self.get_pid()

@property
def socket(self):
"""The running processes socket/port information (or None)."""
return self._socket or self.get_socket()

@staticmethod
Expand All @@ -103,9 +110,11 @@ def _maybe_cast(x, caster):
return x

def as_process(self):
assert self.is_alive(), 'cannot get process for a non-running process'
if not self._process:
self._process = psutil.Process(self.pid)
if self._process is None and self.pid:
try:
self._process = psutil.Process(self.pid)
except psutil.NoSuchProcess:
pass
return self._process

def _read_file(self, filename):
Expand Down Expand Up @@ -191,10 +200,20 @@ def get_socket(self):
except (IOError, OSError):
return None

def is_alive(self, pid=None):
def is_alive(self):
"""Return a boolean indicating whether the process is running."""
return psutil.pid_exists(pid or self.pid)
# TODO: consider stale pidfile case and assertion of self.process_name == proc.cmdline[0]
if self.as_process():
try:
if (self.as_process().status == psutil.STATUS_ZOMBIE or # Check for walkers.
(self.process_name and self.process_name != self.exe_name)): # Check for stale pids.
return False
except psutil.NoSuchProcess:
# On some platforms, accessing attributes of a zombie'd Process results in NoSuchProcess.
return False

return True
else:
return False

def kill(self, kill_sig):
"""Send a signal to the current process."""
Expand Down
58 changes: 46 additions & 12 deletions tests/python/pants_test/pantsd/test_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
PATCH_OPTS = dict(autospec=True, spec_set=True)


FakeProcess = namedtuple('Process', 'cmdline pid')
FakeProcess = namedtuple('Process', 'pid name status')


class TestProcessGroup(unittest.TestCase):
Expand All @@ -45,8 +45,10 @@ def test_iter_processes_filtered(self):

def test_iter_instances(self):
with mock.patch('psutil.process_iter', **PATCH_OPTS) as mock_process_iter:
mock_process_iter.return_value = [FakeProcess(cmdline=['a_test'], pid=3),
FakeProcess(cmdline=['b_test'], pid=4)]
mock_process_iter.return_value = [
FakeProcess(name='a_test', pid=3, status=psutil.STATUS_IDLE),
FakeProcess(name='b_test', pid=4, status=psutil.STATUS_IDLE)
]

items = [item for item in self.pg.iter_instances()]
self.assertEqual(len(items), 2)
Expand Down Expand Up @@ -74,10 +76,13 @@ def test_readwrite_file(self):

def test_as_process(self):
sentinel = 3333
with mock.patch.object(ProcessManager, 'is_alive', **PATCH_OPTS):
with mock.patch('psutil.Process', **PATCH_OPTS) as mock_proc:
mock_proc.return_value = sentinel
self.assertEqual(self.pm.as_process(), sentinel)
with mock.patch('psutil.Process', **PATCH_OPTS) as mock_proc:
mock_proc.return_value = sentinel
self.pm._pid = sentinel
self.assertEqual(self.pm.as_process(), sentinel)

def test_as_process_none(self):
self.assertEqual(self.pm.as_process(), None)

def test_wait_for_file(self):
with temporary_dir() as td:
Expand Down Expand Up @@ -163,21 +168,50 @@ def test_get_socket(self):
patched_pm.return_value = '3333'
self.assertEqual(self.pm.get_socket(), 3333)

def test_is_alive_neg(self):
with mock.patch.object(ProcessManager, 'as_process', **PATCH_OPTS) as mock_as_process:
mock_as_process.return_value = None
self.assertFalse(self.pm.is_alive())
mock_as_process.assert_called_once_with(self.pm)

def test_is_alive(self):
with mock.patch('psutil.pid_exists', **PATCH_OPTS) as mock_psutil:
mock_psutil.return_value = False
with mock.patch.object(ProcessManager, 'as_process', **PATCH_OPTS) as mock_as_process:
mock_as_process.return_value = FakeProcess(name='test', pid=3, status=psutil.STATUS_IDLE)
self.pm._process = mock.Mock(status=psutil.STATUS_IDLE)
self.assertTrue(self.pm.is_alive())
mock_as_process.assert_called_with(self.pm)

def test_is_alive_zombie(self):
with mock.patch.object(ProcessManager, 'as_process', **PATCH_OPTS) as mock_as_process:
mock_as_process.return_value = FakeProcess(name='test', pid=3, status=psutil.STATUS_ZOMBIE)
self.assertFalse(self.pm.is_alive())
mock_as_process.assert_called_with(self.pm)

def test_is_alive_zombie_exception(self):
with mock.patch.object(ProcessManager, 'as_process', **PATCH_OPTS) as mock_as_process:
mock_as_process.side_effect = iter([
FakeProcess(name='test', pid=3, status=psutil.STATUS_IDLE),
psutil.NoSuchProcess(0)
])
self.assertFalse(self.pm.is_alive())
mock_psutil.assert_called_once_with(None)
mock_as_process.assert_called_with(self.pm)

def test_is_alive_stale_pid(self):
with mock.patch.object(ProcessManager, 'as_process', **PATCH_OPTS) as mock_as_process:
mock_as_process.return_value = FakeProcess(name='not_test', pid=3, status=psutil.STATUS_IDLE)
self.pm._process_name = 'test'
self.assertFalse(self.pm.is_alive())
mock_as_process.assert_called_with(self.pm)

def test_kill(self):
with mock.patch('os.kill', **PATCH_OPTS) as mock_kill:
self.pm.kill(0)
mock_kill.assert_called_once_with(None, 0)

def test_terminate(self):
with mock.patch('psutil.pid_exists', **PATCH_OPTS) as mock_exists:
with mock.patch.object(ProcessManager, 'is_alive', **PATCH_OPTS) as mock_alive:
with mock.patch('os.kill', **PATCH_OPTS):
mock_exists.return_value = True
mock_alive.return_value = True
with self.assertRaises(self.pm.NonResponsiveProcess):
self.pm.terminate(kill_wait=.1, purge=False)

Expand Down

0 comments on commit eca2c71

Please sign in to comment.