Skip to content

Commit

Permalink
allow configuration of retry behavior for file downloads (pantsbuild#…
Browse files Browse the repository at this point in the history
…21308)

As described in pantsbuild#21093, Pants
needs better controls for how the downloads intrinsic operates when
retrying retryable errors.

This PR adds two new options: `--file-downloads-retry-delay`, which allows
users to customize the delay duration between retries used by the
downloads intrinsic rule, and `--file-downloads-max-attempts`, which
configures the maximum number of attempts allowed when retryable errors
are encountered.
  • Loading branch information
tdyas authored Aug 19, 2024
1 parent 63e8b7b commit b4f3b19
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 30 deletions.
4 changes: 4 additions & 0 deletions docs/notes/2.23.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ plugin](https://github.com/pantsbuild/pants/tree/2.23.x/testprojects/pants-plugi
for inspiration. To opt into the feature set the flag
`--enable-target-origin-sources-blocks`.

### General

New advanced options `--file-downloads-retry-delay` and `--file-downloads-max-attempts` allow configuration of the retry behavior when retryable errors occur while Pants is downloading files, for example, while downloading an `http_source` source.

### Goals

#### Package
Expand Down
7 changes: 6 additions & 1 deletion src/python/pants/backend/url_handlers/s3/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pants.engine.internals.selectors import Get
from pants.engine.rules import collect_rules, rule
from pants.engine.unions import UnionRule
from pants.option.global_options import GlobalOptions
from pants.util.strutil import softwrap

CONTENT_TYPE = "binary/octet-stream"
Expand Down Expand Up @@ -63,7 +64,9 @@ class S3DownloadFile:


@rule
async def download_from_s3(request: S3DownloadFile, aws_credentials: AWSCredentials) -> Digest:
async def download_from_s3(
request: S3DownloadFile, aws_credentials: AWSCredentials, global_options: GlobalOptions
) -> Digest:
from botocore import auth, compat, exceptions # pants: no-infer-dep

# NB: The URL for auth is expected to be in path-style
Expand Down Expand Up @@ -102,6 +105,8 @@ async def download_from_s3(request: S3DownloadFile, aws_credentials: AWSCredenti
url=virtual_hosted_url,
expected_digest=request.expected_digest,
auth_headers=http_request.headers,
retry_delay_duration=global_options.file_downloads_retry_delay,
max_attempts=global_options.file_downloads_max_attempts,
),
)

Expand Down
13 changes: 12 additions & 1 deletion src/python/pants/engine/download_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pants.engine.internals.selectors import Get
from pants.engine.rules import collect_rules, rule
from pants.engine.unions import UnionMembership, union
from pants.option.global_options import GlobalOptions
from pants.util.strutil import bullet_list, softwrap


Expand Down Expand Up @@ -67,8 +68,10 @@ def rules():
async def download_file(
request: DownloadFile,
union_membership: UnionMembership,
global_options: GlobalOptions,
) -> Digest:
parsed_url = urlparse(request.url)

handlers = union_membership.get(URLDownloadHandler)
matched_handlers = []
for handler in handlers:
Expand Down Expand Up @@ -96,7 +99,15 @@ async def download_file(
handler = matched_handlers[0]
return await Get(Digest, URLDownloadHandler, handler(request.url, request.expected_digest))

return await Get(Digest, NativeDownloadFile(request.url, request.expected_digest))
return await Get(
Digest,
NativeDownloadFile(
request.url,
request.expected_digest,
retry_delay_duration=global_options.file_downloads_retry_delay,
max_attempts=global_options.file_downloads_max_attempts,
),
)


def rules():
Expand Down
49 changes: 42 additions & 7 deletions src/python/pants/engine/download_file_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import hashlib
from datetime import timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from urllib.parse import parse_qs, urlparse

import pytest
import requests

from pants.engine.download_file import URLDownloadHandler, download_file
from pants.engine.fs import (
Expand All @@ -20,6 +22,8 @@
from pants.engine.internals.scheduler import ExecutionError
from pants.engine.rules import QueryRule
from pants.engine.unions import UnionMembership
from pants.option.global_options import GlobalOptions
from pants.testutil.option_util import create_subsystem
from pants.testutil.rule_runner import MockGet, RuleRunner, run_rule_with_mocks

DOWNLOADS_FILE_DIGEST = FileDigest(
Expand All @@ -30,13 +34,21 @@
)


def test_no_union_members() -> None:
@pytest.fixture
def global_options() -> GlobalOptions:
    """Provide a `GlobalOptions` subsystem configured with fast retry settings for tests."""
    options = create_subsystem(
        GlobalOptions,
        file_downloads_retry_delay=0.01,
        file_downloads_max_attempts=2,
    )
    return options


def test_no_union_members(global_options: GlobalOptions) -> None:
union_membership = UnionMembership({})
digest = run_rule_with_mocks(
download_file,
rule_args=[
DownloadFile("http://pantsbuild.com/file.txt", DOWNLOADS_FILE_DIGEST),
union_membership,
global_options,
],
mock_gets=[
MockGet(
Expand Down Expand Up @@ -77,7 +89,7 @@ def test_no_union_members() -> None:
("http*", "*.pantsbuild.com", "http://awesome.pantsbuild.com/file.txt"),
],
)
def test_matches(scheme, authority, url) -> None:
def test_matches(scheme, authority, url, global_options: GlobalOptions) -> None:
class UnionMember(URLDownloadHandler):
match_scheme = scheme
match_authority = authority
Expand All @@ -93,6 +105,7 @@ def mock_rule(self) -> Digest:
rule_args=[
DownloadFile(url, DOWNLOADS_FILE_DIGEST),
union_membership,
global_options,
],
mock_gets=[
MockGet(
Expand Down Expand Up @@ -126,7 +139,7 @@ def mock_rule(self) -> Digest:
("https", "*.pantsbuild.com", "https://pantsbuild.com/file.txt"),
],
)
def test_doesnt_match(scheme, authority, url) -> None:
def test_doesnt_match(scheme, authority, url, global_options: GlobalOptions) -> None:
class UnionMember(URLDownloadHandler):
match_scheme = scheme
match_authority = authority
Expand All @@ -138,6 +151,7 @@ class UnionMember(URLDownloadHandler):
rule_args=[
DownloadFile(url, DOWNLOADS_FILE_DIGEST),
union_membership,
global_options,
],
mock_gets=[
MockGet(
Expand All @@ -156,7 +170,7 @@ class UnionMember(URLDownloadHandler):
assert digest == DOWNLOADS_EXPECTED_DIRECTORY_DIGEST


def test_too_many_matches() -> None:
def test_too_many_matches(global_options: GlobalOptions) -> None:
class AuthorityMatcher(URLDownloadHandler):
match_authority = "pantsbuild.com"

Expand All @@ -171,6 +185,7 @@ class SchemeMatcher(URLDownloadHandler):
rule_args=[
DownloadFile("http://pantsbuild.com/file.txt", DOWNLOADS_FILE_DIGEST),
union_membership,
global_options,
],
mock_gets=[
MockGet(
Expand Down Expand Up @@ -209,7 +224,21 @@ def _http_server_thread() -> None:
t.daemon = True
t.start()

rule_runner = RuleRunner(rules=[QueryRule(DigestEntries, (Digest,))], isolated_local_store=True)
# Wait until the http server is operational.
wait_attempts_remaining = 4
while wait_attempts_remaining > 0:
r = requests.get(f"http://127.0.0.1:{port}/?val=test", timeout=0.1)
if r.status_code == 200:
break
wait_attempts_remaining -= 1

if wait_attempts_remaining == 0:
raise Exception("HTTP server thread did not startup.")

rule_runner = RuleRunner(
rules=[QueryRule(DigestEntries, (Digest,))],
isolated_local_store=True,
)

response = "world"
expected_digest = FileDigest(hashlib.sha256(response.encode()).hexdigest(), len(response))
Expand All @@ -218,7 +247,10 @@ def _http_server_thread() -> None:
Digest,
[
NativeDownloadFile(
f"http://127.0.0.1:{port}/hello?val={response}", expected_digest=expected_digest
f"http://127.0.0.1:{port}/hello?val={response}",
expected_digest=expected_digest,
retry_delay_duration=timedelta(milliseconds=1),
max_attempts=2,
)
],
)
Expand All @@ -237,7 +269,10 @@ def _http_server_thread() -> None:
Digest,
[
NativeDownloadFile(
f"http://127.0.0.1:{port}/hello?val=galaxy", expected_digest=expected_digest
f"http://127.0.0.1:{port}/hello?val=galaxy",
expected_digest=expected_digest,
retry_delay_duration=timedelta(milliseconds=1),
max_attempts=2,
)
],
)
13 changes: 12 additions & 1 deletion src/python/pants/engine/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Tuple, Union

Expand Down Expand Up @@ -273,12 +274,22 @@ class NativeDownloadFile:
# authorization.
auth_headers: FrozenDict[str, str]

retry_error_duration: timedelta
max_attempts: int

def __init__(
    self,
    url: str,
    expected_digest: FileDigest,
    auth_headers: Mapping[str, str] | None = None,
    retry_delay_duration: timedelta = timedelta(milliseconds=10),
    max_attempts: int = 4,
) -> None:
    # Fields are assigned via object.__setattr__ rather than normal attribute
    # assignment — presumably because this class is a frozen dataclass, where
    # direct assignment in __init__ is blocked. TODO(review): confirm against
    # the class decorator (not visible in this hunk).
    object.__setattr__(self, "url", url)
    object.__setattr__(self, "expected_digest", expected_digest)
    # Normalize to an immutable FrozenDict; a missing mapping becomes empty.
    object.__setattr__(self, "auth_headers", FrozenDict(auth_headers or {}))
    # NOTE(review): the parameter is named `retry_delay_duration` but is stored
    # under the field name `retry_error_duration` (declared above). The mismatch
    # may be deliberate to keep the keyword-argument API stable — confirm.
    object.__setattr__(self, "retry_error_duration", retry_delay_duration)
    object.__setattr__(self, "max_attempts", max_attempts)


@dataclass(frozen=True)
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/engine/fs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,9 @@ def test_download_body_error_retry(downloads_rule_runner: RuleRunner) -> None:

def test_download_body_error_retry_eventually_fails(downloads_rule_runner: RuleRunner) -> None:
# Returns one more error than the retry will allow.
downloads_rule_runner.set_options(
["--file-downloads-max-attempts=4", "--file-downloads-retry-delay=0.001"]
)
with http_server(stub_erroring_handler(5)) as port:
with pytest.raises(Exception):
_ = downloads_rule_runner.request(
Expand Down
46 changes: 45 additions & 1 deletion src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import tempfile
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path, PurePath
from typing import Any, Callable, Type, TypeVar, cast
Expand Down Expand Up @@ -1748,6 +1748,50 @@ class BootstrapOptions:
"""
),
)
_file_downloads_retry_delay = FloatOption(
default=0.2,
advanced=True,
help=softwrap(
"""
When Pants downloads files (for example, for the `http_source` source), Pants will retry the download
if a "retryable" error occurs. Between each attempt, Pants will delay a random amount of time using an
exponential backoff algorithm.
This option sets the "base" duration in seconds used for calculating the retry delay.
"""
),
)
_file_downloads_max_attempts = IntOption(
default=4,
advanced=True,
help=softwrap(
"""
When Pants downloads files (for example, for the `http_source` source), Pants will retry the download
if a "retryable" error occurs.
This option sets the maximum number of attempts Pants will make to try to download the file before giving up
with an error.
"""
),
)

@property
def file_downloads_retry_delay(self) -> timedelta:
    """The validated base retry delay for file downloads, as a `timedelta`.

    Converts the raw `--file-downloads-retry-delay` float (seconds) into a
    `timedelta` for consumers such as `NativeDownloadFile`.

    Raises:
        ValueError: if the option value is zero or negative.
    """
    value = self._file_downloads_retry_delay
    if value <= 0.0:
        # Fix: the original message read "must a positive number" (missing "be").
        raise ValueError(
            f"Global option `--file-downloads-retry-delay` must be a positive number, got {value}"
        )
    return timedelta(seconds=value)

@property
def file_downloads_max_attempts(self) -> int:
    """The validated maximum number of attempts for file downloads (always >= 1).

    Raises:
        ValueError: if the option value is less than 1.
    """
    attempts = self._file_downloads_max_attempts
    if attempts >= 1:
        return attempts
    raise ValueError(
        f"Global option `--file-downloads-max-attempts` must be at least 1, got {attempts}"
    )


# N.B. By subclassing BootstrapOptions, we inherit all of those options and are also able to extend
Expand Down
16 changes: 13 additions & 3 deletions src/rust/engine/src/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
use std::collections::BTreeMap;
use std::fmt;
use std::io::{self, Write};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::time::Duration;

use async_trait::async_trait;
use bytes::{BufMut, Bytes};
Expand Down Expand Up @@ -221,6 +223,8 @@ pub async fn download(
auth_headers: BTreeMap<String, String>,
file_name: String,
expected_digest: hashing::Digest,
error_delay: Duration,
max_attempts: NonZeroUsize,
) -> Result<(), String> {
let mut attempt_number = 0;
let (actual_digest, bytes) = in_workunit!(
Expand All @@ -234,9 +238,9 @@ pub async fn download(
.unwrap()
)),
|_workunit| async move {
// TODO: Allow the retry strategy to be configurable?
// For now we retry after 10ms, 100ms, 1s, and 10s.
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(4);
let retry_strategy = ExponentialBackoff::from_millis(error_delay.as_millis() as u64)
.map(jitter)
.take(max_attempts.get() - 1);
RetryIf::spawn(
retry_strategy,
|| {
Expand Down Expand Up @@ -277,10 +281,12 @@ mod tests {
use std::{
collections::{BTreeMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use axum::{extract::State, response::IntoResponse, routing::get, Router};
Expand Down Expand Up @@ -328,6 +334,8 @@ mod tests {
auth_headers,
"foo.txt".into(),
expected_digest,
Duration::from_millis(10),
NonZeroUsize::new(1).unwrap(),
)
.await
.unwrap();
Expand Down Expand Up @@ -396,6 +404,8 @@ mod tests {
auth_headers,
"foo.txt".into(),
expected_digest,
Duration::from_millis(10),
NonZeroUsize::new(3).unwrap(),
)
.await
.unwrap();
Expand Down
Loading

0 comments on commit b4f3b19

Please sign in to comment.