Skip to content

Commit

Permalink
/vsicurl_streaming/: implement retry strategy if GDAL_HTTP_MAX_RETRY …
Browse files Browse the repository at this point in the history
…is set
  • Loading branch information
rouault committed Sep 22, 2023
1 parent 0eabdfb commit 592c453
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 16 deletions.
171 changes: 171 additions & 0 deletions autotest/gcore/vsicurl_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
# DEALINGS IN THE SOFTWARE.
###############################################################################

import threading
import time

import gdaltest
Expand Down Expand Up @@ -214,3 +215,173 @@ def method(request):
assert gdal.VSIFReadL(1, 1, f) == b"x"
finally:
gdal.VSIFCloseL(f)


###############################################################################
#


def test_vsicurl_streaming_retry_at_beginning(webserver_port):

gdal.VSICurlClearCache()

with gdaltest.config_options(
{
"GDAL_HTTP_MAX_RETRY": "2",
"GDAL_HTTP_RETRY_DELAY": "0.01",
"CPL_VSIL_CURL_STREMAING_SIMULATED_CURL_ERROR": "Send failure: Connection was reset",
},
thread_local=False,
):
handler = webserver.SequentialHandler()
handler.add("GET", "/test.bin", 200, {"Content-Length": "50000"}, "x")
handler.add("GET", "/test.bin", 200, {"Content-Length": "50000"}, "x" * 50000)
with webserver.install_http_handler(handler):
f = gdal.VSIFOpenL(
f"/vsicurl_streaming/http://localhost:{webserver_port}/test.bin", "rb"
)
assert f
try:
read = gdal.VSIFReadL(1, 50, f)
if read != b"x" * 50:
print(read)
assert False
read = gdal.VSIFReadL(1, 50000 - 50, f)
if read != b"x" * (50000 - 50):
assert False
finally:
gdal.VSIFCloseL(f)


###############################################################################
#


def test_vsicurl_streaming_retry_in_middle(webserver_port):

gdal.VSICurlClearCache()

with gdaltest.config_options(
{
"GDAL_HTTP_MAX_RETRY": "2",
"GDAL_HTTP_RETRY_DELAY": "0.01",
"CPL_VSIL_CURL_STREMAING_SIMULATED_CURL_ERROR": "Send failure: Connection was reset",
},
thread_local=False,
):
handler = webserver.SequentialHandler()

file_size = 50000
first_batch_len = 1024

lock = threading.Lock()
cv = threading.Condition(lock)
stop_server = False

def method(request):
request.protocol_version = "HTTP/1.1"
request.send_response(200)
request.send_header("Content-Length", file_size)
request.end_headers()
request.wfile.write(("x" * first_batch_len).encode("ascii"))
request.wfile.flush()
with lock:
while not stop_server:
cv.wait()

handler.add("GET", "/test.bin", custom_method=method)
# Not very realistic: the server changes the content from x to y in
# the retry, but this enables us to easily check where we got data from
handler.add("GET", "/test.bin", 200, {}, "y" * file_size)
with webserver.install_http_handler(handler):
f = gdal.VSIFOpenL(
f"/vsicurl_streaming/http://localhost:{webserver_port}/test.bin", "rb"
)
assert f
try:
first_read_len = 50
assert first_read_len <= first_batch_len

read = gdal.VSIFReadL(1, first_read_len, f)
if read != b"x" * first_read_len:
print(read)
assert False

with lock:
stop_server = True
cv.notify()

read = gdal.VSIFReadL(1, file_size - first_read_len, f)
if read != b"x" * (first_batch_len - first_read_len) + b"y" * (
file_size - first_batch_len
):
print(read)
assert False
finally:
gdal.VSIFCloseL(f)


###############################################################################
#


def test_vsicurl_streaming_retry_in_middle_failed(webserver_port):

gdal.VSICurlClearCache()

with gdaltest.config_options(
{
"GDAL_HTTP_MAX_RETRY": "2",
"GDAL_HTTP_RETRY_DELAY": "0.01",
"CPL_VSIL_CURL_STREMAING_SIMULATED_CURL_ERROR": "Send failure: Connection was reset",
},
thread_local=False,
):
handler = webserver.SequentialHandler()

file_size = 50000
first_batch_len = 1024

lock = threading.Lock()
cv = threading.Condition(lock)
stop_server = False

def method(request):
request.protocol_version = "HTTP/1.1"
request.send_response(200)
request.send_header("Content-Length", file_size)
request.end_headers()
request.wfile.write(("x" * first_batch_len).encode("ascii"))
request.wfile.flush()
with lock:
while not stop_server:
cv.wait()

handler.add("GET", "/test.bin", custom_method=method)
handler.add("GET", "/test.bin", 200, {"Content-Length": str(file_size)}, "y")
handler.add("GET", "/test.bin", 200, {"Content-Length": str(file_size)}, "y")
with webserver.install_http_handler(handler):
f = gdal.VSIFOpenL(
f"/vsicurl_streaming/http://localhost:{webserver_port}/test.bin", "rb"
)
assert f
try:
first_read_len = 50
assert first_read_len <= first_batch_len

read = gdal.VSIFReadL(1, first_read_len, f)
if read != b"x" * first_read_len:
print(read)
assert False

with lock:
stop_server = True
cv.notify()

read = gdal.VSIFReadL(1, file_size - first_read_len, f)
if read != b"x" * (first_batch_len - first_read_len):
print(read)
assert False
assert gdal.VSIFEofL(f)
finally:
gdal.VSIFCloseL(f)
7 changes: 5 additions & 2 deletions autotest/pymod/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,13 @@ def _process_req_resp(req_resp, request):
request.send_response(req_resp.code)
for k in req_resp.headers:
request.send_header(k, req_resp.headers[k])
if req_resp.add_content_length_header:
if (
req_resp.add_content_length_header
and "Content-Length" not in req_resp.headers
):
if req_resp.body:
request.send_header("Content-Length", len(req_resp.body))
elif "Content-Length" not in req_resp.headers:
else:
request.send_header("Content-Length", "0")
request.end_headers()
if req_resp.body:
Expand Down
Loading

0 comments on commit 592c453

Please sign in to comment.