s3: provide more helpful messages on common errors (iterative#4480)
* s3: provide more helpful messages on common errors

Fix iterative#4478

* utils: introduce error_link

* Update dvc/tree/s3.py

Co-authored-by: Jorge Orpinel <[email protected]>

* fix tests

* don't forget the link

* remove redundant message

Co-authored-by: Jorge Orpinel <[email protected]>
efiop and jorgeorpinel authored Aug 27, 2020
1 parent 334556f commit b9d685a
Showing 8 changed files with 140 additions and 93 deletions.
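The thrust of the change: instead of letting raw botocore exceptions bubble up, the S3 tree now hands out its boto3 handles through a chain of context managers (`_get_s3` → `_get_bucket` → `_get_obj`, in dvc/tree/s3.py below) that translate common failures into a single actionable DvcException ending in an error.dvc.org link. A minimal, self-contained sketch of that translation pattern; `DemoError` and `friendly_errors` are illustrative stand-ins, not names from this commit:

    from contextlib import contextmanager


    class DemoError(Exception):
        """Stand-in for dvc.exceptions.DvcException."""


    @contextmanager
    def friendly_errors(url):
        # Translate a low-level failure into one short message plus a docs
        # link, mirroring _get_s3/_get_bucket/_get_obj in the diff below.
        try:
            yield
        except ConnectionError as exc:  # stand-in for EndpointConnectionError
            raise DemoError(
                f"Unable to connect to '{url}'. "
                "<https://error.dvc.org/connection-error>"
            ) from exc


    try:
        with friendly_errors("AWS S3"):
            raise ConnectionError("low-level socket noise")  # demo trigger
    except DemoError as exc:
        print(exc)

Because each manager enters the next one, a single `with self._get_obj(path_info)` at a call site covers credentials, connectivity, bucket, and key errors at once.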
2 changes: 1 addition & 1 deletion dvc/config.py
@@ -162,7 +162,7 @@ class RelPath(str):
         "endpointurl": str,
         "access_key_id": str,
         "secret_access_key": str,
-        Optional("listobjects", default=False): Bool,
+        Optional("listobjects", default=False): Bool,  # obsoleted
         Optional("use_ssl", default=True): Bool,
         "sse": str,
         "sse_kms_key_id": str,
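Note that `listobjects` stays in the schema even though the code below stops reading it: the config file is validated against this schema, so dropping the key outright would make existing .dvc/config files that still set it fail validation. A rough sketch of the idea with plain voluptuous (DVC's own `Bool` validator differs slightly):

    from voluptuous import Boolean, Optional, Schema

    # Unknown keys are rejected by default, so the obsoleted key must stay
    # in the schema for old config files to keep validating.
    schema = Schema({Optional("listobjects", default=False): Boolean()})

    print(schema({"listobjects": True}))  # accepted, then simply ignored
    print(schema({}))                     # {'listobjects': False}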
5 changes: 2 additions & 3 deletions dvc/exceptions.py
@@ -1,7 +1,7 @@
 """Exceptions raised by the dvc."""
 from funcy import first
 
-from dvc.utils import format_link, relpath
+from dvc.utils import error_link, format_link, relpath
 
 
 class DvcException(Exception):
@@ -258,8 +258,7 @@ def __init__(self, target_infos, stats=None):
         m = (
             "Checkout failed for following targets:\n{}\nIs your "
             "cache up to date?\n{}".format(
-                "\n".join(targets),
-                format_link("https://error.dvc.org/missing-files"),
+                "\n".join(targets), error_link("missing-files"),
             )
         )
         super().__init__(m)
6 changes: 2 additions & 4 deletions dvc/main.py
@@ -11,7 +11,7 @@
 from dvc.external_repo import clean_repos
 from dvc.logger import FOOTER, disable_other_loggers
 from dvc.tree.pool import close_pools
-from dvc.utils import format_link
+from dvc.utils import error_link
 
 # Workaround for CPython bug. See [1] and [2] for more info.
 # [1] https://github.com/aws/aws-cli/blob/1.16.277/awscli/clidriver.py#L55
@@ -92,9 +92,7 @@ def main(argv=None):  # noqa: C901
             logger.exception(
                 "too many open files, please visit "
                 "{} to see how to handle this "
-                "problem".format(
-                    format_link("https://error.dvc.org/many-files")
-                ),
+                "problem".format(error_link("many-files")),
                 extra={"tb_only": True},
             )
         else:
167 changes: 87 additions & 80 deletions dvc/tree/s3.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import threading
+from contextlib import contextmanager
 
 from funcy import cached_property, wrap_prop
 
@@ -9,6 +10,7 @@
 from dvc.path_info import CloudURLInfo
 from dvc.progress import Tqdm
 from dvc.scheme import Schemes
+from dvc.utils import error_link
 
 from .base import BaseTree
 
@@ -31,11 +33,6 @@ def __init__(self, repo, config):
         self.profile = config.get("profile")
         self.endpoint_url = config.get("endpointurl")
 
-        if config.get("listobjects"):
-            self.list_objects_api = "list_objects"
-        else:
-            self.list_objects_api = "list_objects_v2"
-
         self.use_ssl = config.get("use_ssl", True)
 
         self.extra_args = {}
@@ -75,24 +72,49 @@ def s3(self):
 
         session = boto3.session.Session(**session_opts)
 
-        return session.client(
+        return session.resource(
             "s3", endpoint_url=self.endpoint_url, use_ssl=self.use_ssl
         )
 
-    @classmethod
-    def get_etag(cls, s3, bucket, path):
-        obj = cls.get_head_object(s3, bucket, path)
-
-        return obj["ETag"].strip('"')
-
-    @staticmethod
-    def get_head_object(s3, bucket, path, *args, **kwargs):
+    @contextmanager
+    def _get_s3(self):
+        from botocore.exceptions import (
+            EndpointConnectionError,
+            NoCredentialsError,
+        )
+
         try:
-            obj = s3.head_object(Bucket=bucket, Key=path, *args, **kwargs)
-        except Exception as exc:
-            raise DvcException(f"s3://{bucket}/{path} does not exist") from exc
-        return obj
+            yield self.s3
+        except NoCredentialsError as exc:
+            link = error_link("no-credentials")
+            raise DvcException(
+                f"Unable to find AWS credentials. {link}"
+            ) from exc
+        except EndpointConnectionError as exc:
+            link = error_link("connection-error")
+            name = self.endpoint_url or "AWS S3"
+            raise DvcException(
+                f"Unable to connect to '{name}'. {link}"
+            ) from exc
+
+    @contextmanager
+    def _get_bucket(self, bucket):
+        with self._get_s3() as s3:
+            try:
+                yield s3.Bucket(bucket)
+            except s3.meta.client.exceptions.NoSuchBucket as exc:
+                link = error_link("no-bucket")
+                raise DvcException(
+                    f"Bucket '{bucket}' does not exist. {link}"
+                ) from exc
+
+    @contextmanager
+    def _get_obj(self, path_info):
+        with self._get_bucket(path_info.bucket) as bucket:
+            try:
+                yield bucket.Object(path_info.path)
+            except bucket.meta.client.exceptions.NoSuchKey as exc:
+                raise DvcException(f"{path_info.url} does not exist") from exc
 
     def _append_aws_grants_to_extra_args(self, config):
         # Keys for extra_args can be one of the following list:
@@ -127,9 +149,12 @@ def _append_aws_grants_to_extra_args(self, config):
 
     def _generate_download_url(self, path_info, expires=3600):
         params = {"Bucket": path_info.bucket, "Key": path_info.path}
-        return self.s3.generate_presigned_url(
-            ClientMethod="get_object", Params=params, ExpiresIn=int(expires)
-        )
+        with self._get_s3() as s3:
+            return s3.meta.client.generate_presigned_url(
+                ClientMethod="get_object",
+                Params=params,
+                ExpiresIn=int(expires),
+            )
 
     def exists(self, path_info, use_dvcignore=True):
         """Check if the blob exists. If it does not exist,
@@ -164,36 +189,19 @@ def isdir(self, path_info):
         return bool(list(self._list_paths(dir_path, max_items=1)))
 
     def isfile(self, path_info):
-        from botocore.exceptions import ClientError
-
         if path_info.path.endswith("/"):
             return False
 
-        try:
-            self.s3.head_object(Bucket=path_info.bucket, Key=path_info.path)
-        except ClientError as exc:
-            if exc.response["Error"]["Code"] != "404":
-                raise
-            return False
-
-        return True
-
-    def _list_objects(self, path_info, max_items=None):
-        """ Read config for list object api, paginate through list objects."""
-        kwargs = {
-            "Bucket": path_info.bucket,
-            "Prefix": path_info.path,
-            "PaginationConfig": {"MaxItems": max_items},
-        }
-        paginator = self.s3.get_paginator(self.list_objects_api)
-        for page in paginator.paginate(**kwargs):
-            contents = page.get("Contents", ())
-            yield from contents
+        return path_info.path in self._list_paths(path_info)
 
     def _list_paths(self, path_info, max_items=None):
-        return (
-            item["Key"] for item in self._list_objects(path_info, max_items)
-        )
+        kwargs = {"Prefix": path_info.path}
+        if max_items is not None:
+            kwargs["MaxKeys"] = max_items
+
+        with self._get_bucket(path_info.bucket) as bucket:
+            for obj_summary in bucket.objects.filter(**kwargs):
+                yield obj_summary.key
 
     def walk_files(self, path_info, **kwargs):
         if not kwargs.pop("prefix", False):
@@ -209,7 +217,8 @@ def remove(self, path_info):
             raise NotImplementedError
 
         logger.debug(f"Removing {path_info}")
-        self.s3.delete_object(Bucket=path_info.bucket, Key=path_info.path)
+        with self._get_obj(path_info) as obj:
+            obj.delete()
 
     def makedirs(self, path_info):
         # We need to support creating empty directories, which means
@@ -221,10 +230,12 @@ def makedirs(self, path_info):
             return
 
         dir_path = path_info / ""
-        self.s3.put_object(Bucket=path_info.bucket, Key=dir_path.path, Body="")
+        with self._get_obj(dir_path) as obj:
+            obj.put(Body="")
 
     def copy(self, from_info, to_info):
-        self._copy(self.s3, from_info, to_info, self.extra_args)
+        with self._get_s3() as s3:
+            self._copy(s3.meta.client, from_info, to_info, self.extra_args)
 
     @classmethod
     def _copy_multipart(
@@ -238,8 +249,8 @@ def _copy_multipart(
         parts = []
         byte_position = 0
         for i in range(1, n_parts + 1):
-            obj = S3Tree.get_head_object(
-                s3, from_info.bucket, from_info.path, PartNumber=i
+            obj = s3.head_object(
+                Bucket=from_info.bucket, Key=from_info.path, PartNumber=i
             )
             part_size = obj["ContentLength"]
             lastbyte = byte_position + part_size - 1
@@ -293,7 +304,7 @@ def _copy(cls, s3, from_info, to_info, extra_args):
         # object is transfered in the same chunks as it was originally.
         from boto3.s3.transfer import TransferConfig
 
-        obj = S3Tree.get_head_object(s3, from_info.bucket, from_info.path)
+        obj = s3.head_object(Bucket=from_info.bucket, Key=from_info.path)
         etag = obj["ETag"].strip('"')
         size = obj["ContentLength"]
 
@@ -313,39 +324,35 @@ def _copy(cls, s3, from_info, to_info, extra_args):
                 Config=TransferConfig(multipart_threshold=size + 1),
             )
 
-        cached_etag = S3Tree.get_etag(s3, to_info.bucket, to_info.path)
+        cached_etag = s3.head_object(Bucket=to_info.bucket, Key=to_info.path)[
+            "ETag"
+        ].strip('"')
         if etag != cached_etag:
             raise ETagMismatchError(etag, cached_etag)
 
     def get_file_hash(self, path_info):
-        return (
-            self.PARAM_CHECKSUM,
-            self.get_etag(self.s3, path_info.bucket, path_info.path),
-        )
+        with self._get_obj(path_info) as obj:
+            return (
+                self.PARAM_CHECKSUM,
+                obj.e_tag.strip('"'),
+            )
 
     def _upload(self, from_file, to_info, name=None, no_progress_bar=False):
-        total = os.path.getsize(from_file)
-        with Tqdm(
-            disable=no_progress_bar, total=total, bytes=True, desc=name
-        ) as pbar:
-            self.s3.upload_file(
-                from_file,
-                to_info.bucket,
-                to_info.path,
-                Callback=pbar.update,
-                ExtraArgs=self.extra_args,
-            )
+        with self._get_obj(to_info) as obj:
+            total = os.path.getsize(from_file)
+            with Tqdm(
+                disable=no_progress_bar, total=total, bytes=True, desc=name
+            ) as pbar:
+                obj.upload_file(
+                    from_file, Callback=pbar.update, ExtraArgs=self.extra_args,
+                )
 
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
-        if no_progress_bar:
-            total = None
-        else:
-            total = self.s3.head_object(
-                Bucket=from_info.bucket, Key=from_info.path
-            )["ContentLength"]
-        with Tqdm(
-            disable=no_progress_bar, total=total, bytes=True, desc=name
-        ) as pbar:
-            self.s3.download_file(
-                from_info.bucket, from_info.path, to_file, Callback=pbar.update
-            )
+        with self._get_obj(from_info) as obj:
+            with Tqdm(
+                disable=no_progress_bar,
+                total=obj.content_length,
+                bytes=True,
+                desc=name,
+            ) as pbar:
+                obj.download_file(to_file, Callback=pbar.update)
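The switch from `session.client("s3")` to `session.resource("s3")` above is what enables the object-handle style (`obj.e_tag`, `obj.upload_file`, `obj.content_length`), while `s3.meta.client` keeps the low-level client reachable for calls like `generate_presigned_url` and `head_object`. A short illustration of the two styles; it needs real credentials, and the bucket and key names are placeholders:

    import boto3

    s3 = boto3.session.Session().resource("s3")

    # Resource style: a handle with attributes, as used by _get_obj() above.
    obj = s3.Object("my-bucket", "data/file.csv")  # placeholder bucket/key
    print(obj.e_tag, obj.content_length)

    # Client style: still available for APIs the resource layer lacks.
    head = s3.meta.client.head_object(Bucket="my-bucket", Key="data/file.csv")
    print(head["ETag"], head["ContentLength"])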
4 changes: 4 additions & 0 deletions dvc/utils/__init__.py
@@ -385,6 +385,10 @@ def format_link(link):
     )
 
 
+def error_link(name):
+    return format_link(f"https://error.dvc.org/{name}")
+
+
 def parse_target(target, default=None):
     from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK, is_valid_filename
     from dvc.exceptions import DvcException
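`error_link` is only a shorthand over the existing `format_link` helper, so every call site can name an error page by its slug:

    from dvc.utils import error_link, format_link

    # Equivalent by construction (see the definition above):
    assert error_link("many-files") == format_link(
        "https://error.dvc.org/many-files"
    )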
2 changes: 1 addition & 1 deletion tests/func/test_s3.py
@@ -54,7 +54,7 @@ def test_copy_singlepart_preserve_etag():
 def test_link_created_on_non_nested_path(base_info, tmp_dir, dvc, scm):
     tree = S3Tree(dvc, {"url": str(base_info.parent)})
     cache = CloudCache(tree)
-    s3 = cache.tree.s3
+    s3 = cache.tree.s3.meta.client
     s3.create_bucket(Bucket=base_info.bucket)
     s3.put_object(
         Bucket=base_info.bucket, Key=(base_info / "from").path, Body="data"
8 changes: 4 additions & 4 deletions tests/unit/remote/test_remote_tree.py
@@ -89,7 +89,7 @@ def test_walk_files(remote):
 @pytest.mark.parametrize("remote", [pytest.lazy_fixture("s3")], indirect=True)
 def test_copy_preserve_etag_across_buckets(remote, dvc):
     s3 = remote.tree.s3
-    s3.create_bucket(Bucket="another")
+    s3.Bucket("another").create()
 
     another = S3Tree(dvc, {"url": "s3://another", "region": "us-east-1"})
 
@@ -98,10 +98,10 @@ def test_copy_preserve_etag_across_buckets(remote, dvc):
 
     remote.tree.copy(from_info, to_info)
 
-    from_etag = S3Tree.get_etag(s3, from_info.bucket, from_info.path)
-    to_etag = S3Tree.get_etag(s3, "another", "foo")
+    from_hash = remote.tree.get_hash(from_info)
+    to_hash = another.get_hash(to_info)
 
-    assert from_etag == to_etag
+    assert from_hash == to_hash
 
 
 @pytest.mark.parametrize("remote", remotes, indirect=True)
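The tests above now go through the public `get_hash` API instead of the removed `S3Tree.get_etag` helper. A hedged sketch of how the new friendlier messages themselves could be asserted; the `dvc` fixture usage and the bucket name are assumptions, not part of this commit:

    import pytest

    from dvc.exceptions import DvcException
    from dvc.tree.s3 import S3Tree


    def test_missing_bucket_message(dvc):  # assumed fixture
        tree = S3Tree(dvc, {"url": "s3://definitely-missing-bucket"})
        # _get_bucket should rewrap NoSuchBucket with a docs link.
        with pytest.raises(DvcException, match="does not exist"):
            list(tree._list_paths(tree.path_info))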