s3_multipart_upload.py
#!/usr/bin/env python
"""Split large file into multiple pieces for upload to S3.
S3 only supports 5Gb files for uploading directly, so for larger CloudBioLinux
box images we need to use boto's multipart file support.
This parallelizes the task over available cores using multiprocessing.
It checks for an up to date version of the file remotely, skipping transfer
if found.
Note: by default this will look for your default AWS Access Key ID and AWS Secret Access Key
you setup via 'aws configure'. You can store additional profiles using
'aws configure --profile <some_profile_name>'
Usage:
s3_multipart_upload.py <file_to_transfer> <bucket_name> [<s3_key_name>]
if <s3_key_name> is not specified, the filename will be used.
--norr -- Do not use reduced redundancy storage.
--public -- Make uploaded files public.
--cores=n -- Number of cores to use for upload
--profile -- The alternate AWS profile to use for your keys located in ~/.aws/config
Files are stored at cheaper reduced redundancy storage by default.
"""
import os
import sys
import glob
import subprocess
import contextlib
import functools
import multiprocessing
from multiprocessing.pool import IMapIterator
from optparse import OptionParser
import rfc822
import boto
import boto.s3.multipart
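# Driver: connect to S3 (optionally via a named profile), create the bucket if
# needed, skip the transfer when the remote copy is already up to date, then use
# a standard transfer for small files and a parallel multipart upload otherwise.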
def main(transfer_file, bucket_name, s3_key_name=None, use_rr=True,
make_public=True, cores=None, profile=None):
if s3_key_name is None:
s3_key_name = os.path.basename(transfer_file)
if profile is None:
conn = boto.connect_s3()
else:
conn = boto.connect_s3(profile_name=profile)
bucket = conn.lookup(bucket_name)
if bucket is None:
bucket = conn.create_bucket(bucket_name)
if s3_has_uptodate_file(bucket, transfer_file, s3_key_name):
print "S3 has up to date version of %s in %s. Not transferring." % \
(s3_key_name, bucket.name)
return
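    # Files under 50MB go through a single standard transfer; larger files are
    # split and uploaded in parallel with multipart upload.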
mb_size = os.path.getsize(transfer_file) / 1e6
if mb_size < 50:
_standard_transfer(bucket, s3_key_name, transfer_file, use_rr)
else:
_multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr,
cores, profile)
s3_key = bucket.get_key(s3_key_name)
if make_public:
s3_key.set_acl("public-read")
def s3_has_uptodate_file(bucket, transfer_file, s3_key_name):
"""Check if S3 has an existing, up to date version of this file.
"""
s3_key = bucket.get_key(s3_key_name)
if s3_key:
s3_size = s3_key.size
local_size = os.path.getsize(transfer_file)
s3_time = rfc822.mktime_tz(rfc822.parsedate_tz(s3_key.last_modified))
local_time = os.path.getmtime(transfer_file)
return s3_size == local_size and s3_time >= local_time
return False
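# Progress callback passed to boto: print a dot for each progress update.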
def upload_cb(complete, total):
sys.stdout.write(".")
sys.stdout.flush()
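# Small files: upload in a single request with set_contents_from_filename.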
def _standard_transfer(bucket, s3_key_name, transfer_file, use_rr):
print " Upload with standard transfer, not multipart",
new_s3_item = bucket.new_key(s3_key_name)
new_s3_item.set_contents_from_filename(transfer_file, reduced_redundancy=use_rr,
cb=upload_cb, num_cb=10)
print
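# Decorator that lets a multi-argument function be called through
# multiprocessing imap, which passes a single tuple of arguments per item.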
def map_wrap(f):
    @functools.wraps(f)
    def wrapper(args):
        return f(*args)
    return wrapper
def mp_from_ids(mp_id, mp_keyname, mp_bucketname, profile=None):
"""Get the multipart upload from the bucket and multipart IDs.
This allows us to reconstitute a connection to the upload
from within multiprocessing functions.
"""
if profile is None:
conn = boto.connect_s3()
else:
conn = boto.connect_s3(profile_name=profile)
bucket = conn.lookup(mp_bucketname)
mp = boto.s3.multipart.MultiPartUpload(bucket)
mp.key_name = mp_keyname
mp.id = mp_id
return mp
@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part, profile):
"""Transfer a part of a multipart upload. Designed to be run in parallel.
"""
mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname, profile)
print " Transferring", i, part
    with open(part, "rb") as t_handle:
mp.upload_part_from_file(t_handle, i+1)
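    # Delete the local chunk once it has been uploaded.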
os.remove(part)
def _multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True,
cores=None, profile=None):
"""Upload large files using Amazon's multipart upload functionality.
"""
def split_file(in_file, mb_size, split_num=5):
prefix = os.path.join(os.path.dirname(in_file),
"%sS3PART" % (os.path.basename(s3_key_name)))
        # require a split size between 5MB (the AWS minimum) and 250MB
split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))
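        # Reuse chunks from a previous run if they exist; 'aa' is the first
        # suffix produced by the Unix split command.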
if not os.path.exists("%saa" % prefix):
cl = ["split", "-b%sm" % split_size, in_file, prefix]
subprocess.check_call(cl)
return sorted(glob.glob("%s*" % prefix))
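    # Start the multipart upload, push each chunk from a worker process, then
    # ask S3 to assemble the parts into the final key.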
mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
with multimap(cores) as pmap:
for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part, profile)
for (i, part) in
enumerate(split_file(tarball, mb_size, cores)))):
pass
mp.complete_upload()
@contextlib.contextmanager
def multimap(cores=None):
"""Provide multiprocessing imap like function.
The context manager handles setting up the pool, worked around interrupt issues
and terminating the pool on completion.
"""
if cores is None:
cores = max(multiprocessing.cpu_count() - 1, 1)
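    # Patch IMapIterator.next to always wait with a (very long) timeout so that
    # a KeyboardInterrupt reaches the parent process instead of hanging the pool.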
def wrapper(func):
def wrap(self, timeout=None):
return func(self, timeout=timeout if timeout is not None else 1e100)
return wrap
IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    try:
        yield pool.imap
    finally:
        pool.terminate()
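# Command line entry point; see the module docstring for option descriptions.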
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-r", "--norr", dest="use_rr",
action="store_false", default=True)
parser.add_option("-p", "--public", dest="make_public",
action="store_true", default=False)
parser.add_option("-c", "--cores", dest="cores",
default=multiprocessing.cpu_count())
parser.add_option("--profile", dest="profile")
(options, args) = parser.parse_args()
if len(args) < 2:
print __doc__
sys.exit()
kwargs = dict(use_rr=options.use_rr, make_public=options.make_public,
cores=int(options.cores), profile=options.profile)
main(*args, **kwargs)