forked from awslabs/aws-shell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
512 lines (422 loc) · 18.1 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
"""AWS Shell application.
Main entry point to the AWS Shell.
"""
from __future__ import unicode_literals
import os
import subprocess
import logging
import sys
from prompt_toolkit.document import Document
from prompt_toolkit.shortcuts import create_eventloop
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.filters import Always
from prompt_toolkit.interface import CommandLineInterface, Application
from prompt_toolkit.interface import AbortAction, AcceptAction
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.history import InMemoryHistory, FileHistory
from prompt_toolkit.enums import EditingMode
from awsshell.ui import create_default_layout
from awsshell.config import Config
from awsshell.keys import KeyManager
from awsshell.style import StyleFactory
from awsshell.toolbar import Toolbar
from awsshell.utils import build_config_file_path, temporary_file
from awsshell import compat
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Unique sentinel object: ExitHandler.run returns it and AWSShell.run
# compares against it by identity to decide when to leave the input loop.
EXIT_REQUESTED = object()
def create_aws_shell(completer, model_completer, docs):
    """Construct an :class:`AWSShell` from its three collaborators.

    :param completer: Passed through as the shell's ``completer``.
    :param model_completer: Passed through as the shell's
        ``model_completer``.
    :param docs: Passed through as the shell's documentation source.

    :rtype: :class:`AWSShell`
    :return: The newly created shell instance.
    """
    shell = AWSShell(completer, model_completer, docs)
    return shell
class InputInterrupt(Exception):
    """Signal that command input should stop.

    Raise this exception to force the cli to be rebuilt, which is
    sometimes required before configuration changes become visible.
    """
class ChangeDirHandler(object):
    """Dot-command handler that changes the current working directory."""

    def __init__(self, output=sys.stdout, err=sys.stderr, chdir=os.chdir):
        """Store the streams and the chdir callable used by :meth:`run`.

        :param output: Stream kept on the instance as ``_output``.
        :param err: Stream that receives error messages.
        :param chdir: Callable invoked with the target directory name.
        """
        self._chdir = chdir
        self._err = err
        self._output = output

    def run(self, command, application):
        """Change directory to the single argument of the command.

        :type command: list
        :param command: The parsed dot command; must contain exactly two
            items, the command name followed by the directory name.
        :param application: The application object (not used here).
        """
        if len(command) == 2:
            # Expand ``~`` first, then environment variables.
            target = os.path.expanduser(command[1])
            target = os.path.expandvars(target)
            try:
                self._chdir(target)
            except OSError as exc:
                self._err.write("cd: %s\n" % exc)
        else:
            self._err.write("invalid syntax, must be: .cd dirname\n")
class EditHandler(object):
    """Dot-command handler that opens the command history in an editor."""

    def __init__(self, popen_cls=None, env=None, err=sys.stderr):
        """Store the process factory, environment and error stream.

        :param popen_cls: Callable used to launch the editor; defaults to
            :class:`subprocess.Popen` when ``None``.
        :param env: Mapping consulted for ``EDITOR``; defaults to
            ``os.environ`` when ``None``.
        :param err: Stream that receives error messages.
        """
        self._popen_cls = subprocess.Popen if popen_cls is None else popen_cls
        self._env = os.environ if env is None else env
        self._err = err

    def _get_editor_command(self):
        """Return ``EDITOR`` from the environment, else the compat default."""
        try:
            return self._env['EDITOR']
        except KeyError:
            return compat.default_editor()

    def _generate_edit_history(self, application):
        """Return the history joined by newlines, minus ``.``/``!`` entries."""
        skipped_prefixes = ('.', '!')
        kept = []
        for entry in application.history:
            if entry.startswith(skipped_prefixes):
                continue
            kept.append(entry)
        return '\n'.join(kept)

    def run(self, command, application):
        """Open the application's history buffer in an editor.

        :type command: list
        :param command: The dot command as a list split
            on whitespace, e.g ``['.foo', 'arg1', 'arg2']``

        :type application: AWSShell
        :param application: The application object.
        """
        with temporary_file('w') as scratch:
            scratch.write(self._generate_edit_history(application))
            # Flush so the editor process sees the content on disk.
            scratch.flush()
            editor = self._get_editor_command()
            try:
                proc = self._popen_cls([editor, scratch.name])
                proc.communicate()
            except OSError:
                self._err.write("Unable to launch editor: %s\n"
                                "You can configure which editor to use by "
                                "exporting the EDITOR environment variable.\n"
                                % editor)
class ProfileHandler(object):
    """Dot-command handler that shows or switches the shell profile."""

    USAGE = (
        '.profile # Print the current profile\n'
        '.profile <name> # Change the current profile\n'
    )

    def __init__(self, output=sys.stdout, err=sys.stderr):
        """Store the streams used for normal and usage/error output."""
        self._err = err
        self._output = output

    def run(self, command, application):
        """Get or set the profile.

        With only the command name, the application's current profile is
        written to the output stream. With one extra argument, that
        argument becomes the application's new profile. Any other number
        of arguments writes the usage text to the error stream.

        :type command: list
        :param command: The parsed dot command, name first.
        :param application: Object whose ``profile`` attribute is read
            or assigned.
        """
        argc = len(command)
        if argc == 1:
            current = application.profile
            if current is not None:
                self._output.write("Current shell profile: %s\n" % current)
            else:
                self._output.write(
                    "Current shell profile: no profile configured\n"
                    "You can change profiles using: .profile profile-name\n")
            return
        if argc == 2:
            requested = command[1]
            application.profile = requested
            self._output.write("Current shell profile changed to: %s\n" %
                               requested)
            return
        self._err.write("Usage:\n%s\n" % self.USAGE)
class ExitHandler(object):
    """Dot-command handler that asks the shell to stop."""

    def run(self, command, application):
        """Return the ``EXIT_REQUESTED`` sentinel; both arguments are unused.

        :param command: The parsed dot command (ignored).
        :param application: The application object (ignored).
        :return: The module-level ``EXIT_REQUESTED`` object.
        """
        return EXIT_REQUESTED
class DotCommandHandler(object):
    """Dispatch a dot command string to the handler class registered for it."""

    # Maps the command name (without the leading dot) to its handler class.
    HANDLER_CLASSES = {
        'edit': EditHandler,
        'profile': ProfileHandler,
        'cd': ChangeDirHandler,
        'exit': ExitHandler,
        'quit': ExitHandler,
    }

    def __init__(self, output=sys.stdout, err=sys.stderr):
        """Store the output and error streams."""
        self._err = err
        self._output = output

    def handle_cmd(self, command, application):
        """Handle running a given dot command from a user.

        :type command: str
        :param command: The full dot command string, e.g. ``.edit``,
            or ``.profile prod``.

        :type application: AWSShell
        :param application: The application object.

        :return: Whatever the selected handler's ``run`` returns, or
            ``None`` when the command name is not registered.
        """
        parts = command.split()
        # Drop the first character (the leading dot) of the first word.
        cmd_name = parts[0][1:]
        try:
            handler_cls = self.HANDLER_CLASSES[cmd_name]
        except KeyError:
            self._unknown_cmd(parts, application)
            return None
        # Handler classes are expected to support no-arg instantiation.
        return handler_cls().run(parts, application)

    def _unknown_cmd(self, cmd_parts, application):
        """Report an unregistered dot command on the error stream."""
        self._err.write("Unknown dot command: %s\n" % cmd_parts[0])
class AWSShell(object):
    """Encapsulate the ui, completer, command history, docs, and config.

    Runs the input event loop and delegates the command execution to either
    the `awscli` or the underlying shell.

    :type refresh_cli: bool
    :param refresh_cli: Flag to refresh the cli.

    :type config_obj: :class:`configobj.ConfigObj`
    :param config_obj: Contains the config information for reading and writing.

    :type config_section: :class:`configobj.Section`
    :param config_section: Convenience attribute to access the main section
        of the config.

    :type model_completer: :class:`AWSCLIModelCompleter`
    :param model_completer: Matches input with completions.  `AWSShell` sets
        and gets the attribute `AWSCLIModelCompleter.match_fuzzy`.

    :type enable_vi_bindings: bool
    :param enable_vi_bindings: If True, enables Vi key bindings. Else, Emacs
        key bindings are enabled.

    :type show_completion_columns: bool
    :param show_completion_columns: If True, completions are shown in multiple
        columns.  Else, completions are shown in a single scrollable column.

    :type show_help: bool
    :param show_help: If True, shows the help pane.  Else, hides the help pane.

    :type theme: str
    :param theme: The pygments theme.
    """

    def __init__(self, completer, model_completer, docs,
                 input=None, output=None, popen_cls=None):
        """Set up shell state and load the configuration.

        :param completer: Completer handed to the input buffer; also
            queried for the current command and told about profile changes.
        :param model_completer: Object whose ``match_fuzzy`` attribute is
            read and written by the shell.
        :param docs: Documentation source used to fill the help pane.
        :param input: Passed to ``CommandLineInterface`` as ``input``.
        :param output: Passed to ``CommandLineInterface`` as ``output``.
        :param popen_cls: Callable used to run commands; defaults to
            :class:`subprocess.Popen` when ``None``.
        """
        self.completer = completer
        self.model_completer = model_completer
        self.history = InMemoryHistory()
        self.file_history = FileHistory(build_config_file_path('history'))
        self._cli = None
        self._docs = docs
        self.current_docs = u''
        self.refresh_cli = False
        self.key_manager = None
        self._dot_cmd = DotCommandHandler()
        # Private copy so profile changes do not leak into os.environ.
        self._env = os.environ.copy()
        self._profile = None
        self._input = input
        self._output = output

        if popen_cls is None:
            popen_cls = subprocess.Popen
        self._popen_cls = popen_cls

        # These attrs come from the config file.
        self.config_obj = None
        self.config_section = None
        self.enable_vi_bindings = None
        self.show_completion_columns = None
        self.show_help = None
        self.theme = None
        self.load_config()

    def load_config(self):
        """Load the config from the config file or template."""
        config = Config()
        self.config_obj = config.load('awsshellrc')
        self.config_section = self.config_obj['aws-shell']
        self.model_completer.match_fuzzy = self.config_section.as_bool(
            'match_fuzzy')
        self.enable_vi_bindings = self.config_section.as_bool(
            'enable_vi_bindings')
        self.show_completion_columns = self.config_section.as_bool(
            'show_completion_columns')
        self.show_help = self.config_section.as_bool('show_help')
        self.theme = self.config_section['theme']

    def save_config(self):
        """Save the config to the config file."""
        self.config_section['match_fuzzy'] = self.model_completer.match_fuzzy
        self.config_section['enable_vi_bindings'] = self.enable_vi_bindings
        self.config_section['show_completion_columns'] = \
            self.show_completion_columns
        self.config_section['show_help'] = self.show_help
        self.config_section['theme'] = self.theme
        self.config_obj.write()

    @property
    def cli(self):
        """Return the cli, (re)building it when missing or flagged.

        A new interface is created when none exists yet or when
        ``refresh_cli`` is set; in either case ``refresh_cli`` is cleared
        afterwards.
        """
        if self._cli is None or self.refresh_cli:
            self._cli = self.create_cli_interface(self.show_completion_columns)
            self.refresh_cli = False
        return self._cli

    def run(self):
        """Run the input loop until the user leaves the shell.

        Each accepted line is handled as follows:

        * Lines starting with ``.`` go to the dot command handler.
        * Lines starting with ``!`` are run as shell commands with the
          ``!`` removed.
        * Anything else is prefixed with ``aws `` and run as a shell
          command.

        The loop ends on ``KeyboardInterrupt``/``EOFError`` or when a dot
        command returns ``EXIT_REQUESTED``.  The config is saved on every
        one of these exit paths, so option changes made during the session
        are kept no matter how the shell is left.
        """
        while True:
            try:
                document = self.cli.run(reset_current_buffer=True)
                text = document.text
            except InputInterrupt:
                # Input was stopped on purpose; start reading again.
                pass
            except (KeyboardInterrupt, EOFError):
                self.save_config()
                break
            else:
                if text.startswith('.'):
                    # These are special commands (dot commands) that are
                    # interpreted by the aws-shell directly and typically used
                    # to modify some type of behavior in the aws-shell.
                    result = self._dot_cmd.handle_cmd(text, application=self)
                    if result is EXIT_REQUESTED:
                        # Persist settings here as well, matching the
                        # KeyboardInterrupt/EOFError exit path above.
                        self.save_config()
                        break
                else:
                    if text.startswith('!'):
                        # Then run the rest as a normal shell command.
                        full_cmd = text[1:]
                    else:
                        full_cmd = 'aws ' + text
                        # Only ``aws`` commands go into the in-memory history.
                        self.history.append(full_cmd)
                    # Clear the help pane before the command runs.
                    self.current_docs = u''
                    self.cli.buffers['clidocs'].reset(
                        initial_document=Document(self.current_docs,
                                                  cursor_position=0))
                    self.cli.request_redraw()
                    p = self._popen_cls(full_cmd, shell=True, env=self._env)
                    p.communicate()

    def stop_input_and_refresh_cli(self):
        """Stop input by raising an `InputInterrupt`, forces a cli refresh.

        The cli refresh is necessary because changing options such as key
        bindings, single vs multi column menu completions, and the help pane
        all require a rebuild.

        :raises: :class:`InputInterrupt <exceptions.InputInterrupt>`.
        """
        self.refresh_cli = True
        # Accessing ``self.cli`` with the flag set rebuilds the interface.
        self.cli.request_redraw()
        raise InputInterrupt

    def create_layout(self, display_completions_in_columns, toolbar):
        """Build the default layout with the ``aws> `` prompt.

        :param display_completions_in_columns: Forwarded to
            ``create_default_layout``.
        :param toolbar: Object whose ``handler`` supplies the bottom
            toolbar tokens.
        :return: The value returned by ``create_default_layout``.
        """
        from awsshell.lexer import ShellLexer
        lexer = ShellLexer
        # No lexer is used when the configured theme is 'none'.
        if self.config_section['theme'] == 'none':
            lexer = None
        return create_default_layout(
            self, u'aws> ', lexer=lexer, reserve_space_for_menu=True,
            display_completions_in_columns=display_completions_in_columns,
            get_bottom_toolbar_tokens=toolbar.handler)

    def create_buffer(self, completer, history):
        """Create the input :class:`Buffer` for the given completer/history.

        :param completer: Completer attached to the buffer.
        :param history: History object attached to the buffer.
        :rtype: :class:`Buffer`
        """
        return Buffer(
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            enable_history_search=True,
            completer=completer,
            complete_while_typing=Always(),
            accept_action=AcceptAction.RETURN_DOCUMENT)

    def create_key_manager(self):
        """Create the :class:`KeyManager`.

        The inputs to KeyManager are expected to be callable, so we can't
        use the standard @property and @attrib.setter for these attributes.
        Lambdas cannot contain assignments so we're forced to define setters.

        :rtype: :class:`KeyManager`
        :return: A KeyManager with callables to set the toolbar options.  Also
            includes the method stop_input_and_refresh_cli to ensure certain
            options take effect within the current session.
        """

        def set_match_fuzzy(match_fuzzy):
            """Setter for fuzzy matching mode.

            :type match_fuzzy: bool
            :param match_fuzzy: The match fuzzy flag.
            """
            self.model_completer.match_fuzzy = match_fuzzy

        def set_enable_vi_bindings(enable_vi_bindings):
            """Setter for vi mode keybindings.

            If vi mode is off, emacs mode is enabled by default by
            `prompt_toolkit`.

            :type enable_vi_bindings: bool
            :param enable_vi_bindings: The enable Vi bindings flag.
            """
            self.enable_vi_bindings = enable_vi_bindings

        def set_show_completion_columns(show_completion_columns):
            """Setter for showing the completions in columns flag.

            :type show_completion_columns: bool
            :param show_completion_columns: The show completions in
                multiple columns flag.
            """
            self.show_completion_columns = show_completion_columns

        def set_show_help(show_help):
            """Setter for showing the help container flag.

            :type show_help: bool
            :param show_help: The show help flag.
            """
            self.show_help = show_help

        return KeyManager(
            lambda: self.model_completer.match_fuzzy, set_match_fuzzy,
            lambda: self.enable_vi_bindings, set_enable_vi_bindings,
            lambda: self.show_completion_columns, set_show_completion_columns,
            lambda: self.show_help, set_show_help,
            self.stop_input_and_refresh_cli)

    def create_application(self, completer, history,
                           display_completions_in_columns):
        """Assemble the prompt_toolkit :class:`Application`.

        Also creates a fresh key manager and stores it on
        ``self.key_manager``.

        :param completer: Completer for the main input buffer.
        :param history: History for the main input buffer.
        :param display_completions_in_columns: Forwarded to
            :meth:`create_layout`.
        :rtype: :class:`Application`
        """
        self.key_manager = self.create_key_manager()
        toolbar = Toolbar(
            lambda: self.model_completer.match_fuzzy,
            lambda: self.enable_vi_bindings,
            lambda: self.show_completion_columns,
            lambda: self.show_help)
        style_factory = StyleFactory(self.theme)
        # Read-only buffer that holds the text shown in the help pane.
        buffers = {
            'clidocs': Buffer(read_only=True)
        }

        if self.enable_vi_bindings:
            editing_mode = EditingMode.VI
        else:
            editing_mode = EditingMode.EMACS

        return Application(
            editing_mode=editing_mode,
            layout=self.create_layout(display_completions_in_columns, toolbar),
            mouse_support=False,
            style=style_factory.style,
            buffers=buffers,
            buffer=self.create_buffer(completer, history),
            on_abort=AbortAction.RETRY,
            on_exit=AbortAction.RAISE_EXCEPTION,
            on_input_timeout=self.on_input_timeout,
            key_bindings_registry=self.key_manager.manager.registry,
        )

    def on_input_timeout(self, cli):
        """Refresh the help pane for the text currently being typed.

        Does nothing when ``show_help`` is false.  Otherwise the docs for
        the completer's current command (or its last option, when there is
        one) are placed into the ``clidocs`` buffer and a redraw is
        requested.

        :param cli: The cli whose buffers are read and updated.
        """
        if not self.show_help:
            return
        document = cli.current_buffer.document
        text = document.text
        LOG.debug("document.text = %s", text)
        LOG.debug("current_command = %s", self.completer.current_command)
        if text.strip():
            command = self.completer.current_command
            # Doc lookup key: command words joined by '.', utf-8 encoded.
            key_name = '.'.join(command.split()).encode('utf-8')
            last_option = self.completer.last_option
            if last_option:
                self.current_docs = self._docs.extract_param(
                    key_name, last_option)
            else:
                self.current_docs = self._docs.extract_description(key_name)
        else:
            self.current_docs = u''

        position = cli.buffers['clidocs'].document.cursor_position
        # if the docs to be displayed have changed, reset position to 0
        if cli.buffers['clidocs'].text != self.current_docs:
            position = 0

        cli.buffers['clidocs'].reset(
            initial_document=Document(
                self.current_docs,
                cursor_position=position
            )
        )
        cli.request_redraw()

    def create_cli_interface(self, display_completions_in_columns):
        """Create a :class:`CommandLineInterface` backed by file history.

        :param display_completions_in_columns: Forwarded to
            :meth:`create_application`.
        :rtype: :class:`CommandLineInterface`
        """
        # A CommandLineInterface from prompt_toolkit
        # accepts two things: an application and an
        # event loop.
        loop = create_eventloop()
        app = self.create_application(self.completer,
                                      self.file_history,
                                      display_completions_in_columns)
        cli = CommandLineInterface(application=app, eventloop=loop,
                                   input=self._input, output=self._output)
        return cli

    @property
    def profile(self):
        """The current profile name, or ``None`` if none has been set."""
        return self._profile

    @profile.setter
    def profile(self, new_profile_name):
        # There's only two things that need to know about new profile
        # changes.
        #
        # First, the actual command runner.  If we want
        # to use a different profile, it should ensure that the CLI
        # commands that get run use the new profile (via the
        # AWS_DEFAULT_PROFILE env var).
        #
        # Second, we also need to let the server side autocompleter know.
        #
        # Given this is easy to manage by hand, I don't think
        # it's worth adding an event system or observers just yet.
        # If this gets hard to manage, the complexity of those systems
        # would be worth it.
        self._env['AWS_DEFAULT_PROFILE'] = new_profile_name
        self.completer.change_profile(new_profile_name)
        self._profile = new_profile_name