forked from donnemartin/gitsome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproc.py
2493 lines (2233 loc) · 82.3 KB
/
proc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""Interface for running Python functions as subprocess-mode commands.
Code for several helper methods in the `ProcProxy` class have been reproduced
without modification from `subprocess.py` in the Python 3.4.2 standard library.
The contents of `subprocess.py` (and, thus, the reproduced methods) are
Copyright (c) 2003-2005 by Peter Astrand <[email protected]> and were
licensed to the Python Software foundation under a Contributor Agreement.
"""
import io
import os
import re
import sys
import time
import queue
import array
import ctypes
import signal
import inspect
import builtins
import functools
import threading
import subprocess
import collections.abc as cabc
from xonsh.platform import (
ON_WINDOWS,
ON_POSIX,
ON_MSYS,
ON_CYGWIN,
CAN_RESIZE_WINDOW,
LFLAG,
CC,
)
from xonsh.tools import (
redirect_stdout,
redirect_stderr,
print_exception,
XonshCalledProcessError,
findfirst,
on_main_thread,
XonshError,
format_std_prepost,
ALIAS_KWARG_NAMES,
)
from xonsh.lazyasd import lazyobject, LazyObject
from xonsh.jobs import wait_for_active_job, give_terminal_to, _continue
from xonsh.lazyimps import fcntl, termios, _winapi, msvcrt, winutils
# these decorators are imported for users back-compatible
from xonsh.tools import unthreadable, uncapturable # NOQA
# `foreground` has been deprecated; it is kept as a backwards-compatible
# alias of `unthreadable`.
foreground = unthreadable
@lazyobject
def STDOUT_CAPTURE_KINDS():
    """Capture kinds that grab a subprocess's stdout stream."""
    return frozenset(("stdout", "object"))
# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
# xterm private mode numbers that select the alternate screen buffer
MODE_NUMS = ("1049", "47", "1047")
# byte sequences that switch the terminal *into* the alternate screen
START_ALTERNATE_MODE = LazyObject(
    lambda: frozenset("\x1b[?{0}h".format(i).encode() for i in MODE_NUMS),
    globals(),
    "START_ALTERNATE_MODE",
)
# byte sequences that switch the terminal back *out of* the alternate screen
END_ALTERNATE_MODE = LazyObject(
    lambda: frozenset("\x1b[?{0}l".format(i).encode() for i in MODE_NUMS),
    globals(),
    "END_ALTERNATE_MODE",
)
# all start + end sequences; scanned for when deciding whether a chunk of
# subprocess output toggles alternate mode
ALTERNATE_MODE_FLAGS = LazyObject(
    lambda: tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE),
    globals(),
    "ALTERNATE_MODE_FLAGS",
)
# Matches readline/prompt "hidden" byte spans delimited by \001 ... \002.
# BUG FIX: the LazyObject registration name must match the variable name;
# it previously registered as "RE_HIDDEN", which made the loader write a
# stray RE_HIDDEN global on first access while RE_HIDDEN_BYTES stayed bound
# to the lazy proxy forever.
RE_HIDDEN_BYTES = LazyObject(
    lambda: re.compile(b"(\001.*?\002)"), globals(), "RE_HIDDEN_BYTES"
)
@lazyobject
def RE_VT100_ESCAPE():
    """Regex for VT100/ANSI escape sequences (0x9B or ESC-[ introducer)."""
    pattern = b"(\x9B|\x1B\\[)[0-?]*[ -\\/]*[@-~]"
    return re.compile(pattern)
@lazyobject
def RE_HIDE_ESCAPE():
    """Regex matching either hidden-byte spans or VT100 escape sequences."""
    joined = RE_HIDDEN_BYTES.pattern + b"|" + RE_VT100_ESCAPE.pattern
    return re.compile(b"(" + joined + b")")
class QueueReader:
    """Provides a file-like interface to reading from a queue.

    A background producer is expected to feed byte chunks into
    ``self.queue``; consumers then use the familiar ``read`` /
    ``readline`` / ``readlines`` API.
    """

    def __init__(self, fd, timeout=None):
        """
        Parameters
        ----------
        fd : int
            A file descriptor
        timeout : float or None, optional
            The queue reading timeout.
        """
        self.fd = fd
        self.timeout = timeout
        self.closed = False
        self.queue = queue.Queue()
        self.thread = None

    def close(self):
        """Mark the reader closed; already-queued data may still be drained."""
        self.closed = True

    def is_fully_read(self):
        """Whether the reader is closed, its producer thread (if any) has
        exited, and no chunks remain in the queue.
        """
        if not self.closed:
            return False
        if self.thread is not None and self.thread.is_alive():
            return False
        return self.queue.empty()

    def read_queue(self):
        """Pop a single chunk off the queue — blocking when the timeout is
        None, otherwise returning ``b""`` once the timeout expires.
        """
        try:
            chunk = self.queue.get(block=True, timeout=self.timeout)
        except queue.Empty:
            chunk = b""
        return chunk

    def read(self, size=-1):
        """Reads bytes from the file."""
        pieces = []
        nread = 0
        while size < 0 or nread != size:
            chunk = self.read_queue()
            if not chunk:
                break
            pieces.append(chunk)
            nread += len(chunk)
        return b"".join(pieces)

    def readline(self, size=-1):
        """Reads a line, or a partial line, from the file descriptor."""
        pieces = []
        nread = 0
        newline = b"\n"
        while size < 0 or nread != size:
            chunk = self.read_queue()
            if not chunk:
                break
            pieces.append(chunk)
            if chunk.endswith(newline):
                break
            nread += len(chunk)
        return b"".join(pieces)

    def _read_all_lines(self):
        """This reads all remaining lines in a blocking fashion."""
        collected = []
        while not self.is_fully_read():
            collected.extend(self.read_queue().splitlines(keepends=True))
        return collected

    def readlines(self, hint=-1):
        """Reads lines from the file descriptor. Blocking for negative
        hints (i.e. read all remaining lines) and non-blocking otherwise.
        """
        if hint == -1:
            return self._read_all_lines()
        collected = []
        while len(collected) != hint:
            chunk = self.read_queue()
            if not chunk:
                break
            collected.extend(chunk.splitlines(keepends=True))
        return collected

    def fileno(self):
        """Returns the file descriptor number."""
        return self.fd

    @staticmethod
    def readable():
        """Returns true, because this object is always readable."""
        return True

    def iterqueue(self):
        """Iterates through all remaining chunks in a blocking fashion."""
        while not self.is_fully_read():
            chunk = self.read_queue()
            if chunk:
                yield chunk
def populate_fd_queue(reader, fd, queue):
    """Reads 1 kb of data from a file descriptor into a queue.
    On EOF or a read error, the calling reader object is flagged as closed.
    """
    while True:
        try:
            chunk = os.read(fd, 1024)
        except OSError:
            chunk = None
        if not chunk:
            # EOF or failed read: signal the reader and stop.
            reader.closed = True
            return
        queue.put(chunk)
class NonBlockingFDReader(QueueReader):
    """A class for reading characters from a file descriptor on a background
    thread. This has the advantages that the calling thread can close the
    file and that the reading does not block the calling thread.
    """

    def __init__(self, fd, timeout=None):
        """
        Parameters
        ----------
        fd : int
            A file descriptor
        timeout : float or None, optional
            The queue reading timeout.
        """
        super().__init__(fd, timeout=timeout)
        # spawn the background producer right away
        worker = threading.Thread(
            target=populate_fd_queue, args=(self, self.fd, self.queue)
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
def populate_buffer(reader, fd, buffer, chunksize):
    """Reads bytes from the file descriptor and copies them into a buffer.
    The reads happen in parallel using the pread() syscall, which is only
    available on POSIX systems. If a read fails for any reason, the reader
    is flagged as closed.
    """
    pos = 0
    while True:
        try:
            data = os.pread(fd, chunksize, pos)
        except OSError:
            data = b""
        if not data:
            # EOF or failed read: signal the reader and stop.
            reader.closed = True
            break
        buffer.write(data)
        pos += len(data)
class BufferedFDParallelReader:
    """Buffered, parallel background thread reader."""

    def __init__(self, fd, buffer=None, chunksize=1024):
        """
        Parameters
        ----------
        fd : int
            File descriptor from which to read.
        buffer : binary file-like or None, optional
            A buffer to write bytes into. If None, a new BytesIO object
            is created.
        chunksize : int, optional
            The max size of the parallel reads, default 1 kb.
        """
        self.fd = fd
        self.chunksize = chunksize
        self.closed = False
        if buffer is None:
            buffer = io.BytesIO()
        self.buffer = buffer
        # kick off the background pread() loop immediately
        worker = threading.Thread(
            target=populate_buffer, args=(self, fd, self.buffer, chunksize)
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd):
# if we are getting close to the end of the console buffer,
# expand it so that we can read from it successfully.
if cols == 0:
return orig_posize[-1], max_offset, orig_posize
rows = ((max_offset + expandsize) // cols) + 1
winutils.set_console_screen_buffer_size(cols, rows, fd=fd)
orig_posize = orig_posize[:3] + (rows,)
max_offset = (rows - 1) * cols
return rows, max_offset, orig_posize
def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
    """Reads bytes from the file descriptor and puts lines into the queue.
    The reads happened in parallel,
    using xonsh.winutils.read_console_output_character(),
    and is thus only available on windows. If the read fails for any reason,
    the reader is flagged as closed.

    Parameters
    ----------
    reader : QueueReader-like
        Object whose ``closed`` flag is set on failure/EOF and whose
        ``timeout`` is used as the polling sleep interval.
    fd : int
        Console file descriptor to read from.
    buffer : ctypes buffer
        Scratch buffer reused by each console read.
    chunksize : int
        Maximum bytes per read.
    queue : queue.Queue
        Destination for the extracted lines (bytes, newline-terminated
        except possibly the last partial line).
    expandsize : int or None, optional
        Headroom before the screen buffer is grown; defaults to
        100 * chunksize.
    """
    # OK, so this function is super annoying because Windows stores its
    # buffers as a 2D regular, dense array -- without trailing newlines.
    # Meanwhile, we want to add *lines* to the queue. Also, as is typical
    # with parallel reads, the entire buffer that you ask for may not be
    # filled. Thus we have to deal with the full generality.
    #   1. reads may end in the middle of a line
    #   2. excess whitespace at the end of a line may not be real, unless
    #   3. you haven't read to the end of the line yet!
    # So there are alignment issues everywhere. Also, Windows will automatically
    # read past the current cursor position, even though there is presumably
    # nothing to see there.
    #
    # These chunked reads basically need to happen like this because,
    #   a. The default buffer size is HUGE for the console (90k lines x 120 cols)
    #      as so we can't just read in everything at the end and see what we
    #      care about without a noticeable performance hit.
    #   b. Even with this huge size, it is still possible to write more lines than
    #      this, so we should scroll along with the console.
    # Unfortunately, because we do not have control over the terminal emulator,
    # It is not possible to compute how far back we should set the beginning
    # read position because we don't know how many characters have been popped
    # off the top of the buffer. If we did somehow know this number we could do
    # something like the following:
    #
    #    new_offset = (y*cols) + x
    #    if new_offset == max_offset:
    #        new_offset -= scrolled_offset
    #        x = new_offset%cols
    #        y = new_offset//cols
    #        continue
    #
    # So this method is imperfect and only works as long as the screen has
    # room to expand to. Thus the trick here is to expand the screen size
    # when we get close enough to the end of the screen. There remain some
    # async issues related to not being able to set the cursor position.
    # but they just affect the alignment / capture of the output of the
    # first command run after a screen resize.
    if expandsize is None:
        expandsize = 100 * chunksize
    x, y, cols, rows = posize = winutils.get_position_size(fd)
    pre_x = pre_y = -1
    orig_posize = posize
    offset = (cols * y) + x
    max_offset = (rows - 1) * cols
    # I believe that there is a bug in PTK that if we reset the
    # cursor position, the cursor on the next prompt is accidentally on
    # the next line.  If this is fixed, uncomment the following line.
    # if max_offset < offset + expandsize:
    #     rows, max_offset, orig_posize = _expand_console_buffer(
    #                                        cols, max_offset, expandsize,
    #                                        orig_posize, fd)
    #     winutils.set_console_cursor_position(x, y, fd=fd)
    while True:
        posize = winutils.get_position_size(fd)
        offset = (cols * y) + x
        if ((posize[1], posize[0]) <= (y, x) and posize[2:] == (cols, rows)) or (
            pre_x == x and pre_y == y
        ):
            # already at or ahead of the current cursor position.
            if reader.closed:
                break
            else:
                time.sleep(reader.timeout)
                continue
        elif max_offset <= offset + expandsize:
            # getting close to the end of the buffer: grow it before reading
            ecb = _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd)
            rows, max_offset, orig_posize = ecb
            continue
        elif posize[2:] == (cols, rows):
            # cursor updated but screen size is the same.
            pass
        else:
            # screen size changed, which is offset preserving
            orig_posize = posize
            cols, rows = posize[2:]
            x = offset % cols
            y = offset // cols
            pre_x = pre_y = -1
            max_offset = (rows - 1) * cols
            continue
        try:
            buf = winutils.read_console_output_character(
                x=x, y=y, fd=fd, buf=buffer, bufsize=chunksize, raw=True
            )
        except (OSError, IOError):
            reader.closed = True
            break
        # cursor position and offset
        if not reader.closed:
            buf = buf.rstrip()
            nread = len(buf)
            if nread == 0:
                time.sleep(reader.timeout)
                continue
            cur_x, cur_y = posize[0], posize[1]
            cur_offset = (cols * cur_y) + cur_x
            beg_offset = (cols * y) + x
            end_offset = beg_offset + nread
            if end_offset > cur_offset and cur_offset != max_offset:
                # don't report characters read past the cursor; they are
                # buffer padding, not real output
                buf = buf[: cur_offset - end_offset]
            # convert to lines
            xshift = cols - x
            yshift = (nread // cols) + (1 if nread % cols > 0 else 0)
            lines = [buf[:xshift]]
            lines += [
                buf[l * cols + xshift : (l + 1) * cols + xshift] for l in range(yshift)
            ]
            lines = [line for line in lines if line]
            if not lines:
                time.sleep(reader.timeout)
                continue
            # put lines in the queue
            nl = b"\n"
            for line in lines[:-1]:
                queue.put(line.rstrip() + nl)
            if len(lines[-1]) == xshift:
                # last line filled the row completely, so it is a real line
                queue.put(lines[-1].rstrip() + nl)
            else:
                # partial line; emit without trailing newline
                queue.put(lines[-1])
            # update x and y locations
            if (beg_offset + len(buf)) % cols == 0:
                new_offset = beg_offset + len(buf)
            else:
                new_offset = beg_offset + len(buf.rstrip())
            pre_x = x
            pre_y = y
            x = new_offset % cols
            y = new_offset // cols
        time.sleep(reader.timeout)
class ConsoleParallelReader(QueueReader):
    """Parallel reader for consoles that runs in a background thread.
    This is only needed, available, and useful on Windows.
    """

    def __init__(self, fd, buffer=None, chunksize=1024, timeout=None):
        """
        Parameters
        ----------
        fd : int
            Standard buffer file descriptor, 0 for stdin, 1 for stdout
            (default), and 2 for stderr.
        buffer : ctypes.c_wchar_p, optional
            An existing buffer to (re-)use.
        chunksize : int, optional
            The max size of the parallel reads, default 1 kb.
        timeout : float, optional
            The queue reading timeout.
        """
        timeout = timeout or builtins.__xonsh__.env.get("XONSH_PROC_FREQUENCY")
        super().__init__(fd, timeout=timeout)
        self.chunksize = chunksize
        # the scratch buffer cannot be public
        if buffer is None:
            buffer = ctypes.c_char_p(b" " * chunksize)
        self._buffer = buffer
        # kick off the background console reader
        worker = threading.Thread(
            target=populate_console,
            args=(self, fd, self._buffer, chunksize, self.queue),
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
def safe_fdclose(handle, cache=None):
    """Closes a file handle in the safest way possible, optionally
    recording the success status in *cache* (handle -> bool).  A handle
    already successfully closed according to *cache* is skipped.
    """
    if cache is not None and cache.get(handle, False):
        return
    ok = True
    if handle is None:
        pass
    elif isinstance(handle, int):
        # never close stdin/stdout/stderr or invalid (-1) descriptors
        if handle >= 3:
            try:
                os.close(handle)
            except OSError:
                ok = False
    elif handle is sys.stdin or handle is sys.stdout or handle is sys.stderr:
        # don't close the standard streams
        pass
    else:
        try:
            handle.close()
        except OSError:
            ok = False
    if cache is not None:
        cache[handle] = ok
def safe_flush(handle):
    """Attempts to safely flush a file handle, returns success bool."""
    try:
        handle.flush()
    except OSError:
        return False
    return True
def still_writable(fd):
    """Determines whether a file descriptor is still writable by trying to
    write an empty byte string and seeing whether that fails.
    """
    try:
        os.write(fd, b"")
    except OSError:
        return False
    return True
class PopenThread(threading.Thread):
    """A thread for running and managing subprocess. This allows reading
    from the stdin, stdout, and stderr streams in a non-blocking fashion.
    This takes the same arguments and keyword arguments as regular Popen.
    This requires that the captured_stdout and captured_stderr attributes
    to be set following instantiation.
    """

    def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
        """Installs signal handlers, launches the subprocess, and starts
        this thread. *args/**kwargs are forwarded to subprocess.Popen.
        """
        super().__init__()
        # guards concurrent access to the in-memory capture buffers
        self.lock = threading.RLock()
        env = builtins.__xonsh__.env
        # stdin setup
        self.orig_stdin = stdin
        if stdin is None:
            self.stdin_fd = 0
        elif isinstance(stdin, int):
            self.stdin_fd = stdin
        else:
            self.stdin_fd = stdin.fileno()
        self.store_stdin = env.get("XONSH_STORE_STDIN")
        self.timeout = env.get("XONSH_PROC_FREQUENCY")
        self.in_alt_mode = False
        # saved termios attrs for the tty; None until cbreak mode is enabled
        self.stdin_mode = None
        # stdout setup
        self.orig_stdout = stdout
        self.stdout_fd = 1 if stdout is None else stdout.fileno()
        self._set_pty_size()
        # stderr setup
        self.orig_stderr = stderr
        # Set some signal handles, if we can. Must come before process
        # is started to prevent deadlock on windows
        self.proc = None  # has to be here for closure for handles
        self.old_int_handler = self.old_winch_handler = None
        self.old_tstp_handler = self.old_quit_handler = None
        if on_main_thread():
            # signal handlers may only be installed from the main thread
            self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
            if ON_POSIX:
                self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp)
                self.old_quit_handler = signal.signal(signal.SIGQUIT, self._signal_quit)
            if CAN_RESIZE_WINDOW:
                self.old_winch_handler = signal.signal(
                    signal.SIGWINCH, self._signal_winch
                )
        # start up process
        if ON_WINDOWS and stdout is not None:
            os.set_inheritable(stdout.fileno(), False)
        try:
            self.proc = proc = subprocess.Popen(
                *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs
            )
        except Exception:
            # restore the original signal handlers before propagating
            self._clean_up()
            raise
        self.pid = proc.pid
        self.universal_newlines = uninew = proc.universal_newlines
        if uninew:
            self.encoding = enc = env.get("XONSH_ENCODING")
            self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS")
            self.stdin = io.BytesIO()  # stdin is always bytes!
            self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
            self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
        else:
            self.encoding = self.encoding_errors = None
            self.stdin = io.BytesIO()
            self.stdout = io.BytesIO()
            self.stderr = io.BytesIO()
        self.suspended = False
        self.prevs_are_closed = False
        self.start()

    def run(self):
        """Runs the subprocess by performing a parallel read on stdin if allowed,
        and copying bytes from captured_stdout to stdout and bytes from
        captured_stderr to stderr.
        """
        proc = self.proc
        # `spec` is set on this object by the caller after construction
        spec = self._wait_and_getattr("spec")
        # get stdin and apply parallel reader if needed.
        stdin = self.stdin
        if self.orig_stdin is None:
            origin = None
        elif ON_POSIX and self.store_stdin:
            origin = self.orig_stdin
            origfd = origin if isinstance(origin, int) else origin.fileno()
            origin = BufferedFDParallelReader(origfd, buffer=stdin)
        else:
            origin = None
        # get non-blocking stdout
        stdout = self.stdout.buffer if self.universal_newlines else self.stdout
        capout = spec.captured_stdout
        if capout is None:
            procout = None
        else:
            procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout)
        # get non-blocking stderr
        stderr = self.stderr.buffer if self.universal_newlines else self.stderr
        caperr = spec.captured_stderr
        if caperr is None:
            procerr = None
        else:
            procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout)
        # initial read from buffer
        self._read_write(procout, stdout, sys.__stdout__)
        self._read_write(procerr, stderr, sys.__stderr__)
        # loop over reads while process is running.
        i = j = cnt = 1
        while proc.poll() is None:
            # this is here for CPU performance reasons: when nothing was
            # read last iteration, back off the polling frequency (up to
            # 1000x) and restore it as soon as data flows again.
            if i + j == 0:
                cnt = min(cnt + 1, 1000)
                tout = self.timeout * cnt
                if procout is not None:
                    procout.timeout = tout
                if procerr is not None:
                    procerr.timeout = tout
            elif cnt == 1:
                pass
            else:
                cnt = 1
                if procout is not None:
                    procout.timeout = self.timeout
                if procerr is not None:
                    procerr.timeout = self.timeout
            # redirect some output!
            i = self._read_write(procout, stdout, sys.__stdout__)
            j = self._read_write(procerr, stderr, sys.__stderr__)
            if self.suspended:
                break
        if self.suspended:
            return
        # close files to send EOF to non-blocking reader.
        # capout & caperr seem to be needed only by Windows, while
        # orig_stdout & orig_stderr are need by posix and Windows.
        # Also, order seems to matter here,
        # with orig_* needed to be closed before cap*
        safe_fdclose(self.orig_stdout)
        safe_fdclose(self.orig_stderr)
        if ON_WINDOWS:
            safe_fdclose(capout)
            safe_fdclose(caperr)
        # read in the remaining data in a blocking fashion.
        while (procout is not None and not procout.is_fully_read()) or (
            procerr is not None and not procerr.is_fully_read()
        ):
            self._read_write(procout, stdout, sys.__stdout__)
            self._read_write(procerr, stderr, sys.__stderr__)
        # kill the process if it is still alive. Happens when piping.
        if proc.poll() is None:
            proc.terminate()

    def _wait_and_getattr(self, name):
        """make sure the instance has a certain attr, and return it."""
        while not hasattr(self, name):
            time.sleep(1e-7)
        return getattr(self, name)

    def _read_write(self, reader, writer, stdbuf):
        """Reads a chunk of bytes from a buffer and write into memory or back
        down to the standard buffer, as appropriate. Returns the number of
        successful reads.
        """
        if reader is None:
            return 0
        i = -1
        for i, chunk in enumerate(iter(reader.read_queue, b"")):
            self._alt_mode_switch(chunk, writer, stdbuf)
        if i >= 0:
            # something was read; flush both destinations
            writer.flush()
            stdbuf.flush()
        return i + 1

    def _alt_mode_switch(self, chunk, membuf, stdbuf):
        """Enables recursively switching between normal capturing mode
        and 'alt' mode, which passes through values to the standard
        buffer. Pagers, text editors, curses applications, etc. use
        alternate mode.
        """
        i, flag = findfirst(chunk, ALTERNATE_MODE_FLAGS)
        if flag is None:
            self._alt_mode_writer(chunk, membuf, stdbuf)
        else:
            # This code is executed when the child process switches the
            # terminal into or out of alternate mode. The line below assumes
            # that the user has opened vim, less, or similar, and writes writes
            # to stdin.
            j = i + len(flag)
            # write the first part of the chunk in the current mode.
            self._alt_mode_writer(chunk[:i], membuf, stdbuf)
            # switch modes
            # write the flag itself the current mode where alt mode is on
            # so that it is streamed to the terminal ASAP.
            # this is needed for terminal emulators to find the correct
            # positions before and after alt mode.
            alt_mode = flag in START_ALTERNATE_MODE
            if alt_mode:
                self.in_alt_mode = alt_mode
                self._alt_mode_writer(flag, membuf, stdbuf)
                self._enable_cbreak_stdin()
            else:
                self._alt_mode_writer(flag, membuf, stdbuf)
                self.in_alt_mode = alt_mode
                self._disable_cbreak_stdin()
            # recurse this function, but without the current flag.
            self._alt_mode_switch(chunk[j:], membuf, stdbuf)

    def _alt_mode_writer(self, chunk, membuf, stdbuf):
        """Write bytes to the standard buffer if in alt mode or otherwise
        to the in-memory buffer.
        """
        if not chunk:
            pass  # don't write empty values
        elif self.in_alt_mode:
            stdbuf.buffer.write(chunk)
        else:
            with self.lock:
                # append at the end but restore the previous read position
                p = membuf.tell()
                membuf.seek(0, io.SEEK_END)
                membuf.write(chunk)
                membuf.seek(p)

    #
    # Window resize handlers
    #

    def _signal_winch(self, signum, frame):
        """Signal handler for SIGWINCH - window size has changed."""
        self.send_signal(signal.SIGWINCH)
        self._set_pty_size()

    def _set_pty_size(self):
        """Sets the window size of the child pty based on the window size of
        our own controlling terminal.
        """
        if ON_WINDOWS or not os.isatty(self.stdout_fd):
            return
        # Get the terminal size of the real terminal, set it on the
        # pseudoterminal.
        buf = array.array("h", [0, 0, 0, 0])
        # 1 = stdout here
        try:
            fcntl.ioctl(1, termios.TIOCGWINSZ, buf, True)
            fcntl.ioctl(self.stdout_fd, termios.TIOCSWINSZ, buf)
        except OSError:
            pass

    #
    # SIGINT handler
    #

    def _signal_int(self, signum, frame):
        """Signal handler for SIGINT - Ctrl+C may have been pressed."""
        self.send_signal(signum)
        if self.proc is not None and self.proc.poll() is not None:
            # only restore the default handler once the child has exited
            self._restore_sigint(frame=frame)
        if on_main_thread():
            signal.pthread_kill(threading.get_ident(), signal.SIGINT)

    def _restore_sigint(self, frame=None):
        """Reinstates the previous SIGINT handler and, when invoked from an
        actual signal delivery (frame given), forwards the signal to it.
        """
        old = self.old_int_handler
        if old is not None:
            if on_main_thread():
                signal.signal(signal.SIGINT, old)
            self.old_int_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()
            if old is not None and old is not self._signal_int:
                old(signal.SIGINT, frame)

    #
    # SIGTSTP handler
    #

    def _signal_tstp(self, signum, frame):
        """Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed.
        """
        self.suspended = True
        self.send_signal(signum)
        self._restore_sigtstp(frame=frame)

    def _restore_sigtstp(self, frame=None):
        """Reinstates the previous SIGTSTP handler."""
        old = self.old_tstp_handler
        if old is not None:
            if on_main_thread():
                signal.signal(signal.SIGTSTP, old)
            self.old_tstp_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()

    #
    # SIGQUIT handler
    #

    def _signal_quit(self, signum, frame):
        r"""Signal handler for quiting SIGQUIT - Ctrl+\ may have been pressed.
        """
        self.send_signal(signum)
        self._restore_sigquit(frame=frame)

    def _restore_sigquit(self, frame=None):
        """Reinstates the previous SIGQUIT handler."""
        old = self.old_quit_handler
        if old is not None:
            if on_main_thread():
                signal.signal(signal.SIGQUIT, old)
            self.old_quit_handler = None
        if frame is not None:
            self._disable_cbreak_stdin()

    #
    # cbreak mode handlers
    #

    def _enable_cbreak_stdin(self):
        """Puts the controlling tty into cbreak (no-echo, non-canonical)
        mode so alt-mode applications receive keystrokes immediately.
        """
        if not ON_POSIX:
            return
        try:
            self.stdin_mode = termios.tcgetattr(self.stdin_fd)[:]
        except termios.error:
            # this can happen for cases where another process is controlling
            # xonsh's tty device, such as in testing.
            self.stdin_mode = None
            return
        new = self.stdin_mode[:]
        new[LFLAG] &= ~(termios.ECHO | termios.ICANON)
        new[CC][termios.VMIN] = 1
        new[CC][termios.VTIME] = 0
        try:
            # termios.TCSAFLUSH may be less reliable than termios.TCSANOW
            termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new)
        except termios.error:
            self._disable_cbreak_stdin()

    def _disable_cbreak_stdin(self):
        """Restores echo/canonical mode on the controlling tty."""
        if not ON_POSIX or self.stdin_mode is None:
            return
        new = self.stdin_mode[:]
        new[LFLAG] |= termios.ECHO | termios.ICANON
        new[CC][termios.VMIN] = 1
        new[CC][termios.VTIME] = 0
        try:
            termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new)
        except termios.error:
            pass

    #
    # Dispatch methods
    #

    def poll(self):
        """Dispatches to Popen.returncode."""
        return self.proc.returncode

    def wait(self, timeout=None):
        """Dispatches to Popen.wait(), but also does process cleanup such as
        joining this thread and replacing the original window size signal
        handler.
        """
        self._disable_cbreak_stdin()
        rtn = self.proc.wait(timeout=timeout)
        self.join()
        # need to replace the old signal handlers somewhere...
        if self.old_winch_handler is not None and on_main_thread():
            signal.signal(signal.SIGWINCH, self.old_winch_handler)
            self.old_winch_handler = None
        self._clean_up()
        return rtn

    def _clean_up(self):
        """Restores all saved signal handlers."""
        self._restore_sigint()
        self._restore_sigtstp()
        self._restore_sigquit()

    @property
    def returncode(self):
        """Process return code."""
        return self.proc.returncode

    @returncode.setter
    def returncode(self, value):
        """Process return code."""
        self.proc.returncode = value

    @property
    def signal(self):
        """Process signal, or None."""
        s = getattr(self.proc, "signal", None)
        if s is None:
            rtn = self.returncode
            if rtn is not None and rtn != 0:
                # derive (signal number, core dumped) from the return code
                s = (-1 * rtn, rtn < 0 if ON_WINDOWS else os.WCOREDUMP(rtn))
        return s

    @signal.setter
    def signal(self, value):
        """Process signal, or None."""
        self.proc.signal = value

    def send_signal(self, signal):
        """Dispatches to Popen.send_signal()."""
        dt = 0.0
        # the subprocess may not have been created yet; wait up to timeout
        while self.proc is None and dt < self.timeout:
            time.sleep(1e-7)
            dt += 1e-7
        if self.proc is None:
            return
        try:
            rtn = self.proc.send_signal(signal)
        except ProcessLookupError:
            # This can happen in the case of !(cmd) when the command has ended
            rtn = None
        return rtn

    def terminate(self):
        """Dispatches to Popen.terminate()."""
        return self.proc.terminate()

    def kill(self):
        """Dispatches to Popen.kill()."""
        return self.proc.kill()
class Handle(int):
    """An integer-valued Windows HANDLE that closes itself at most once
    (reproduced from the standard library's ``subprocess.py``).
    """

    closed = False

    def Close(self, CloseHandle=None):
        """Close the handle; subsequent calls are no-ops."""
        if self.closed:
            return
        self.closed = True
        closer = CloseHandle or _winapi.CloseHandle
        closer(self)

    def Detach(self):
        """Relinquish ownership and return the raw handle value."""
        if self.closed:
            raise ValueError("already closed")
        self.closed = True
        return int(self)

    def __repr__(self):
        return "Handle(%d)" % int(self)

    __del__ = Close
    __str__ = __repr__
class FileThreadDispatcher:
    """Dispatches to different file handles depending on the
    current thread. Useful if you want file operation to go to different
    places for different threads.
    """

    def __init__(self, default=None):
        """
        Parameters
        ----------
        default : file-like or None, optional
            The file handle to write to if a thread cannot be found in
            the registry. If None, a new in-memory instance.

        Attributes
        ----------
        registry : dict
            Maps thread idents to file handles.
        """
        if default is None:
            # fall back to an in-memory text buffer so unregistered-thread
            # writes are captured instead of raising
            default = io.TextIOWrapper(io.BytesIO())
        self.default = default
        self.registry = {}