-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathapi.py
310 lines (259 loc) · 10 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import sys
import traceback
from functools import partial
from pathlib import Path
from typing import Generator, Iterable, List, Optional
import click
import trailrunner
from moreorless.click import echo_color_precomputed_diff
from .config import collect_rules, generate_config
from .engine import LintRunner
from .format import format_module
from .ftypes import (
Config,
FileContent,
LintViolation,
MetricsHook,
Options,
OutputFormat,
Result,
STDIN,
)
LOG = logging.getLogger(__name__)
def print_result(
    result: Result,
    *,
    show_diff: bool = False,
    stderr: bool = False,
    output_format: OutputFormat = OutputFormat.fixit,
    output_template: str = "",
) -> bool:
    """
    Print a single lint result in a simple format designed for human eyes.

    The result's path is shown relative to the current working directory when
    possible, and unchanged otherwise.

    Violations are printed in yellow using the selected ``output_format``:

    - ``OutputFormat.fixit``: ``path@line:col rule: message``
    - ``OutputFormat.vscode``: ``path:line:col rule: message``
    - ``OutputFormat.custom``: ``output_template`` rendered with ``str.format``
      using the fields ``message``, ``path``, ``result``, ``rule_name``,
      ``start_col`` and ``start_line``.

    Exceptions are printed in red, followed by the captured traceback. Clean
    results are only logged at debug level.

    Setting ``show_diff=True`` will output autofixes or suggested changes in unified
    diff format, using ANSI colors when possible. Setting ``stderr=True`` sends
    the violation and exception lines to STDERR; the diff is emitted through
    ``echo_color_precomputed_diff``, which is not passed the ``stderr`` flag.

    Returns ``True`` if the result is "dirty" - either a lint error or exception -
    and ``False`` if it is clean.

    Raises ``NotImplementedError`` if ``output_format`` is not a handled format.
    """
    path = result.path
    try:
        path = path.relative_to(Path.cwd())
    except ValueError:
        # Path is not below the working directory; show it as given.
        pass

    violation = result.violation
    if violation:
        rule_name = violation.rule_name
        start_line = violation.range.start.line
        start_col = violation.range.start.column
        message = violation.message
        if violation.autofixable:
            message += " (has autofix)"

        if output_format == OutputFormat.fixit:
            line = f"{path}@{start_line}:{start_col} {rule_name}: {message}"
        elif output_format == OutputFormat.vscode:
            line = f"{path}:{start_line}:{start_col} {rule_name}: {message}"
        elif output_format == OutputFormat.custom:
            line = output_template.format(
                message=message,
                path=path,
                result=result,
                rule_name=rule_name,
                start_col=start_col,
                start_line=start_line,
            )
        else:
            # Fail before printing anything for an unknown format.
            raise NotImplementedError(f"output-format = {output_format!r}")

        click.secho(line, fg="yellow", err=stderr)
        if show_diff and violation.diff:
            echo_color_precomputed_diff(violation.diff)
        return True

    if result.error:
        # An exception occurred while processing a file
        error, tb = result.error
        click.secho(f"{path}: EXCEPTION: {error}", fg="red", err=stderr)
        click.echo(tb.strip(), err=stderr)
        return True

    LOG.debug("%s: clean", path)
    return False
def fixit_bytes(
    path: Path,
    content: FileContent,
    *,
    config: Config,
    autofix: bool = False,
    metrics_hook: Optional[MetricsHook] = None,
) -> Generator[Result, bool, Optional[FileContent]]:
    """
    Lint the raw bytes of a single path with the given configuration.

    Yields one :class:`Result` per lint violation. If no violations are found,
    or if no rules are enabled for the path, a single empty result is yielded
    instead. If an exception is raised while linting, a result carrying the
    exception and its formatted traceback is yielded.

    Each yielded violation may be answered with ``generator.send(True)`` to
    queue its fix. If ``autofix`` is ``True``, every violation is queued even
    if ``False`` is sent back to the generator.

    Returns the formatted :class:`FileContent` with the queued fixes applied,
    or ``None`` when nothing was queued or an exception occurred. Use
    :func:`capture` to more easily capture the return value after iterating
    through violations.
    """
    try:
        active_rules = collect_rules(config)
        if not active_rules:
            # Nothing to run: report the file as clean.
            yield Result(path, violation=None)
            return None

        lint_runner = LintRunner(path, content)
        accepted: List[LintViolation] = []
        reported = 0

        for found in lint_runner.collect_violations(
            active_rules, config, metrics_hook
        ):
            reported += 1
            # The consumer may send back a boolean to accept this fix.
            apply_it = yield Result(path, found)
            if apply_it or autofix:
                accepted.append(found)

        if reported == 0:
            yield Result(path, violation=None)

        if accepted:
            patched = lint_runner.apply_replacements(accepted)
            return format_module(patched, path, config)

    except Exception as error:
        # TODO: this is not the right place to catch errors
        LOG.debug("Exception while linting", exc_info=error)
        yield Result(path, violation=None, error=(error, traceback.format_exc()))

    return None
def fixit_stdin(
    path: Path,
    *,
    autofix: bool = False,
    options: Optional[Options] = None,
    metrics_hook: Optional[MetricsHook] = None,
) -> Generator[Result, bool, None]:
    """
    Lint content read from STDIN by delegating to :func:`fixit_bytes`.

    ``path`` stands in for the filesystem location of the content and is used
    to generate the configuration. When ``autofix`` is ``True``, the fixed
    content (or the original content, if nothing changed) is written to STDOUT.

    Yields the :class:`Result` objects produced by :func:`fixit_bytes`, plus an
    error result if an exception is raised while reading, configuring or writing.
    """
    path = path.resolve()

    try:
        raw: FileContent = sys.stdin.buffer.read()
        resolved_config = generate_config(path, options=options)

        fixed = yield from fixit_bytes(
            path,
            raw,
            config=resolved_config,
            autofix=autofix,
            metrics_hook=metrics_hook,
        )

        if not autofix:
            return
        # Echo the input unchanged when no fixed content came back.
        sys.stdout.buffer.write(fixed or raw)

    except Exception as error:
        LOG.debug("Exception while fixit_stdin", exc_info=error)
        yield Result(path, violation=None, error=(error, traceback.format_exc()))
def fixit_file(
    path: Path,
    *,
    autofix: bool = False,
    options: Optional[Options] = None,
    metrics_hook: Optional[MetricsHook] = None,
) -> Generator[Result, bool, None]:
    """
    Lint a single file on disk, detecting and generating appropriate configuration.

    Generates a merged :ref:`configuration` based on all applicable config files.
    Reads file from disk as raw bytes, and uses :func:`fixit_bytes` to lint and apply
    any fixes to the content. Writes content back to disk if changes are detected.

    Yields :class:`Result` objects for each lint error or exception found, or a single
    empty result if the file is clean. An exception raised while reading,
    configuring or writing is reported as an error result rather than propagated.

    See :func:`fixit_bytes` for semantics.
    """
    path = path.resolve()

    try:
        content: FileContent = path.read_bytes()
        config = generate_config(path, options=options)

        updated = yield from fixit_bytes(
            path, content, config=config, autofix=autofix, metrics_hook=metrics_hook
        )
        # Only touch the file when fixes actually changed the bytes.
        if updated and updated != content:
            # Lazy %-style args, matching the other LOG calls in this module.
            LOG.info("%s: writing changes to file", path)
            path.write_bytes(updated)

    except Exception as error:
        LOG.debug("Exception while fixit_file", exc_info=error)
        yield Result(path, violation=None, error=(error, traceback.format_exc()))
def _fixit_file_wrapper(
    path: Path,
    *,
    autofix: bool = False,
    options: Optional[Options] = None,
    metrics_hook: Optional[MetricsHook] = None,
) -> List[Result]:
    """
    Run :func:`fixit_file` to completion and return all of its results as a list.

    Exists because generators can't be pickled or used directly via multiprocessing.

    TODO: replace this with some sort of queue or whatever
    """
    collected: List[Result] = []
    collected.extend(
        fixit_file(path, autofix=autofix, options=options, metrics_hook=metrics_hook)
    )
    return collected
def fixit_paths(
    paths: Iterable[Path],
    *,
    autofix: bool = False,
    options: Optional[Options] = None,
    parallel: bool = True,
    metrics_hook: Optional[MetricsHook] = None,
) -> Generator[Result, bool, None]:
    """
    Lint multiple files or directories, recursively expanding each path.

    Each given path is expanded with ``trailrunner.walk`` and every file found is
    linted using :func:`fixit_file`. When ``parallel`` is true and more than one
    file was found, the files are dispatched through ``trailrunner.run_iter``;
    otherwise they are linted one at a time in this process.

    Yields :class:`Result` objects for each path, lint error, or exception found.
    See :func:`fixit_bytes` for semantics.

    If the first given path is STDIN (``Path("-")``), then content will be linted
    from STDIN using :func:`fixit_stdin`. The fixed content will be written to STDOUT.
    A second path argument may be given, which represents the original content's true
    path name, and will be used:

    - to resolve the ``fixit.toml`` configuration file(s)
    - when printing status messages, diffs, or errors.

    If no second path argument is given, it will default to "stdin" in the current
    working directory. Any further path names raise ``ValueError``. A STDIN marker
    in any position other than the first is ignored with a warning.

    .. note::

        Currently does not support applying individual fixes when ``parallel=True``,
        due to limitations in the multiprocessing method in use.
        Setting ``parallel=False`` will enable interactive fixes.
        Setting ``autofix=True`` will always apply fixes automatically during linting.
    """
    if not paths:
        return

    expanded_paths: List[Path] = []
    is_stdin = False
    stdin_path = Path("stdin")

    for position, candidate in enumerate(paths):
        if candidate == STDIN:
            # Only a leading "-" switches to STDIN mode.
            if position == 0:
                is_stdin = True
            else:
                LOG.warning("Cannot mix stdin ('-') with normal paths, ignoring")
            continue

        if not is_stdin:
            expanded_paths.extend(trailrunner.walk(candidate))
            continue

        # STDIN mode: exactly one extra path may name the content.
        if position != 1:
            raise ValueError("too many stdin paths")
        stdin_path = candidate

    if is_stdin:
        yield from fixit_stdin(
            stdin_path, autofix=autofix, options=options, metrics_hook=metrics_hook
        )
        return

    if not parallel or len(expanded_paths) == 1:
        for target in expanded_paths:
            yield from fixit_file(
                target, autofix=autofix, options=options, metrics_hook=metrics_hook
            )
        return

    worker = partial(
        _fixit_file_wrapper,
        autofix=autofix,
        options=options,
        metrics_hook=metrics_hook,
    )
    for _, file_results in trailrunner.run_iter(expanded_paths, worker):
        yield from file_results