forked from algorand/go-algorand
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_current_installers.py
executable file
·52 lines (45 loc) · 1.39 KB
/
get_current_installers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/env python3
#
# pip install boto3
# python3 get_current_installers.py s3://bucket/prefix
import re
import sys
import boto3
def get_stage_release_set(response):
    """Return the objects that belong to the first release directory listed.

    Walks ``response['Contents']`` in listing order.  Objects whose file name
    starts with ``tools_``, ``install_`` or ``node_`` are skipped.  The
    directory part of the first remaining key becomes the release prefix;
    later objects under that same directory are collected, and the walk stops
    at the first non-skipped object found under a different directory.

    Args:
        response: A mapping shaped like an S3 ``list_objects_v2`` response:
            an optional ``'Contents'`` list of dicts that each have a
            ``'Key'`` entry.

    Returns:
        A list of the selected ``Contents`` entries (possibly empty).
    """
    skipped_prefixes = ('tools_', 'install_', 'node_')
    release_dir = None
    selected = []
    # A response may lack 'Contents'; treat that as an empty listing.
    for entry in response.get('Contents') or []:
        # rpartition (unlike rsplit + unpacking) also copes with keys that
        # contain no '/', yielding an empty directory part.
        directory, _, fname = entry['Key'].rpartition('/')
        if fname.startswith(skipped_prefixes):
            continue
        if release_dir is None:
            release_dir = directory
        elif directory != release_dir:
            break
        selected.append(entry)
    return selected
def parse_s3_path(path):
    """Split an ``s3://bucket/prefix`` URI into its bucket and key prefix.

    Args:
        path: A string such as ``s3://bucket/some/prefix``.  A bare
            ``s3://bucket`` is accepted and yields an empty prefix.

    Returns:
        A ``(bucket, prefix)`` tuple, or ``(None, None)`` when ``path`` does
        not start with ``s3://`` followed by a non-empty bucket name.
    """
    # The "/prefix" part is optional so that bucket-only URIs still parse.
    m = re.match(r's3://([^/]+)(?:/(.*))?', path)
    if m is None:
        return None, None
    return m.group(1), m.group(2) or ''
def main(argv=None):
    """Download the .rpm and .deb installers of the first staged release set.

    Lists up to 100 objects under the given ``s3://bucket/prefix``, selects
    the release set with ``get_stage_release_set`` and downloads every object
    of that set whose key ends in ``.rpm`` or ``.deb``, saving each under its
    bare file name.

    Args:
        argv: Argument vector; defaults to ``sys.argv``.  ``argv[1]`` must be
            the ``s3://bucket/prefix`` location to inspect.

    Exits with status 2 on a missing or malformed argument and with status 1
    when nothing is found under the given location.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        prog = argv[0] if argv else 'get_current_installers.py'
        sys.stderr.write('usage: {} s3://bucket/prefix\n'.format(prog))
        sys.exit(2)
    location = argv[1]
    bucket, prefix = parse_s3_path(location)
    if bucket is None:
        # Fail here instead of handing Bucket=None to the S3 client.
        sys.stderr.write('not an s3://bucket/prefix path: {}\n'.format(location))
        sys.exit(2)
    s3 = boto3.client('s3')
    # Only the first page of at most 100 keys is examined.
    staging_response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=100)
    if (not staging_response.get('KeyCount')) or ('Contents' not in staging_response):
        sys.stderr.write('nothing found under {}\n'.format(location))
        sys.exit(1)
    for ob in get_stage_release_set(staging_response):
        okey = ob['Key']
        if okey.endswith(('.rpm', '.deb')):
            # rpartition keeps working for keys that contain no '/'.
            fname = okey.rpartition('/')[2]
            s3.download_file(bucket, okey, fname)
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()