forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_bootsubprocess.py
97 lines (81 loc) · 2.61 KB
/
_bootsubprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Basic subprocess implementation for POSIX which only uses os functions. Only
implements the features required by setup.py to build C extension modules when
subprocess is unavailable. setup.py is not used on Windows.
"""
import os
# distutils.spawn used by distutils.command.build_ext
# calls subprocess.Popen().wait()
class Popen:
    """Minimal stand-in for subprocess.Popen built on os.fork() and os.exec*().

    Unlike subprocess.Popen, the constructor only records its arguments; the
    child process is forked, executed and reaped inside wait().

    cmd is the argument sequence. cmd[0] is handed to os.execv()/os.execve()
    as the program path, and those functions do not search PATH.
    env, when not None, is the environment mapping passed to os.execve();
    otherwise the child inherits the current environment.
    """

    def __init__(self, cmd, env=None):
        self._cmd = cmd
        self._env = env
        # Stays None until wait() has reaped the child.
        self.returncode = None

    def wait(self):
        """Run the command, wait for it to finish and return its exit code.

        The exit code is also stored in self.returncode. Once it is set,
        further calls return the stored value instead of forking and running
        the command a second time.
        """
        if self.returncode is not None:
            return self.returncode

        pid = os.fork()
        if pid == 0:
            # Child process
            try:
                if self._env is not None:
                    os.execve(self._cmd[0], self._cmd, self._env)
                else:
                    os.execv(self._cmd[0], self._cmd)
            finally:
                # Only reached when exec did not replace the process (for
                # example the program is missing): leave at once with
                # status 1 so the child never returns into the caller's code.
                os._exit(1)

        # Parent process
        _, status = os.waitpid(pid, 0)
        self.returncode = os.waitstatus_to_exitcode(status)
        return self.returncode
def _check_cmd(cmd):
# Use regex [a-zA-Z0-9./-]+: reject empty string, space, etc.
safe_chars = []
for first, last in (("a", "z"), ("A", "Z"), ("0", "9")):
for ch in range(ord(first), ord(last) + 1):
safe_chars.append(chr(ch))
safe_chars.append("./-")
safe_chars = ''.join(safe_chars)
if isinstance(cmd, (tuple, list)):
check_strs = cmd
elif isinstance(cmd, str):
check_strs = [cmd]
else:
return False
for arg in check_strs:
if not isinstance(arg, str):
return False
if not arg:
# reject empty string
return False
for ch in arg:
if ch not in safe_chars:
return False
return True
# _aix_support (used by distutils.util) calls subprocess.check_output()
def check_output(cmd, **kwargs):
    """Run *cmd* through os.system() and return its standard output as bytes.

    cmd is a string, or a list/tuple of strings that is joined with single
    spaces. The command's stdout is redirected by the shell into the file
    "check_output.tmp" in the current working directory; that file is read
    back and then removed, whether or not the command succeeded.

    Raises NotImplementedError if any keyword argument is given, and
    ValueError if _check_cmd() rejects cmd or if the command exits with a
    non-zero status. Returns b'' when the output file does not exist.
    """
    if kwargs:
        raise NotImplementedError(repr(kwargs))
    if not _check_cmd(cmd):
        raise ValueError(f"unsupported command: {cmd!r}")

    tmp_filename = "check_output.tmp"

    # Flatten an argument sequence into one shell command line, then let the
    # shell redirect stdout into the temporary file.
    words = [cmd] if isinstance(cmd, str) else cmd
    cmd = " ".join(words)
    cmd = f"{cmd} >{tmp_filename}"

    try:
        # system() spawns a shell
        exitcode = os.waitstatus_to_exitcode(os.system(cmd))
        if exitcode:
            raise ValueError(f"Command {cmd!r} returned non-zero "
                             f"exit status {exitcode!r}")

        try:
            with open(tmp_filename, "rb") as capture:
                return capture.read()
        except FileNotFoundError:
            return b''
    finally:
        # Best-effort cleanup: the file may never have been created.
        try:
            os.unlink(tmp_filename)
        except OSError:
            pass