forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathssh.py
134 lines (108 loc) · 3.96 KB
/
ssh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
import shutil
import os
import subprocess
import tempfile
import logging
import signal
class SSHAgent:
    """Run a dedicated ``ssh-agent`` process and manage the keys loaded into it.

    Creating an instance immediately starts the agent (see :meth:`start`) and
    exports the variables it prints into ``os.environ``. Keys are
    reference-counted: a key is handed to ``ssh-add`` on the first
    :meth:`add` and deleted from the agent on the matching last
    :meth:`remove`.
    """

    def __init__(self):
        # Variables exported by ssh-agent: name -> value.
        self._env = {}
        # Values the overridden variables had before start(); None means unset.
        self._env_backup = {}
        # Public key (bytes, as printed by ssh-keygen) -> number of add() calls.
        self._keys = {}
        self.start()

    @property
    def pid(self) -> int:
        """PID of the agent, taken from the ``SSH_AGENT_PID`` it reported."""
        return int(self._env["SSH_AGENT_PID"])

    def start(self) -> None:
        """Start ``ssh-agent`` and export its environment into ``os.environ``.

        Also extends ``SSH_OPTIONS`` with options that disable host key
        checking. Previous values of every touched variable are remembered
        so that :meth:`kill` can restore them.

        Raises:
            Exception: if the ``ssh-agent`` binary is not on ``PATH`` or the
                agent fails to start.
        """
        if shutil.which("ssh-agent") is None:
            raise Exception("ssh-agent binary is not available")

        self._env_backup["SSH_AUTH_SOCK"] = os.environ.get("SSH_AUTH_SOCK")
        self._env_backup["SSH_OPTIONS"] = os.environ.get("SSH_OPTIONS")

        # set ENV from stdout of ssh-agent
        agent_env = self._parse_agent_output(self._run(["ssh-agent"]))
        for name, value in agent_env.items():
            # Remember whatever we are about to overwrite (e.g. SSH_AGENT_PID
            # of an outer agent) so kill() can put it back.
            self._env_backup[name] = os.environ.get(name)
            self._env[name] = value
            os.environ[name] = value

        os.environ["SSH_OPTIONS"] = self._merge_ssh_options(
            os.environ.get("SSH_OPTIONS")
        )

    def add(self, key: str) -> bytes:
        """Load private ``key`` into the agent and return its public part.

        Adding the same key again only increments its reference count.
        The returned value is the token expected by :meth:`remove`.
        """
        key_pub = self._key_pub(key)
        if key_pub in self._keys:
            self._keys[key_pub] += 1
        else:
            # "-" makes ssh-add read the key from stdin.
            self._run(["ssh-add", "-"], stdin=key.encode())
            self._keys[key_pub] = 1
        return key_pub

    def remove(self, key_pub: bytes) -> None:
        """Drop one reference to ``key_pub``; delete it from the agent at zero.

        Raises:
            Exception: if ``key_pub`` was not returned by a prior :meth:`add`.
        """
        if key_pub not in self._keys:
            raise Exception(f"Private key not found, public part: {key_pub}")

        if self._keys[key_pub] > 1:
            self._keys[key_pub] -= 1
            return

        # ssh-add -d takes the key to delete from a file.
        with tempfile.NamedTemporaryFile() as f:
            f.write(key_pub)
            f.flush()
            self._run(["ssh-add", "-d", f.name])
        self._keys.pop(key_pub)

    def print_keys(self) -> None:
        """Log the keys currently listed by ``ssh-add -l``."""
        keys = self._run(["ssh-add", "-l"]).splitlines()
        if keys:
            logging.info("ssh-agent keys:")
            for key in keys:
                # Decode so the log shows text rather than a bytes repr.
                logging.info("%s", key.decode(errors="replace"))
        else:
            logging.info("ssh-agent (pid %d) is empty", self.pid)

    def kill(self) -> None:
        """Restore the environment changed by :meth:`start` and stop the agent."""
        self._restore_env()
        os.kill(self.pid, signal.SIGTERM)

    def _restore_env(self) -> None:
        """Undo every ``os.environ`` change recorded by :meth:`start`."""
        for name in self._env:
            os.environ.pop(name, None)
        for name, previous in self._env_backup.items():
            if previous is None:
                # The variable did not exist before start(); make sure the
                # value we injected (e.g. SSH_OPTIONS) does not leak.
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous

    def _key_pub(self, key: str) -> bytes:
        """Return the public key derived from private ``key`` by ssh-keygen."""
        with tempfile.NamedTemporaryFile() as f:
            f.write(key.encode())
            f.flush()
            return self._run(["ssh-keygen", "-y", "-f", f.name])

    @staticmethod
    def _parse_agent_output(output: bytes) -> dict:
        """Extract ``NAME=value`` assignments from ssh-agent's shell output.

        Lines look like ``SSH_AUTH_SOCK=/path; export SSH_AUTH_SOCK;``; only
        the text between ``=`` and the first ``;`` is the value. Lines without
        ``=`` are ignored.
        """
        env = {}
        for line in output.splitlines():
            name, sep, value = line.partition(b"=")
            if sep:
                env[name.decode()] = value.split(b";", 1)[0].decode()
        return env

    @staticmethod
    def _merge_ssh_options(existing) -> str:
        """Append the host-key-check overrides to ``existing`` SSH_OPTIONS.

        ``existing`` may be ``None`` or empty, in which case only the
        overrides are returned. Options are comma-separated.
        """
        extra = "UserKnownHostsFile=/dev/null,StrictHostKeyChecking=no"
        return f"{existing},{extra}" if existing else extra

    @staticmethod
    def _run(cmd, stdin=None) -> bytes:
        """Run ``cmd`` and return its stdout as bytes.

        Args:
            cmd: argument list, or a string (which is run through the shell).
            stdin: optional bytes fed to the process' standard input.

        Returns:
            The raw stdout, or ``b""`` when the output is ssh-add's
            "The agent has no identities." message.

        Raises:
            Exception: on a non-zero exit code, carrying stderr and stdout.
        """
        shell = isinstance(cmd, str)
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # `is not None` so that empty input still gets a (closed) pipe
            # instead of inheriting the parent's stdin.
            stdin=subprocess.PIPE if stdin is not None else None,
            shell=shell,
        ) as p:
            stdout, stderr = p.communicate(stdin)
            # Checked before the exit code so that an empty agent is not
            # reported as a failure.
            if stdout.strip() == b"The agent has no identities.":
                return b""
            if p.returncode:
                message = stderr.strip() + b"\n" + stdout.strip()
                raise Exception(message.strip().decode(errors="replace"))
            return stdout
class SSHKey:
    """Context manager that loads a private key into a dedicated ssh-agent.

    The key is given either directly (``key_value``) or as the name of an
    environment variable holding it (``key_name``); exactly one of the two
    must be provided. Entering the context adds the key to the agent,
    leaving it removes the key again.
    """

    def __init__(self, key_name=None, key_value=None):
        """Resolve the private key and start the ssh-agent.

        Args:
            key_name: name of the environment variable containing the key.
            key_value: the private key itself.

        Raises:
            Exception: if neither or both arguments are given, or if the
                environment variable ``key_name`` is unset or empty.
        """
        if key_name is None and key_value is None:
            raise Exception("Either key_name or key_value must be specified")
        if key_name is not None and key_value is not None:
            raise Exception("Only one of key_name or key_value must be specified")
        if key_name is not None:
            self.key = os.getenv(key_name)
            if not self.key:
                # Fail here, before an agent is spawned, instead of with an
                # AttributeError on None when the key is added later.
                raise Exception(
                    f"Environment variable {key_name} is not set or is empty"
                )
        else:
            self.key = key_value
        self._key_pub = None
        # NOTE(review): the agent started here is never killed by this class;
        # confirm whether callers are expected to clean it up.
        self._ssh_agent = SSHAgent()

    def __enter__(self):
        """Add the key to the agent, log the agent's keys and return self."""
        self._key_pub = self._ssh_agent.add(self.key)
        self._ssh_agent.print_keys()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove the key from the agent and log the remaining keys."""
        self._ssh_agent.remove(self._key_pub)
        self._ssh_agent.print_keys()