forked from quantumlib/OpenFermion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shell_tools.py
272 lines (229 loc) · 10.2 KB
/
shell_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# Copyright 2018 The Cirq Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import subprocess
import sys
from typing import (
List,
Optional,
Tuple,
Union,
IO,
Any,
cast,
NamedTuple,
)
from collections.abc import AsyncIterable
CommandOutput = NamedTuple("CommandOutput", [
('out', Optional[str]),
('err', Optional[str]),
('exit_code', int),
])
BOLD = 1
DIM = 2
RED = 31
GREEN = 32
YELLOW = 33
def highlight(text: str, color_code: int, bold: bool = False) -> str:
"""Wraps the given string with terminal color codes.
Args:
text: The content to highlight.
color_code: The color to highlight with, e.g. 'shelltools.RED'.
bold: Whether to bold the content in addition to coloring.
Returns:
The highlighted string.
"""
return '{}\033[{}m{}\033[0m'.format(
'\033[1m' if bold else '',
color_code,
text,
)
class TeeCapture:
"""Marker class indicating desire to capture output written to a pipe.
If out_pipe is None, the caller just wants to capture output without
writing it to anything in particular.
"""
def __init__(self, out_pipe: Optional[IO[str]] = None) -> None:
self.out_pipe = out_pipe
async def _async_forward(async_chunks: AsyncIterable,
out: Optional[Union[TeeCapture, IO[str]]]
) -> Optional[str]:
"""Prints/captures output from the given asynchronous iterable.
Args:
async_chunks: An asynchronous source of bytes or str.
out: Where to put the chunks.
Returns:
The complete captured output, or else None if the out argument wasn't a
TeeCapture instance.
"""
capture = isinstance(out, TeeCapture)
out_pipe = out.out_pipe if isinstance(out, TeeCapture) else out
chunks: Optional[List[str]] = [] if capture else None
async for chunk in async_chunks:
if not isinstance(chunk, str):
chunk = chunk.decode()
if out_pipe:
print(chunk, file=out_pipe, end='')
if chunks is not None:
chunks.append(chunk)
return ''.join(chunks) if chunks is not None else None
async def _async_wait_for_process(
future_process: Any,
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr
) -> CommandOutput:
"""Awaits the creation and completion of an asynchronous process.
Args:
future_process: The eventually created process.
out: Where to write stuff emitted by the process' stdout.
err: Where to write stuff emitted by the process' stderr.
Returns:
A (captured output, captured error output, return code) triplet.
"""
process = await future_process
future_output = _async_forward(process.stdout, out)
future_err_output = _async_forward(process.stderr, err)
output, err_output = await asyncio.gather(future_output, future_err_output)
await process.wait()
return CommandOutput(output, err_output, process.returncode)
def abbreviate_command_arguments_after_switches(cmd: Tuple[str, ...]
) -> Tuple[str, ...]:
result = [cmd[0]]
for i in range(1, len(cmd)):
if not cmd[i].startswith('-'):
result.append('[...]')
break
result.append(cmd[i])
return tuple(result)
def run_cmd(*cmd: Optional[str],
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr,
raise_on_fail: bool = True,
log_run_to_stderr: bool = True,
abbreviate_non_option_arguments: bool = False,
**kwargs) -> CommandOutput:
"""Invokes a subprocess and waits for it to finish.
Args:
cmd: Components of the command to execute, e.g. ["echo", "dog"].
out: Where to write the process' stdout. Defaults to sys.stdout. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the first element of the returned tuple will be
the captured output.
err: Where to write the process' stderr. Defaults to sys.stderr. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the second element of the returned tuple will be
the captured error output.
raise_on_fail: If the process returns a non-zero error code
and this flag is set, a CalledProcessError will be raised.
Otherwise the return code is the third element of the returned
tuple.
log_run_to_stderr: Determines whether the fact that this shell command
was executed is logged to sys.stderr or not.
abbreviate_non_option_arguments: When logging to stderr, this cuts off
the potentially-huge tail of the command listing off e.g. hundreds
of file paths. No effect if log_run_to_stderr is not set.
**kwargs: Extra arguments for asyncio.create_subprocess_shell, such as
a cwd (current working directory) argument.
Returns:
A (captured output, captured error output, return code) triplet. The
captured outputs will be None if the out or err parameters were not set
to an instance of TeeCapture.
Raises:
subprocess.CalledProcessError: The process returned a non-zero error
code and raise_on_fail was set.
"""
kept_cmd = tuple(cast(str, e) for e in cmd if e is not None)
if log_run_to_stderr:
cmd_desc = kept_cmd
if abbreviate_non_option_arguments:
cmd_desc = abbreviate_command_arguments_after_switches(cmd_desc)
print('run:', cmd_desc, file=sys.stderr)
result = asyncio.get_event_loop().run_until_complete(
_async_wait_for_process(
asyncio.create_subprocess_exec(*kept_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs), out, err))
if raise_on_fail and result[2]:
raise subprocess.CalledProcessError(result[2], kept_cmd)
return result
def run_shell(cmd: str,
out: Optional[Union[TeeCapture, IO[str]]] = sys.stdout,
err: Optional[Union[TeeCapture, IO[str]]] = sys.stderr,
raise_on_fail: bool = True,
log_run_to_stderr: bool = True,
**kwargs) -> CommandOutput:
"""Invokes a shell command and waits for it to finish.
Args:
cmd: The command line string to execute, e.g. "echo dog | cat > file".
out: Where to write the process' stdout. Defaults to sys.stdout. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the first element of the returned tuple will be
the captured output.
err: Where to write the process' stderr. Defaults to sys.stderr. Can be
anything accepted by print's 'file' parameter, or None if the
output should be dropped, or a TeeCapture instance. If a TeeCapture
instance is given, the second element of the returned tuple will be
the captured error output.
raise_on_fail: If the process returns a non-zero error code
and this flag is set, a CalledProcessError will be raised.
Otherwise the return code is the third element of the returned
tuple.
log_run_to_stderr: Determines whether the fact that this shell command
was executed is logged to sys.stderr or not.
**kwargs: Extra arguments for asyncio.create_subprocess_shell, such as
a cwd (current working directory) argument.
Returns:
A (captured output, captured error output, return code) triplet. The
captured outputs will be None if the out or err parameters were not set
to an instance of TeeCapture.
Raises:
subprocess.CalledProcessError: The process returned a non-zero error
code and raise_on_fail was set.
"""
if log_run_to_stderr:
print('shell:', cmd, file=sys.stderr)
result = asyncio.get_event_loop().run_until_complete(
_async_wait_for_process(
asyncio.create_subprocess_shell(cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs), out, err))
if raise_on_fail and result[2]:
raise subprocess.CalledProcessError(result[2], cmd)
return result
def output_of(*cmd: Optional[str], **kwargs) -> str:
"""Invokes a subprocess and returns its output as a string.
Args:
cmd: Components of the command to execute, e.g. ["echo", "dog"].
**kwargs: Extra arguments for asyncio.create_subprocess_shell, such as
a cwd (current working directory) argument.
Returns:
A (captured output, captured error output, return code) triplet. The
captured outputs will be None if the out or err parameters were not set
to an instance of TeeCapture.
Raises:
subprocess.CalledProcessError: The process returned a non-zero error
code and raise_on_fail was set.
"""
result = cast(
str,
run_cmd(*cmd, log_run_to_stderr=False, out=TeeCapture(), **kwargs).out)
# Strip final newline.
if result.endswith('\n'):
result = result[:-1]
return result