forked from hail-is/hail
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwait-for.py
117 lines (94 loc) · 4.19 KB
/
wait-for.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import sys
import traceback
import argparse
import concurrent.futures
import asyncio
import aiohttp
import uvloop
from kubernetes_asyncio import client, config
# Make uvloop's event loop policy the default at import time, so event loops
# obtained through asyncio afterwards (including the one that runs main() at
# the bottom of this file) are uvloop loops.
uvloop.install()
async def timeout(timeout_seconds):
    """Wait ``timeout_seconds``, then end the process with exit status 1.

    ``main`` gathers this coroutine with the real waiter. If the waiter has not
    already exited the process by the deadline, this coroutine does, after
    reporting the timeout on stderr.
    """
    print('info: in timeout', file=sys.stderr)
    await asyncio.sleep(timeout_seconds)
    print('error: timed out', file=sys.stderr)
    # Equivalent to sys.exit(1): raises SystemExit, which escapes the event loop.
    raise SystemExit(1)
async def wait_for_pod_complete(v1, namespace, name):
    """Poll pod ``namespace/name`` until all of its containers have terminated.

    Once every container status reports a terminated state, the process exits:
    with status 0 if every container's exit code is 0, otherwise with status 1.
    Each read uses a 5-second request timeout. A 404 from the API is logged as
    ``info: 404``, and any other error is logged with its traceback. In both
    cases the pod is polled again after a one-second pause. This loop has no
    deadline of its own; ``main`` runs it alongside ``timeout``.

    Args:
        v1: a CoreV1Api-like client (``main`` passes ``client.CoreV1Api()``)
            exposing an awaitable
            ``read_namespaced_pod(name, namespace, _request_timeout=...)``.
        namespace: namespace of the pod.
        name: name of the pod.

    Raises:
        asyncio.CancelledError: re-raised after logging if the task is cancelled.
    """
    print('info: in wait_for_pod_complete', file=sys.stderr)
    while True:
        try:
            pod = await v1.read_namespaced_pod(
                name,
                namespace,
                _request_timeout=5.0)
        except asyncio.CancelledError:
            # Since Python 3.8 asyncio.CancelledError is a BaseException and is
            # NOT concurrent.futures.CancelledError, so it must be named here
            # explicitly (on 3.7 the two names are the same class).
            print('info: CancelledError', file=sys.stderr)
            raise
        except client.rest.ApiException as exc:
            if exc.status == 404:
                print('info: 404', file=sys.stderr)
            else:
                print(f'wait_for_pod_complete failed due to exception {traceback.format_exc()}{exc}', file=sys.stderr)
        except Exception as e:
            print(f'wait_for_pod_complete failed due to exception {traceback.format_exc()}{e}', file=sys.stderr)
        else:
            # Statuses may be missing while the pod is still being scheduled;
            # keep polling until every container reports a terminated state.
            if pod and pod.status and pod.status.container_statuses:
                container_statuses = pod.status.container_statuses
                if all(cs.state and cs.state.terminated for cs in container_statuses):
                    if all(cs.state.terminated.exit_code == 0 for cs in container_statuses):
                        print('info: success')
                        sys.exit(0)
                    print('error: a container failed')
                    sys.exit(1)
        await asyncio.sleep(1)
# Keep this URL scheme in agreement with hailtop.config.
def internal_base_url(location, namespace, service):
    """Return the cluster-internal base URL of ``service`` in ``namespace``.

    ``location`` selects the addressing scheme and must be ``'gce'`` or
    ``'k8s'``; any other value fails an ``assert``. In the ``'default'``
    namespace the URL is a bare host. Elsewhere it also carries a
    ``/{namespace}/{service}`` path prefix.
    """
    if location != 'gce':
        assert location == 'k8s'
        if namespace == 'default':
            return f'http://{service}.default'
        return f'http://{service}.{namespace}/{namespace}/{service}'
    if namespace != 'default':
        return f'http://internal.hail/{namespace}/{service}'
    return f'http://{service}.hail'
async def wait_for_service_alive(namespace, name, location, endpoint, headers):
    """Poll ``endpoint`` on service ``name`` until it answers with a 2xx status.

    The process exits with status 0 on the first 2xx response. A non-2xx
    status, or any exception raised while making the request, is logged to
    stderr and the request is retried after a one-second pause. Each request
    is limited to 5 seconds in total. This loop has no deadline of its own;
    ``main`` runs it alongside ``timeout``.

    Args:
        namespace: namespace the service lives in.
        name: service name.
        location: ``'gce'`` or ``'k8s'``, forwarded to ``internal_base_url``.
        endpoint: path appended to the base URL (``main`` defaults it to
            ``'/healthcheck'``).
        headers: extra request headers as a dict, or ``None``.

    Raises:
        asyncio.CancelledError: re-raised after logging if the task is cancelled.
    """
    print('info: in wait_for_service_alive', file=sys.stderr)
    # The target never changes between attempts, so build it once.
    url = f'{internal_base_url(location, namespace, name)}{endpoint}'
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5.0)) as session:
        while True:
            try:
                async with session.get(url, headers=headers) as resp:
                    status = resp.status
            except asyncio.CancelledError:
                # Since Python 3.8 asyncio.CancelledError is a BaseException and is
                # NOT concurrent.futures.CancelledError, so it must be named here
                # explicitly (on 3.7 the two names are the same class).
                print('info: CancelledError', file=sys.stderr)
                raise
            except Exception as e:
                print(f'wait_for_service_alive failed due to exception {traceback.format_exc()}{e}', file=sys.stderr)
            else:
                if 200 <= status < 300:
                    print('info: success')
                    sys.exit(0)
                # Previously retried silently; surface why the service is not ready yet.
                print(f'info: {url} returned status {status}', file=sys.stderr)
            await asyncio.sleep(1)
async def main():
    """Parse the command line and wait for a Pod or a Service.

    Usage: ``TIMEOUT_SECONDS NAMESPACE {Pod NAME | Service NAME [--location
    {gce,k8s}] [-e ENDPOINT] [--header NAME VALUE]...}``.

    The selected waiter is gathered with ``timeout``. The waiters and
    ``timeout`` all end the process via ``sys.exit``, so whichever finishes
    first determines the exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('timeout_seconds', type=int)
    parser.add_argument('namespace', type=str)
    subparsers = parser.add_subparsers(dest='kind')
    # Without this, omitting the subcommand leaves args.kind as None and the
    # script dies on an AssertionError instead of printing a usage error.
    # (Attribute form rather than the keyword so it also works on Python < 3.7.)
    subparsers.required = True

    pod_parser = subparsers.add_parser('Pod')
    pod_parser.add_argument('name', type=str)

    service_parser = subparsers.add_parser('Service')
    service_parser.add_argument('name', type=str)
    # internal_base_url only understands these two locations; reject anything
    # else up front with a usage error.
    service_parser.add_argument('--location', type=str, default='gce', choices=['gce', 'k8s'])
    service_parser.add_argument('--endpoint', '-e', type=str, default='/healthcheck')
    # Repeatable: each occurrence contributes one [name, value] pair.
    service_parser.add_argument('--header', action='append', type=str, nargs=2)

    args = parser.parse_args()

    if args.kind == 'Pod':
        await config.load_kube_config()
        v1 = client.CoreV1Api()
        waiter = wait_for_pod_complete(v1, args.namespace, args.name)
    else:
        # Subparsers are required, so 'Service' is the only other possibility.
        assert args.kind == 'Service'
        headers = None if args.header is None else dict(args.header)
        waiter = wait_for_service_alive(args.namespace, args.name, args.location, args.endpoint, headers)
    await asyncio.gather(timeout(args.timeout_seconds), waiter)
# Create the loop explicitly: asyncio.get_event_loop() with no current loop is
# deprecated (3.10-3.13) and raises RuntimeError from 3.14. new_event_loop()
# still consults the installed event loop policy, so this is a uvloop loop
# when uvloop.install() has run.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())