Skip to content

Commit

Permalink
proc: implement process signaling(iterative#7062)
Browse files Browse the repository at this point in the history
fix: iterative#7062
1. Add `send_signal`, `kill`, `terminate` to processManager.
2. Add tests for each of them.

Co-authored-by: Peter Rowlands (변기호) <[email protected]>
  • Loading branch information
karajan1001 and pmrowla committed Dec 2, 2021
1 parent 09f48a5 commit 7d68863
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 8 deletions.
5 changes: 5 additions & 0 deletions dvc/proc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ def __init__(self, cmd, timeout):
)
self.cmd = cmd
self.timeout = timeout


class UnsupportedSignalError(DvcException):
def __init__(self, sig):
super().__init__(f"Unsupported signal: {sig}")
54 changes: 47 additions & 7 deletions dvc/proc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import json
import logging
import os
import signal
import sys
from typing import Generator, List, Optional, Union

from funcy.flow import reraise
from shortuuid import uuid

from .exceptions import UnsupportedSignalError
from .process import ManagedProcess, ProcessInfo

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -34,6 +38,12 @@ def __getitem__(self, key: str) -> "ProcessInfo":
except FileNotFoundError:
raise KeyError

@reraise(FileNotFoundError, KeyError)
def __setitem__(self, key: str, value: "ProcessInfo"):
info_path = os.path.join(self.wdir, key, f"{key}.json")
with open(info_path, "w", encoding="utf-8") as fobj:
return json.dump(value.asdict(), fobj)

def get(self, key: str, default=None):
try:
return self[key]
Expand Down Expand Up @@ -63,17 +73,47 @@ def spawn(self, args: Union[str, List[str]], name: Optional[str] = None):
pid,
)

def send_signal(self, name: str, signal: int):
def send_signal(self, name: str, sig: int):
"""Send `signal` to the specified named process."""
raise NotImplementedError

def kill(self, name: str):
"""Kill the specified named process."""
raise NotImplementedError
process_info = self[name]
if sys.platform == "win32":
if sig not in (
signal.SIGTERM,
signal.CTRL_C_EVENT,
signal.CTRL_BREAK_EVENT,
):
raise UnsupportedSignalError(sig)

def handle_closed_process():
logging.warning(
f"Process {name} had already aborted unexpectedly."
)
process_info.returncode = -1
self[name] = process_info

if process_info.returncode is None:
try:
os.kill(process_info.pid, sig)
except ProcessLookupError:
handle_closed_process()
raise
except OSError as exc:
if sys.platform == "win32":
if exc.winerror == 87:
handle_closed_process()
raise ProcessLookupError from exc
raise

def terminate(self, name: str):
"""Terminate the specified named process."""
raise NotImplementedError
self.send_signal(name, signal.SIGTERM)

def kill(self, name: str):
"""Kill the specified named process."""
if sys.platform == "win32":
self.send_signal(name, signal.SIGTERM)
else:
self.send_signal(name, signal.SIGKILL)

def remove(self, name: str, force: bool = False):
"""Remove the specified named process from this manager.
Expand Down
5 changes: 4 additions & 1 deletion dvc/proc/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class ProcessInfo:
def from_dict(cls, d):
return cls(**d)

def asdict(self):
return asdict(self)


class ManagedProcess(AbstractContextManager):
"""Run the specified command with redirected output.
Expand Down Expand Up @@ -105,7 +108,7 @@ def _make_wdir(self):
def _dump(self):
self._make_wdir()
with open(self.info_path, "w", encoding="utf-8") as fobj:
json.dump(asdict(self.info), fobj)
json.dump(self.info.asdict(), fobj)
with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
fobj.write(str(self.pid))

Expand Down
84 changes: 84 additions & 0 deletions tests/unit/proc/test_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import json
import os
import signal
import sys

import pytest

from dvc.proc.exceptions import UnsupportedSignalError
from dvc.proc.manager import ProcessManager
from dvc.proc.process import ProcessInfo

PID_FINISHED = 1234
PID_RUNNING = 5678


def create_process(root: str, name: str, pid: int, returncode=None):
info_path = os.path.join(root, name, f"{name}.json")
os.makedirs(os.path.join(root, name))
process_info = ProcessInfo(
pid=pid, stdin=None, stdout=None, stderr=None, returncode=returncode
)
with open(info_path, "w", encoding="utf-8") as fobj:
json.dump(process_info.asdict(), fobj)


@pytest.fixture
def finished_process(tmp_dir):
key = "finished"
create_process(tmp_dir, key, PID_FINISHED, 0)
return key


@pytest.fixture
def running_process(tmp_dir):
key = "running"
create_process(tmp_dir, key, PID_RUNNING)
return key


def test_send_signal(tmp_dir, mocker, finished_process, running_process):
m = mocker.patch("os.kill")
process_manager = ProcessManager(tmp_dir)
process_manager.send_signal(running_process, signal.SIGTERM)
m.assert_called_once_with(PID_RUNNING, signal.SIGTERM)

m = mocker.patch("os.kill")
process_manager.send_signal(finished_process, signal.SIGTERM)
m.assert_not_called()

if sys.platform == "win32":
with pytest.raises(UnsupportedSignalError):
process_manager.send_signal(finished_process, signal.SIGABRT)


def test_dead_process(tmp_dir, mocker, running_process):
process_manager = ProcessManager(tmp_dir)
with pytest.raises(ProcessLookupError):
process_manager.send_signal(running_process, signal.SIGTERM)
assert process_manager[running_process].returncode == -1


def test_kill(tmp_dir, mocker, finished_process, running_process):
m = mocker.patch("os.kill")
process_manager = ProcessManager(tmp_dir)
process_manager.kill(running_process)
if sys.platform == "win32":
m.assert_called_once_with(PID_RUNNING, signal.SIGTERM)
else:
m.assert_called_once_with(PID_RUNNING, signal.SIGKILL)

m = mocker.patch("os.kill")
process_manager.kill(finished_process)
m.assert_not_called()


def test_terminate(tmp_dir, mocker, running_process, finished_process):
m = mocker.patch("os.kill")
process_manager = ProcessManager(tmp_dir)
process_manager.terminate(running_process)
m.assert_called_once_with(PID_RUNNING, signal.SIGTERM)

m.reset_mock()
process_manager.terminate(finished_process)
m.assert_not_called()

0 comments on commit 7d68863

Please sign in to comment.