-
Notifications
You must be signed in to change notification settings - Fork 131
/
Copy pathprivleged_containers.py
68 lines (63 loc) · 2.93 KB
/
privleged_containers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import engine.capabilities.capabilities as caps
from api import api_client
from api.config import get_api_client
def list_pods_for_all_namespaces_or_one_namspace(namespace=None):
api_client=get_api_client()
if namespace is None:
pods = api_client.list_pod_for_all_namespaces(watch=False)
else:
pods = api_client.list_namespaced_pod(namespace)
return pods
def list_pods(namespace=None):
return list_pods_for_all_namespaces_or_one_namspace(namespace)
def is_privileged(security_context, is_container=False):
is_privileged = False
if security_context:
# shared to pods and containers
if security_context.run_as_user == 0:
is_privileged = True
elif is_container:
if security_context.privileged:
is_privileged = True
elif security_context.allow_privilege_escalation:
is_privileged = True
elif security_context.capabilities:
if security_context.capabilities.add:
for cap in security_context.capabilities.add:
if cap in caps.dangerous_caps:
is_privileged = True
break
return is_privileged
def get_privileged_containers(namespace=None):
privileged_pods = []
pods = list_pods_for_all_namespaces_or_one_namspace(namespace)
for pod in pods.items:
privileged_containers = []
if pod.spec.host_ipc or pod.spec.host_pid or pod.spec.host_network or is_privileged(pod.spec.security_context, is_container=False):
privileged_containers = pod.spec.containers
else:
for container in pod.spec.containers:
found_privileged_container = False
if is_privileged(container.security_context, is_container=True):
privileged_containers.append(container)
elif container.ports:
for ports in container.ports:
if ports.host_port:
privileged_containers.append(container)
found_privileged_container = True
break
if not found_privileged_container:
if pod.spec.volumes is not None:
for volume in pod.spec.volumes:
if found_privileged_container:
break
if volume.host_path:
for volume_mount in container.volume_mounts:
if volume_mount.name == volume.name:
privileged_containers.append(container)
found_privileged_container = True
break
if privileged_containers:
pod.spec.containers = privileged_containers
privileged_pods.append(pod)
return privileged_pods