forked from donnemartin/gitsome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
527 lines (476 loc) · 15.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
# -*- coding: utf-8 -*-
"""The main xonsh script."""
import os
import sys
import enum
import argparse
import builtins
import contextlib
import signal
import traceback
from xonsh import __version__
from xonsh.timings import setup_timings
from xonsh.lazyasd import lazyobject
from xonsh.shell import Shell
from xonsh.pretty import pretty
from xonsh.execer import Execer
from xonsh.proc import HiddenCommandPipeline
from xonsh.jobs import ignore_sigtstp
from xonsh.tools import setup_win_unicode_console, print_color, to_bool_or_int
from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS
from xonsh.codecache import run_script_with_cache, run_code_with_cache
from xonsh.xonfig import print_welcome_screen
from xonsh.xontribs import xontribs_load
from xonsh.lazyimps import pygments, pyghooks
from xonsh.imphooks import install_import_hooks
from xonsh.events import events
from xonsh.environ import xonshrc_context, make_args_env
from xonsh.built_ins import XonshSession, load_builtins, load_proxies
from gitsome import __version__ as gitsome_version
# Lifecycle events fired by this module. Each event is first converted with
# ``events.transmogrify(name, "LoadEvent")`` and then given its documentation
# text with ``events.doc``.
# NOTE(review): presumably a "LoadEvent" also runs handlers that are
# registered after the event has already fired -- confirm in xonsh.events.

# Fired by main_xonsh() before any mode-specific work starts.
events.transmogrify("on_post_init", "LoadEvent")
events.doc(
"on_post_init",
"""
on_post_init() -> None
Fired after all initialization is finished and we're ready to do work.
NOTE: This is fired before the wizard is automatically started.
""",
)
# Fired from the ``finally`` clause of main_xonsh(), before postmain().
events.transmogrify("on_exit", "LoadEvent")
events.doc(
"on_exit",
"""
on_exit() -> None
Fired after all commands have been executed, before tear-down occurs.
NOTE: All the caveats of the ``atexit`` module also apply to this event.
""",
)
# Fired by main_xonsh() in interactive mode, right before cmdloop().
events.transmogrify("on_pre_cmdloop", "LoadEvent")
events.doc(
"on_pre_cmdloop",
"""
on_pre_cmdloop() -> None
Fired just before the command loop is started, if it is.
""",
)
# Fired by main_xonsh() in interactive mode once cmdloop() has ended,
# including when it ends with an exception.
events.transmogrify("on_post_cmdloop", "LoadEvent")
events.doc(
"on_post_cmdloop",
"""
on_post_cmdloop() -> None
Fired just after the command loop finishes, if it is.
NOTE: All the caveats of the ``atexit`` module also apply to this event.
""",
)
# Fired by start_services() immediately before xonshrc_context() is called.
events.transmogrify("on_pre_rc", "LoadEvent")
events.doc(
"on_pre_rc",
"""
on_pre_rc() -> None
Fired just before rc files are loaded, if they are.
""",
)
# Fired by start_services() immediately after xonshrc_context() returns.
events.transmogrify("on_post_rc", "LoadEvent")
events.doc(
"on_post_rc",
"""
on_post_rc() -> None
Fired just after rc files are loaded, if they are.
""",
)
def get_setproctitle():
    """Return the ``setproctitle`` function if it can be imported.

    The import happens inside the function so that the optional
    ``setproctitle`` package is only looked up when a caller asks for it.

    Returns
    -------
    callable or None
        ``setproctitle.setproctitle`` when the import succeeds, otherwise
        None.
    """
    try:
        from setproctitle import setproctitle
    except ImportError:
        # Optional dependency is missing; callers test the result for None.
        setproctitle = None
    return setproctitle
def path_argument(s):
    """Validate that ``s`` names an existing file and return its full path.

    This is meant to be used as an argparse ``type=`` callable. It is similar
    to ``argparse.FileType`` but never opens the file; it only checks it.

    Parameters
    ----------
    s : str
        Path as typed by the user; ``~`` is expanded and the result is made
        absolute before checking.

    Returns
    -------
    str
        The expanded, absolute path.

    Raises
    ------
    argparse.ArgumentTypeError
        If the resulting path is not an existing regular file.
    """
    expanded = os.path.expanduser(s)
    full_path = os.path.abspath(expanded)
    if os.path.isfile(full_path):
        return full_path
    raise argparse.ArgumentTypeError(
        "{0!r} must be a valid path to a file".format(full_path)
    )
@lazyobject
def parser():
    """Build the argparse parser for the xonsh command line.

    The parser is created with ``add_help=False`` and defines its own
    ``-h/--help`` flag as a plain boolean, so the caller decides when help is
    printed. The two positionals (``file`` and the remaining ``args``) are
    registered last, after every option.

    Returns
    -------
    argparse.ArgumentParser
        The configured parser.
    """
    cli = argparse.ArgumentParser(description="xonsh", add_help=False)
    add = cli.add_argument

    # --- informational flags -------------------------------------------
    add(
        "-h",
        "--help",
        action="store_true",
        default=False,
        dest="help",
        help="show help and exit",
    )
    add(
        "-V",
        "--version",
        action="store_true",
        default=False,
        dest="version",
        help="show version information and exit",
    )

    # --- how the shell is run ------------------------------------------
    add(
        "-c",
        dest="command",
        default=None,
        required=False,
        help="Run a single command and exit",
    )
    add(
        "-i",
        "--interactive",
        action="store_true",
        default=False,
        dest="force_interactive",
        help="force running in interactive mode",
    )
    add(
        "-l",
        "--login",
        action="store_true",
        default=False,
        dest="login",
        help="run as a login shell",
    )

    # --- configuration / rc files --------------------------------------
    add(
        "--config-path",
        dest="config_path",
        type=path_argument,
        default=None,
        help=(
            "DEPRECATED: static configuration files may now be used "
            "in the XONSHRC file list, see the --rc option."
        ),
    )
    add(
        "--rc",
        dest="rc",
        nargs="+",
        type=path_argument,
        default=None,
        help=(
            "The xonshrc files to load, these may be either xonsh "
            "files or JSON-based static configuration files."
        ),
    )
    add(
        "--no-rc",
        action="store_true",
        default=False,
        dest="norc",
        help="Do not load the .xonshrc files",
    )

    # --- caching --------------------------------------------------------
    add(
        "--no-script-cache",
        action="store_false",  # the flag switches the default (True) off
        default=True,
        dest="scriptcache",
        help="Do not cache scripts as they are run",
    )
    add(
        "--cache-everything",
        action="store_true",
        default=False,
        dest="cacheall",
        help="Use a cache, even for interactive commands",
    )

    # --- environment and shell selection ---------------------------------
    add(
        "-D",
        action="append",
        default=None,
        dest="defines",
        metavar="ITEM",
        help=(
            "define an environment variable, in the form of "
            "-DNAME=VAL. May be used many times."
        ),
    )
    add(
        "--shell-type",
        dest="shell_type",
        choices=tuple(Shell.shell_type_aliases.keys()),
        default=None,
        help=(
            "What kind of shell should be used. "
            "Possible options: readline, prompt_toolkit, random. "
            "Warning! If set this overrides $SHELL_TYPE variable."
        ),
    )
    add(
        "--timings",
        action="store_true",
        default=None,
        dest="timings",
        help=(
            "Prints timing information before the prompt is shown. "
            "This is useful while tracking down performance issues "
            "and investigating startup times."
        ),
    )

    # --- positionals: optional script followed by its own arguments -----
    add(
        "file",
        metavar="script-file",
        nargs="?",
        default=None,
        help="If present, execute the script in script-file" " and exit",
    )
    add(
        "args",
        metavar="args",
        nargs=argparse.REMAINDER,
        default=[],
        help="Additional arguments to the script specified " "by script-file",
    )
    return cli
def _pprint_displayhook(value):
    """Display hook that prints ``value`` and stores it in ``builtins._``.

    ``None`` is ignored entirely. A ``HiddenCommandPipeline`` is stored in
    ``builtins._`` without being printed. Anything else is rendered with
    ``pretty`` or ``repr`` depending on ``$PRETTY_PRINT_RESULTS``, and is
    colorized when pygments is available and ``$COLOR_RESULTS`` is set.

    Parameters
    ----------
    value : object
        The result to display.
    """
    if value is None:
        return
    # Clear '_' first to avoid recursion while the value is being rendered.
    builtins._ = None
    if isinstance(value, HiddenCommandPipeline):
        builtins._ = value
        return
    env = builtins.__xonsh__.env
    text = pretty(value) if env.get("PRETTY_PRINT_RESULTS") else repr(value)
    if not (HAS_PYGMENTS and env.get("COLOR_RESULTS")):
        # black & white case
        print(text)
    else:
        tokens = list(pygments.lex(text, lexer=pyghooks.XonshLexer()))
        # No trailing newline is emitted for the "prompt_toolkit2" shell type.
        if env.get("SHELL_TYPE") == "prompt_toolkit2":
            end = ""
        else:
            end = "\n"
        print_color(tokens, end=end)
    builtins._ = value
@enum.unique
class XonshMode(enum.Enum):
    """The ways in which a xonsh process can be asked to run."""

    # ``-c COMMAND`` was given on the command line.
    single_command = 0
    # A script file was given as the positional argument.
    script_from_file = 1
    # stdin is not a TTY and interactive mode was not forced.
    script_from_stdin = 2
    # None of the above: a regular interactive session.
    interactive = 3
def start_services(shell_kwargs, args):
    """Start the essential services in the proper order.

    The order is: import hooks, execer, rc files, shell.

    Parameters
    ----------
    shell_kwargs : dict
        Keyword arguments for ``Shell``. The keys ``ctx``, ``scriptcache``,
        ``cacheall``, ``login`` and ``rc`` are also read here.
    args : namespace
        Parsed command line arguments; ``mode`` and ``force_interactive``
        are read.

    Returns
    -------
    env
        The environment of the current xonsh session, as a convenience.
    """
    install_import_hooks()

    # create execer, which loads builtins
    ctx = shell_kwargs.get("ctx", {})
    debug = to_bool_or_int(os.getenv("XONSH_DEBUG", "0"))
    events.on_timingprobe.fire(name="pre_execer_init")
    execer = Execer(
        xonsh_ctx=ctx,
        debug_level=debug,
        scriptcache=shell_kwargs.get("scriptcache", True),
        cacheall=shell_kwargs.get("cacheall", False),
    )
    events.on_timingprobe.fire(name="post_execer_init")

    # load rc files
    login = shell_kwargs.get("login", True)
    env = builtins.__xonsh__.env
    rc = shell_kwargs.get("rc", None)
    if rc is None:
        # No explicit list was passed in: fall back to $XONSHRC.
        rc = env.get("XONSHRC")
    if not (args.mode == XonshMode.interactive or args.force_interactive):
        # Don't load xonshrc if not interactive shell
        rc = None
    events.on_pre_rc.fire()
    xonshrc_context(rcfiles=rc, execer=execer, ctx=ctx, env=env, login=login)
    events.on_post_rc.fire()

    # create shell
    builtins.__xonsh__.shell = Shell(execer=execer, **shell_kwargs)
    ctx["__name__"] = "__main__"
    return env
def premain(argv=None):
    """Setup for main xonsh entry point. Returns parsed arguments.

    Creates the xonsh session, parses the command line, decides which
    ``XonshMode`` to run in, starts the services and seeds the environment
    with the login/interactive flags and any ``-D`` definitions.

    Parameters
    ----------
    argv : list of str or None, optional
        Command line arguments without the program name. When None,
        ``sys.argv[1:]`` is used.

    Returns
    -------
    args
        The parsed arguments, extended with a ``mode`` attribute holding the
        selected ``XonshMode``.

    Notes
    -----
    ``-h`` and ``-V`` print their output and leave through ``parser.exit()``.
    A ``-D`` item that is not of the form ``NAME=VAL`` is reported through
    ``parser.error()`` before any service is started, instead of failing
    later with an opaque ``ValueError`` from ``env.update``.
    """
    if argv is None:
        argv = sys.argv[1:]
    builtins.__xonsh__ = XonshSession()
    setup_timings(argv)
    setproctitle = get_setproctitle()
    if setproctitle is not None:
        setproctitle(" ".join(["xonsh"] + argv))
    args = parser.parse_args(argv)
    if args.help:
        parser.print_help()
        parser.exit()
    if args.version:
        version = "/".join(("xonsh", __version__))
        print(version)
        parser.exit()
    # Split the -D items up front so that a malformed one is rejected with a
    # normal usage error rather than crashing once the shell is half set up.
    defines = None
    if args.defines is not None:
        defines = [item.split("=", 1) for item in args.defines]
        for pair in defines:
            if len(pair) != 2:
                parser.error(
                    "-D expects an item of the form NAME=VAL, got {0!r}".format(
                        pair[0]
                    )
                )
    shell_kwargs = {
        "shell_type": args.shell_type,
        "completer": False,
        "login": False,
        "scriptcache": args.scriptcache,
        "cacheall": args.cacheall,
        "ctx": builtins.__xonsh__.ctx,
    }
    if args.login:
        shell_kwargs["login"] = True
    if args.norc:
        # An empty tuple (rather than None) means "load no rc files at all".
        shell_kwargs["rc"] = ()
    elif args.rc:
        shell_kwargs["rc"] = args.rc
    setattr(sys, "displayhook", _pprint_displayhook)
    # NOTE(review): this banner goes to stdout in every mode, including
    # ``-c`` and script runs -- confirm that is intended.
    print('Gitsome Version: ' + gitsome_version)
    # The non-interactive modes all use the "none" shell type.
    if args.command is not None:
        args.mode = XonshMode.single_command
        shell_kwargs["shell_type"] = "none"
    elif args.file is not None:
        args.mode = XonshMode.script_from_file
        shell_kwargs["shell_type"] = "none"
    elif not sys.stdin.isatty() and not args.force_interactive:
        args.mode = XonshMode.script_from_stdin
        shell_kwargs["shell_type"] = "none"
    else:
        args.mode = XonshMode.interactive
        shell_kwargs["completer"] = True
        shell_kwargs["login"] = True
    env = start_services(shell_kwargs, args)
    env["XONSH_LOGIN"] = shell_kwargs["login"]
    if defines is not None:
        env.update(defines)
    env["XONSH_INTERACTIVE"] = args.force_interactive or (
        args.mode == XonshMode.interactive
    )
    if ON_WINDOWS:
        setup_win_unicode_console(env.get("WIN_UNICODE_CONSOLE", True))
    return args
def _failback_to_other_shells(args, err):
    """Replace the process with another shell after a launch failure.

    The first entry of ``/etc/shells`` that is a path, is not xonsh or
    screen, and exists on disk is exec'ed in place of the current process.
    If no fallback applies, ``err`` is re-raised.

    Parameters
    ----------
    args : namespace or None
        Parsed arguments, if parsing got that far.
    err : Exception
        The exception that caused the launch to fail.

    Raises
    ------
    Exception
        ``err`` itself, when the mode is known and not interactive, when
        ``/etc/shells`` does not exist, or when it lists no usable shell.
    """
    # only failback for interactive shell; if we cannot tell, treat it
    # as an interactive one for safe.
    if hasattr(args, "mode") and args.mode != XonshMode.interactive:
        raise err

    shells_file = "/etc/shells"
    if not os.path.exists(shells_file):
        # right now, it will always break here on Windows
        raise err

    excluded = ["xonsh", "screen"]
    foreign_shell = None
    with open(shells_file) as fh:
        for raw in fh:
            candidate = raw.strip()
            # Skip blanks, comments, non-path entries, excluded shell names
            # and paths that do not exist; the checks short-circuit in order.
            usable = (
                candidate
                and not candidate.startswith("#")
                and "/" in candidate
                and candidate.rsplit("/", 1)[1] not in excluded
                and os.path.exists(candidate)
            )
            if usable:
                foreign_shell = candidate
                break

    if not foreign_shell:
        raise err

    traceback.print_exc()
    print("Xonsh encountered an issue during launch", file=sys.stderr)
    print("Failback to {}".format(foreign_shell), file=sys.stderr)
    os.execlp(foreign_shell, foreign_shell)
def main(argv=None):
    """Run xonsh, handing over to another shell if start-up or the run fails.

    Parameters
    ----------
    argv : list of str or None, optional
        Command line arguments, passed through to ``premain``.

    Returns
    -------
    The value returned by ``main_xonsh`` when no exception occurs.
    """
    # Stays None when premain() itself fails; the fallback copes with that.
    parsed = None
    try:
        parsed = premain(argv)
        return main_xonsh(parsed)
    except Exception as exc:
        _failback_to_other_shells(parsed, exc)
def main_xonsh(args):
    """Main entry point for xonsh cli.

    Dispatches on ``args.mode``: runs the interactive command loop, a single
    command, a script file, or a script read from stdin. ``on_exit`` is fired
    whether or not the run raised, and ``postmain`` is called afterwards on
    the non-raising path.

    Parameters
    ----------
    args : namespace
        Parsed arguments as returned by ``premain`` (``mode``, ``command``,
        ``file`` and ``args`` are read).
    """
    if not ON_WINDOWS:

        def _noop_handler(signum, frame):
            pass

        # Install a do-nothing handler for both terminal I/O signals.
        for signum in (signal.SIGTTIN, signal.SIGTTOU):
            signal.signal(signum, _noop_handler)

    events.on_post_init.fire()
    env = builtins.__xonsh__.env
    shell = builtins.__xonsh__.shell
    mode = args.mode
    try:
        if mode == XonshMode.interactive:
            # enter the shell
            env["XONSH_INTERACTIVE"] = True
            ignore_sigtstp()
            if env["XONSH_INTERACTIVE"]:
                # The welcome screen is shown only when no rc file exists.
                has_rc_file = any(os.path.isfile(rc) for rc in env["XONSHRC"])
                if not has_rc_file:
                    print_welcome_screen()
            events.on_pre_cmdloop.fire()
            try:
                shell.shell.cmdloop()
            finally:
                events.on_post_cmdloop.fire()
        elif mode == XonshMode.single_command:
            # run a single command and exit
            command = args.command.lstrip()
            run_code_with_cache(command, shell.execer, mode="single")
        elif mode == XonshMode.script_from_file:
            # run a script contained in a file
            path = os.path.abspath(os.path.expanduser(args.file))
            if not os.path.isfile(path):
                print("xonsh: {0}: No such file or directory.".format(args.file))
            else:
                sys.argv = [args.file] + args.args
                env.update(make_args_env())  # $ARGS is not sys.argv
                env["XONSH_SOURCE"] = path
                script_globals = {"__file__": args.file, "__name__": "__main__"}
                shell.ctx.update(script_globals)
                run_script_with_cache(
                    args.file, shell.execer, glb=shell.ctx, loc=None, mode="exec"
                )
        elif mode == XonshMode.script_from_stdin:
            # run a script given on stdin
            source = sys.stdin.read()
            run_code_with_cache(
                source, shell.execer, glb=shell.ctx, loc=None, mode="exec"
            )
    finally:
        events.on_exit.fire()
    postmain(args)
def postmain(args=None):
    """Teardown for main xonsh entry point, accepts parsed arguments.

    Parameters
    ----------
    args : namespace or None, optional
        Parsed arguments. Accepted for symmetry with ``premain``; the body
        does not read them.
    """
    if ON_WINDOWS:
        # Switch the Windows unicode console support back off.
        setup_win_unicode_console(enable=False)
    # Drop the session's reference to the shell.
    builtins.__xonsh__.shell = None
@contextlib.contextmanager
def main_context(argv=None):
    """Context manager that runs the pre- and post-main() functions.

    On entry ``premain`` is run and the session's shell is yielded. On exit
    ``postmain`` cleans the shell up. The cleanup sits in a ``finally``
    clause, so it also runs when the body of the ``with`` statement raises;
    the exception then propagates as usual.

    Parameters
    ----------
    argv : list of str or None, optional
        Command line arguments, passed through to ``premain``.

    Yields
    ------
    shell
        The shell stored on the current xonsh session.
    """
    args = premain(argv)
    try:
        yield builtins.__xonsh__.shell
    finally:
        postmain(args)
def setup(
    ctx=None,
    shell_type="none",
    env=(("RAISE_SUBPROC_ERROR", True),),
    aliases=(),
    xontribs=(),
    threadable_predictors=(),
):
    """Starts up a new xonsh shell. Calling this function in another
    package's __init__.py will allow xonsh to be fully used in the
    package in headless or headed mode. This function is primarily intended
    to make starting up xonsh for 3rd party packages easier.

    Parameters
    ----------
    ctx : dict-like or None, optional
        The xonsh context to start with. If None, an empty dictionary
        is provided.
    shell_type : str, optional
        The type of shell to start. By default this is 'none', indicating
        we should start in headless mode.
    env : dict-like, optional
        Environment to update the current environment with after the shell
        has been initialized.
    aliases : dict-like, optional
        Aliases to add after the shell has been initialized.
    xontribs : iterable of str, optional
        Xontrib names to load.
    threadable_predictors : dict-like, optional
        Threadable predictors to start up with. These override the defaults.

    Notes
    -----
    The session, execer and shell are only created when ``builtins`` has no
    ``__xonsh__`` attribute yet; otherwise the existing session is reused and
    ``ctx`` and ``shell_type`` are not applied to it.
    """
    if ctx is None:
        ctx = {}

    # setup xonsh ctx and execer
    session_exists = hasattr(builtins, "__xonsh__")
    if not session_exists:
        execer = Execer(xonsh_ctx=ctx)
        builtins.__xonsh__ = XonshSession(ctx=ctx, execer=execer)
        load_builtins(ctx=ctx, execer=execer)
        load_proxies()
        builtins.__xonsh__.shell = Shell(execer, ctx=ctx, shell_type=shell_type)

    session = builtins.__xonsh__
    session.env.update(env)
    install_import_hooks()
    builtins.aliases.update(aliases)
    if xontribs:
        xontribs_load(xontribs)
    # Caller-supplied predictors are merged over the existing ones.
    builtins.__xonsh__.commands_cache.threadable_predictors.update(
        threadable_predictors
    )