Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

respect chunk size #24

Merged
merged 1 commit into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
respect chunk size
bumps httpx to 0.17.0 as the chunk size is only available since.
  • Loading branch information
skshetry committed Jun 10, 2021
commit fed595e43617f5c9bbe6b7739982ae411f369cad
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers =
[options]
setup_requires = setuptools_scm
install_requires=
httpx>=0.16.1
httpx>=0.17.0
python-dateutil~=2.8.1
package_dir=
=src
Expand Down
32 changes: 25 additions & 7 deletions src/webdav4/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Client for the webdav."""

import locale
import shutil
import threading
Expand Down Expand Up @@ -49,6 +48,9 @@
_T = TypeVar("_T")


DEFAULT_CHUNK_SIZE = 2 ** 22


def _prepare_result_info(
response: Response, base_url: URL, detail: bool = True
) -> Union[str, Dict[str, Any]]:
Expand Down Expand Up @@ -178,6 +180,7 @@ def __init__(
auth: "AuthTypes" = None,
http_client: "HTTPClient" = None,
retry: Union[Callable[[Callable[[], _T]], _T], bool] = True,
chunk_size: int = DEFAULT_CHUNK_SIZE,
**client_opts: Any,
) -> None:
"""Instantiate client for webdav.
Expand Down Expand Up @@ -240,6 +243,7 @@ def __init__(
self.with_retry = retry if callable(retry) else _retry(retry)
self._detected_features: Optional[FeatureDetection] = None
self._detect_feature_lock = threading.RLock()
self.chunk_size = chunk_size

@property
def detected_features(self) -> FeatureDetection:
Expand Down Expand Up @@ -532,7 +536,7 @@ def open(
path: str,
mode: str = "r",
encoding: str = None,
block_size: int = None,
chunk_size: int = None,
) -> Iterator[Union[TextIO, BinaryIO]]:
"""Returns file-like object to a resource."""
if self.isdir(path):
Expand All @@ -542,7 +546,7 @@ def open(
with IterStream(
self,
self.join_url(path),
chunk_size=block_size,
chunk_size=chunk_size or self.chunk_size,
) as buffer:
buff = cast(BinaryIO, buffer)

Expand All @@ -561,9 +565,12 @@ def download_fileobj(
from_path: str,
file_obj: BinaryIO,
callback: Callable[[int], Any] = None,
chunk_size: int = None,
) -> None:
"""Write stream from path to given file object."""
with self.open(from_path, mode="rb") as remote_obj:
with self.open(
from_path, mode="rb", chunk_size=chunk_size
) as remote_obj:
# TODO: fix typings for open to always return BinaryIO on mode=rb
remote_obj = cast(BinaryIO, remote_obj)
wrapped = wrap_file_like(file_obj, callback, method="write")
Expand All @@ -573,23 +580,31 @@ def download_file(
self,
from_path: str,
to_path: "PathLike[AnyStr]",
chunk_size: int = None,
callback: Callable[[int], Any] = None,
) -> None:
"""Download file from remote path to local path."""
with open(to_path, mode="wb") as fobj:
self.download_fileobj(from_path, fobj, callback=callback)
self.download_fileobj(
from_path, fobj, callback=callback, chunk_size=chunk_size
)

def upload_file(
self,
from_path: "PathLike[AnyStr]",
to_path: str,
overwrite: bool = False,
chunk_size: int = None,
callback: Callable[[int], Any] = None,
) -> None:
"""Upload file from local path to a given remote path."""
with open(from_path, mode="rb") as fobj:
self.upload_fileobj(
fobj, to_path, overwrite=overwrite, callback=callback
fobj,
to_path,
overwrite=overwrite,
chunk_size=chunk_size,
callback=callback,
)

def upload_fileobj(
Expand All @@ -598,6 +613,7 @@ def upload_fileobj(
to_path: str,
overwrite: bool = False,
callback: Callable[[int], Any] = None,
chunk_size: int = None,
) -> None:
"""Upload file from file object to given path."""
length = -1
Expand All @@ -610,7 +626,9 @@ def upload_fileobj(
raise ResourceAlreadyExists(to_path)

wrapped = wrap_file_like(file_obj, callback)
content = read_chunks(wrapped, 50 * 1024 * 1024)
content = read_chunks(
wrapped, chunk_size=chunk_size or self.chunk_size
)

http_resp = self.request(
HTTPMethod.PUT, to_path, content=content, headers=headers
Expand Down
27 changes: 21 additions & 6 deletions src/webdav4/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(
(refer to it's documenting for more information).
"""
super().__init__()
client_opts.setdefault("chunk_size", self.blocksize)
self.client = client or Client(base_url, auth=auth, **client_opts)

@classmethod
Expand Down Expand Up @@ -277,8 +278,6 @@ def _open(
return UploadFile(
self, path=path, mode=mode, block_size=block_size
)
if mode == "rb" and not size:
size = self.size(path)

return WebdavFile(
self,
Expand Down Expand Up @@ -332,6 +331,8 @@ def put_file(
class WebdavFile(AbstractBufferedFile):
"""WebdavFile that provides file-like access to remote file."""

size: int

def __init__(
self,
fs: "WebdavFileSystem",
Expand Down Expand Up @@ -364,9 +365,18 @@ def __init__(
self.path,
mode=self.mode,
encoding=encoding,
block_size=self.blocksize,
chunk_size=self.blocksize,
)
self.reader: Union[TextIO, BinaryIO] = self.fobj.__enter__()

# only get the file size if GET request didnot send Content-Length
# or was retrieved before.
if not self.size:
if getattr(self.reader, "size", None):
self.size = self.reader.size # type: ignore
else:
self.size = self.fs.size(self.path)

self.closed: bool = False

def read(self, length: int = -1) -> Union[str, bytes, None]:
Expand Down Expand Up @@ -468,8 +478,11 @@ def __init__( # pylint: disable=invalid-name
"""Extended interface with path and fs."""
assert fs
assert path

self.blocksize = block_size or io.DEFAULT_BUFFER_SIZE
self.blocksize = (
AbstractBufferedFile.DEFAULT_BLOCK_SIZE
if block_size in ["default", None]
else block_size
)
self.fs: WebdavFileSystem = fs # pylint: disable=invalid-name
assert mode
self.path: str = path
Expand Down Expand Up @@ -501,7 +514,9 @@ def commit(self) -> None:
"""
self.seek(0)
fileobj = cast(BinaryIO, self)
self.fs.client.upload_fileobj(fileobj, self.path, overwrite=True)
self.fs.client.upload_fileobj(
fileobj, self.path, chunk_size=self.blocksize, overwrite=True
)

def close(self) -> None:
"""Close the file."""
Expand Down
13 changes: 8 additions & 5 deletions src/webdav4/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def request(
def iter_url( # noqa: C901
client: "Client",
url: "URLTypes",
chunk_size: int = None,
pos: int = 0,
) -> Iterator[Tuple["HTTPResponse", Iterator[bytes]]]:
"""Iterate over chunks requested from url.
Expand All @@ -72,7 +73,7 @@ def gen(
response.raise_for_status()

try:
for chunk in response.iter_bytes():
for chunk in response.iter_bytes(chunk_size=chunk_size):
pos += len(chunk)
yield chunk
break
Expand Down Expand Up @@ -110,11 +111,11 @@ def __init__(
self.buffer = b""
# setting chunk_size is not possible yet with httpx
# though it is to be released in a new version.
self.chunk_size = chunk_size or DEFAULT_BUFFER_SIZE
self.chunk_size = chunk_size or client.chunk_size
self.client = client
self.url = url
self._loc: int = 0
self._cm = iter_url(client, self.url)
self._cm = iter_url(client, self.url, chunk_size=chunk_size)
self.size: Optional[int] = None
self._iterator: Optional[Iterator[bytes]] = None
self._response: Optional["HTTPResponse"] = None
Expand Down Expand Up @@ -177,7 +178,9 @@ def seek(self, offset: int, whence: int = 0) -> int:
raise ValueError("Seek before start of file")

self.close()
self._cm = iter_url(self.client, self.url, pos=loc)
self._cm = iter_url(
self.client, self.url, pos=loc, chunk_size=self.chunk_size
)
# pylint: disable=no-member
self._response, self._iterator = self._cm.__enter__()
self.loc = loc
Expand Down Expand Up @@ -248,7 +251,7 @@ def readinto1(self, sequence: Buffer) -> int:

def read_chunks(obj: IO[AnyStr], chunk_size: int = None) -> Iterator[AnyStr]:
"""Read file object in chunks."""
func = partial(obj.read, chunk_size or 1024 * 1024)
func = partial(obj.read, chunk_size or DEFAULT_BUFFER_SIZE)
return takewhile(bool, repeat_func(func))


Expand Down
4 changes: 1 addition & 3 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Testing stream utilities."""

from io import DEFAULT_BUFFER_SIZE, BytesIO, StringIO
from typing import Any, Iterator

Expand Down Expand Up @@ -51,7 +50,6 @@ def test_retry_reconnect_on_failure(
monkeypatch: MonkeyPatch,
):
"""Test retry/reconnect on network failures."""
from io import DEFAULT_BUFFER_SIZE
from unittest import mock

from webdav4.http import HTTPNetworkError, HTTPResponse
Expand All @@ -74,7 +72,7 @@ def bad_iter_content(
# Text should be longer than default chunk to test resume,
# using twice of that plus something tests second resume,
# this is important because second response is different
text1 = "0123456789" * (DEFAULT_BUFFER_SIZE // 10 + 1)
text1 = "0123456789" * (client.chunk_size // 10 + 1)
storage_dir.gen("sample.txt", text1 * 2)
propfind_resp = client.propfind("sample.txt")

Expand Down