forked from jupyterhub/zero-to-jupyterhub-k8s
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_spawn.py
232 lines (209 loc) · 7.48 KB
/
test_spawn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import subprocess
import time
import pytest
import requests
def test_spawn_basic(
    api_request,
    jupyter_user,
    request_data,
    pebble_acme_ca_cert,
    extra_files_test_command,
):
    """
    Tests the hub api's /users/:user/server POST endpoint. A user pod should be
    created with environment variables defined in singleuser.extraEnv,
    singleuser.extraFiles should be mounted, etc.

    The spawned server is deleted again in a finally clause, so cleanup is
    attempted also when one of the checks fails.
    """
    print("asking kubespawner to spawn a server for a test user")
    r = api_request.post("/users/" + jupyter_user + "/server")
    assert r.status_code in (201, 202)

    try:
        # check successful spawn
        server_model = _wait_for_user_to_spawn(
            api_request, jupyter_user, request_data["test_timeout"]
        )
        # _wait_for_user_to_spawn returns False when the timeout is reached
        assert server_model, "The user server didn't become ready before the timeout!"

        # The user server's own REST API is reached via the part of hub_url
        # that precedes "/hub/api", followed by the server's url.
        r = requests.get(
            request_data["hub_url"].partition("/hub/api")[0]
            + server_model["url"]
            + "api",
            verify=pebble_acme_ca_cert,
            # without a timeout, an unresponsive server would hang the test
            timeout=request_data["test_timeout"],
        )
        assert r.status_code == 200
        assert "version" in r.json()

        # check user pod's extra environment variable
        pod_name = server_model["state"]["pod_name"]
        c = subprocess.run(
            [
                "kubectl",
                "exec",
                pod_name,
                "--",
                "sh",
                "-c",
                # The variable is quoted so that an empty value or a value with
                # whitespace is still passed to the test as one argument.
                'if [ -z "$TEST_ENV_FIELDREF_TO_NAMESPACE" ]; then exit 1; fi',
            ]
        )
        assert (
            c.returncode == 0
        ), "singleuser.extraEnv didn't lead to a mounted environment variable!"

        # check user pod's extra files
        c = subprocess.run(
            [
                "kubectl",
                "exec",
                pod_name,
                "--",
                "sh",
                "-c",
                extra_files_test_command,
            ]
        )
        assert (
            c.returncode == 0
        ), "The singleuser.extraFiles configuration doesn't seem to have been honored!"
    finally:
        _delete_server(api_request, jupyter_user, request_data["test_timeout"])
@pytest.mark.netpol
def test_spawn_netpol(api_request, jupyter_user, request_data):
    """
    Tests a spawned user pod's ability to communicate with allowed and blocked
    internet locations.

    The spawned server is deleted again in a finally clause, so cleanup is
    attempted also when one of the checks fails.
    """
    print(
        "asking kubespawner to spawn a server for a test user to test network policies"
    )
    r = api_request.post("/users/" + jupyter_user + "/server")
    assert r.status_code in (201, 202)

    try:
        # check successful spawn
        server_model = _wait_for_user_to_spawn(
            api_request, jupyter_user, request_data["test_timeout"]
        )
        # _wait_for_user_to_spawn returns False when the timeout is reached
        assert server_model, "The user server didn't become ready before the timeout!"
        pod_name = server_model["state"]["pod_name"]

        # check DNS resolution of a cluster local and an external name
        for hostname in ("hub", "jupyter.org"):
            c = _exec_in_pod(pod_name, ["nslookup", hostname])
            if c.returncode != 0:
                _fail_with_output(
                    c,
                    f"DNS issue: failed to resolve '{hostname}' from a singleuser-server",
                )

        # The IPs we test against are differentiated by the NetworkPolicy shaped
        # by the dev-config.yaml's singleuser.networkPolicy.egress
        # configuration. If these IPs change, you can use `nslookup jupyter.org`
        # to get new IPs but beware that this response may look different over
        # time at least on our GitHub Action runners. Note that we have
        # explicitly pinned these IPs and only open a TCP connection to port 80
        # on them, in order to avoid test failures when additional IPs are
        # added.
        allowed_jupyter_org_ip = "104.21.25.233"
        blocked_jupyter_org_ip = "172.67.134.225"

        cmd_python_code = "import socket; s = socket.socket(); s.settimeout(3); s.connect(('{ip}', 80)); s.close();"

        # check allowed jupyter.org ip connectivity
        c = _exec_in_pod(
            pod_name,
            ["python", "-c", cmd_python_code.format(ip=allowed_jupyter_org_ip)],
        )
        if c.returncode != 0:
            _fail_with_output(
                c,
                f"Network issue: access to '{allowed_jupyter_org_ip}' was supposed to be allowed",
            )

        # check blocked jupyter.org ip connectivity, here a successful
        # connection is the failure
        c = _exec_in_pod(
            pod_name,
            ["python", "-c", cmd_python_code.format(ip=blocked_jupyter_org_ip)],
        )
        if c.returncode == 0:
            _fail_with_output(
                c,
                f"Network issue: access to '{blocked_jupyter_org_ip}' was supposed to be denied",
            )
    finally:
        _delete_server(api_request, jupyter_user, request_data["test_timeout"])


def _exec_in_pod(pod_name, command):
    """
    Runs command (a list of arguments) in the pod via `kubectl exec` and
    returns the CompletedProcess, with stderr merged into its text stdout.
    """
    return subprocess.run(
        ["kubectl", "exec", pod_name, "--", *command],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )


def _fail_with_output(c, message):
    """
    Prints the return code and captured output of the CompletedProcess c, then
    raises an AssertionError with the given message.
    """
    print(f"Return code: {c.returncode}")
    print("---")
    print(c.stdout)
    raise AssertionError(message)
def _wait_for_user_to_spawn(api_request, jupyter_user, timeout):
    """
    Polls the hub api's /users/:user endpoint about once a second until the
    user's default server reports itself as ready.

    Returns the default server's model once it is ready, or False if that
    didn't happen within timeout seconds. An exception raised by the
    response's raise_for_status() is not caught here.
    """
    # A monotonic clock is used so that system clock adjustments can't shorten
    # or extend the timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # NOTE: If this request fails with a 503 response from the proxy, the
        #       hub pod has probably crashed by the tests interaction with it.
        r = api_request.get("/users/" + jupyter_user)
        r.raise_for_status()
        user_model = r.json()

        # Note that JupyterHub has a concept of named servers, so the default
        # server is named "", a blank string.
        if "" in user_model["servers"]:
            server_model = user_model["servers"][""]
            if server_model["ready"]:
                return server_model
        else:
            print("Awaiting server info to be part of user_model...")

        time.sleep(1)
    return False
def _delete_server(api_request, jupyter_user, timeout):
    """
    Requests deletion of the user's default server via the hub api, then polls
    the /users/:user endpoint about once a second until the default server is
    no longer listed.

    Returns True once the default server is gone, or False if it was still
    listed after timeout seconds. Raises an AssertionError if the delete
    request isn't answered with a 202 or 204 status code.
    """
    # NOTE: If this request fails with a 503 response from the proxy, the hub
    #       pod has probably crashed by the previous tests' interaction with it.
    r = api_request.delete("/users/" + jupyter_user + "/server")
    assert r.status_code in (
        202,
        204,
    ), f"Unexpected status code {r.status_code} when deleting the user server"

    # A monotonic clock is used so that system clock adjustments can't shorten
    # or extend the timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        r = api_request.get("/users/" + jupyter_user)
        r.raise_for_status()
        user_model = r.json()

        # The default server is named "", a blank string.
        if "" not in user_model["servers"]:
            return True

        time.sleep(1)
    return False