Skip to content

Commit

Permalink
VSISync(): make CHUNK_SIZE option when uploading to /vsis3/
Browse files Browse the repository at this point in the history
  • Loading branch information
rouault committed Mar 6, 2020
1 parent 9cfadf5 commit 912f3eb
Show file tree
Hide file tree
Showing 3 changed files with 419 additions and 47 deletions.
170 changes: 166 additions & 4 deletions autotest/gcore/vsis3.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ def test_vsis3_no_sign_request():


###############################################################################
# Test Sync() and multithreading
# Test Sync() and multithreaded download


def test_vsis3_sync_multithreaded():
def test_vsis3_sync_multithreaded_download():

if not gdaltest.built_against_curl():
pytest.skip()
Expand All @@ -117,10 +117,10 @@ def cbk(pct, _, tab):


###############################################################################
# Test Sync() and multithreading and CHUNK_SIZE
# Test Sync() and multithreaded download and CHUNK_SIZE


def test_vsis3_sync_multithreaded_chunk_size():
def test_vsis3_sync_multithreaded_download_chunk_size():

if not gdaltest.built_against_curl():
pytest.skip()
Expand Down Expand Up @@ -2356,6 +2356,168 @@ def test_vsis3_fake_rename_on_existing_dir():
assert gdal.Rename( '/vsis3/test/source.txt', '/vsis3/test_target_dir') == -1


###############################################################################
# Test Sync() and multithreaded upload and CHUNK_SIZE


def test_vsis3_fake_sync_multithreaded_upload_chunk_size():
    """Sync() a small /vsimem/ file to /vsis3/ with CHUNK_SIZE=3.

    The source file holds 'foo\\n' (4 bytes), so with CHUNK_SIZE=3 the fake
    server expects a multipart upload made of a 3-byte part followed by a
    1-byte part, and then a completion request listing both ETags.
    """

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    def cbk(pct, _, tab):
        # Reported progress must never go backwards.
        assert pct >= tab[0]
        tab[0] = pct
        return True

    def send_empty_response(request, status, etag=None):
        # Body-less reply, optionally carrying the ETag of an uploaded part.
        request.send_response(status)
        if etag is not None:
            request.send_header('ETag', etag)
        request.send_header('Content-Length', 0)
        request.end_headers()

    def initiate_upload(request):
        # Reply to the multipart-upload initiation with a fixed upload id.
        request.protocol_version = 'HTTP/1.1'
        response = '<?xml version="1.0" encoding="UTF-8"?><InitiateMultipartUploadResult><UploadId>my_id</UploadId></InitiateMultipartUploadResult>'
        request.send_response(200)
        request.send_header('Content-type', 'application/xml')
        request.send_header('Content-Length', len(response))
        request.end_headers()
        request.wfile.write(response.encode('ascii'))

    def make_part_handler(expected_length, etag):
        # Build a handler that accepts one part only if it has the given size.
        def upload_part(request):
            if request.headers['Content-Length'] != expected_length:
                sys.stderr.write('Did not get expected headers: %s\n' % str(request.headers))
                send_empty_response(request, 400)
                return
            send_empty_response(request, 200, etag)

        return upload_part

    # Exact payload expected in the completion request (186 bytes).
    expected_completion = (
        '<CompleteMultipartUpload>\n'
        '<Part>\n'
        '<PartNumber>1</PartNumber><ETag>"first_etag"</ETag></Part>\n'
        '<Part>\n'
        '<PartNumber>2</PartNumber><ETag>"second_etag"</ETag></Part>\n'
        '</CompleteMultipartUpload>\n'
    )

    def complete_upload(request):
        # Validate both the announced length and the XML listing the parts.
        if request.headers['Content-Length'] != '186':
            sys.stderr.write('Did not get expected headers: %s\n' % str(request.headers))
            send_empty_response(request, 400)
            return

        content = request.rfile.read(186).decode('ascii')
        if content != expected_completion:
            sys.stderr.write('Did not get expected content: %s\n' % content)
            send_empty_response(request, 400)
            return

        send_empty_response(request, 200)

    gdal.Mkdir('/vsimem/test', 0)
    gdal.FileFromMemBuffer('/vsimem/test/foo', 'foo\n')

    # Always remove the in-memory source directory, even when an assertion
    # fails, so that later tests do not see leftovers.
    try:
        tab = [-1]
        handler = webserver.SequentialHandler()
        handler.add('GET', '/test_bucket/?prefix=test%2F', 200)
        handler.add('GET', '/test_bucket/test', 404)
        handler.add('GET', '/test_bucket/?delimiter=%2F&max-keys=100&prefix=test%2F', 200)
        handler.add('GET', '/test_bucket/', 200)
        handler.add('GET', '/test_bucket/test/', 404)
        handler.add('PUT', '/test_bucket/test/', 200)
        handler.add('POST', '/test_bucket/test/foo?uploads',
                    custom_method=initiate_upload)
        handler.add('PUT', '/test_bucket/test/foo?partNumber=1&uploadId=my_id',
                    custom_method=make_part_handler('3', '"first_etag"'))
        handler.add('PUT', '/test_bucket/test/foo?partNumber=2&uploadId=my_id',
                    custom_method=make_part_handler('1', '"second_etag"'))
        handler.add('POST', '/test_bucket/test/foo?uploadId=my_id',
                    custom_method=complete_upload)

        with gdaltest.config_option('VSIS3_SIMULATE_THREADING', 'YES'):
            with webserver.install_http_handler(handler):
                assert gdal.Sync('/vsimem/test',
                                 '/vsis3/test_bucket',
                                 options=['NUM_THREADS=1', 'CHUNK_SIZE=3'],
                                 callback=cbk, callback_data=tab)
        assert tab[0] == 1.0
    finally:
        gdal.RmdirRecursive('/vsimem/test')


def test_vsis3_fake_sync_multithreaded_upload_chunk_size_failure():
    """Sync() to /vsis3/ with CHUNK_SIZE=3 must fail when a part is rejected.

    The fake server accepts the multipart-upload initiation, answers 400 to
    the upload of part 1, and then expects a DELETE on the upload id.
    Sync() is expected to report failure.
    """

    if gdaltest.webserver_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    def initiate_upload(request):
        # Reply to the multipart-upload initiation with a fixed upload id.
        request.protocol_version = 'HTTP/1.1'
        response = '<?xml version="1.0" encoding="UTF-8"?><InitiateMultipartUploadResult><UploadId>my_id</UploadId></InitiateMultipartUploadResult>'
        request.send_response(200)
        request.send_header('Content-type', 'application/xml')
        request.send_header('Content-Length', len(response))
        request.end_headers()
        request.wfile.write(response.encode('ascii'))

    gdal.Mkdir('/vsimem/test', 0)
    gdal.FileFromMemBuffer('/vsimem/test/foo', 'foo\n')

    # Always remove the in-memory source directory, even when an assertion
    # fails, so that later tests do not see leftovers.
    try:
        handler = webserver.SequentialHandler()
        handler.add('GET', '/test_bucket/?prefix=test%2F', 200)
        handler.add('GET', '/test_bucket/test', 404)
        handler.add('GET', '/test_bucket/?delimiter=%2F&max-keys=100&prefix=test%2F', 200)
        handler.add('GET', '/test_bucket/', 200)
        handler.add('GET', '/test_bucket/test/', 404)
        handler.add('PUT', '/test_bucket/test/', 200)
        handler.add('POST', '/test_bucket/test/foo?uploads',
                    custom_method=initiate_upload)
        # The first part is rejected outright with a plain 400 status.
        handler.add('PUT', '/test_bucket/test/foo?partNumber=1&uploadId=my_id', 400)
        handler.add('DELETE', '/test_bucket/test/foo?uploadId=my_id', 204)

        with gdaltest.config_options({'VSIS3_SIMULATE_THREADING': 'YES',
                                      'VSIS3_SYNC_MULTITHREADING': 'NO'}):
            with webserver.install_http_handler(handler):
                with gdaltest.error_handler():
                    assert not gdal.Sync('/vsimem/test',
                                         '/vsis3/test_bucket',
                                         options=['NUM_THREADS=1', 'CHUNK_SIZE=3'])
    finally:
        gdal.RmdirRecursive('/vsimem/test')

###############################################################################
# Read credentials from simulated ~/.aws/credentials

Expand Down
5 changes: 4 additions & 1 deletion gdal/port/cpl_vsil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,10 @@ int VSIRename( const char * oldpath, const char * newpath )
* Since GDAL 3.1</li>
* <li>CHUNK_SIZE=integer. Maximum size of chunk (in bytes) to use to split
* large objects when downloading them from /vsis3/, /vsigs/ or /vsiaz/ to
* local file system. Only used if NUM_THREADS > 1. Since GDAL 3.1</li>
* local file system, or for upload to /vsis3/ from local file system.
* Only used if NUM_THREADS > 1.
 * For upload to /vsis3/, this chunk size will be set to at least 5 MB.
* Since GDAL 3.1</li>
* </ul>
* @param pProgressFunc Progress callback, or NULL.
* @param pProgressData User data of progress callback, or NULL.
Expand Down
Loading

0 comments on commit 912f3eb

Please sign in to comment.