forked from KalicoCrew/kalico
-
Notifications
You must be signed in to change notification settings - Fork 0
/
whconsole.py
executable file
·98 lines (85 loc) · 3.01 KB
/
whconsole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python2
# Test console for webhooks interface
#
# Copyright (C) 2020 Kevin O'Connor <[email protected]>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, os, optparse, socket, fcntl, select, json, errno, time
# Set a file-descriptor as non-blocking
def set_nonblock(fd):
    """Switch the file descriptor *fd* into non-blocking mode.

    The descriptor's current status flags are read first and O_NONBLOCK
    is OR-ed into them, so any flags that were already set are kept.
    """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
def webhook_socket_create(uds_filename):
    """Connect to the Unix domain socket at *uds_filename* and return it.

    A non-blocking AF_UNIX stream socket is created and connect() is
    retried every 0.1 seconds for as long as the connection is refused.
    Any other connect error is reported on stderr and the process exits
    with status -1.  Progress messages are written to stderr.

    Returns the connected (non-blocking) socket object.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.setblocking(0)
    sys.stderr.write("Waiting for connect to %s\n" % (uds_filename,))
    while 1:
        try:
            sock.connect(uds_filename)
        except socket.error as e:
            if e.errno == errno.ECONNREFUSED:
                # Nothing is accepting on the socket yet - wait and retry
                time.sleep(0.1)
                continue
            if e.errno is None:
                # Some errors (eg, "AF_UNIX path too long") carry no errno;
                # formatting them with %d would raise a TypeError.
                sys.stderr.write(
                    "Unable to connect socket %s [%s]\n" % (uds_filename, e)
                )
            else:
                # Use .get() so an errno without a symbolic name can not
                # raise KeyError while reporting the real failure.
                sys.stderr.write(
                    "Unable to connect socket %s [%d,%s]\n"
                    % (
                        uds_filename,
                        e.errno,
                        errno.errorcode.get(e.errno, str(e)),
                    )
                )
            sock.close()
            sys.exit(-1)
        break
    sys.stderr.write("Connection.\n")
    return sock
class KeyboardReader:
    """Console that relays JSON lines from stdin to a webhooks socket.

    Lines read from stdin are parsed as JSON, re-serialized compactly and
    sent to the socket terminated by an 0x03 byte.  Data received from
    the socket is split on 0x03 and each complete message is printed to
    stdout.  Both input streams are buffered as bytes so that the code
    behaves the same on Python 2 and Python 3.
    """

    def __init__(self, uds_filename):
        """Set stdin non-blocking, connect to *uds_filename*, set up poll."""
        self.kbd_fd = sys.stdin.fileno()
        set_nonblock(self.kbd_fd)
        self.webhook_socket = webhook_socket_create(uds_filename)
        self.poll = select.poll()
        self.poll.register(sys.stdin, select.POLLIN | select.POLLHUP)
        self.poll.register(self.webhook_socket, select.POLLIN | select.POLLHUP)
        # Unterminated trailing data from each stream (bytes; on Python 2
        # the b"" literal is the plain str type the original code used).
        self.kbd_data = self.socket_data = b""

    def _to_text(self, data, errors="replace"):
        """Return *data* as the native str type, decoding bytes as UTF-8."""
        if isinstance(data, str):
            # Python 2: os.read()/recv() already return str
            return data
        return data.decode("utf-8", errors)

    def process_socket(self):
        """Read from the socket and print every complete message.

        Exits the process with status 0 if the peer closed the socket.
        """
        data = self.webhook_socket.recv(4096)
        if not data:
            sys.stderr.write("Socket closed\n")
            sys.exit(0)
        parts = data.split(b"\x03")
        parts[0] = self.socket_data + parts[0]
        # The last element is the (possibly empty) incomplete message
        self.socket_data = parts.pop()
        for line in parts:
            sys.stdout.write("GOT: %s\n" % (self._to_text(line),))

    def process_kbd(self):
        """Read from stdin and send every complete JSON line to the socket.

        Blank lines and lines starting with '#' are skipped.  Lines that
        are not valid JSON are reported on stderr and skipped.
        """
        data = os.read(self.kbd_fd, 4096)
        if not data:
            # End of input.  A closed pipe or a redirected file stays
            # permanently "ready", which would make run() spin, so stop
            # polling it.  A tty is left registered because more input
            # can still be typed after an end-of-file keypress.
            if not os.isatty(self.kbd_fd):
                self.poll.unregister(self.kbd_fd)
            return
        parts = data.split(b"\n")
        parts[0] = self.kbd_data + parts[0]
        # The last element is the (possibly empty) incomplete line
        self.kbd_data = parts.pop()
        for line in parts:
            line = line.strip()
            if not line or line.startswith(b"#"):
                continue
            try:
                # Invalid UTF-8 raises UnicodeDecodeError, a ValueError
                m = json.loads(self._to_text(line, "strict"))
            except ValueError:
                sys.stderr.write("ERROR: Unable to parse line\n")
                continue
            cm = json.dumps(m, separators=(",", ":"))
            sys.stdout.write("SEND: %s\n" % (cm,))
            self.webhook_socket.send(("%s\x03" % (cm,)).encode("utf-8"))

    def run(self):
        """Loop forever dispatching poll events to the stream handlers."""
        while 1:
            res = self.poll.poll(1000.0)
            for fd, event in res:
                if fd == self.kbd_fd:
                    self.process_kbd()
                else:
                    self.process_socket()
def main():
    """Parse the command line and run the console until it exits.

    Exactly one positional argument - the socket filename - is required;
    any other count is reported through the option parser's error
    handler.
    """
    parser = optparse.OptionParser("%prog [options] <socket filename>")
    _options, positional = parser.parse_args()
    if len(positional) != 1:
        parser.error("Incorrect number of arguments")
    (uds_filename,) = positional
    KeyboardReader(uds_filename).run()


if __name__ == "__main__":
    main()