Skip to content

Commit

Permalink
Python: PyArrow support for S3/S3A with properties (apache#5747)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuarobinson authored Sep 19, 2022
1 parent 2977289 commit f8aecf6
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 80 deletions.
49 changes: 37 additions & 12 deletions python/pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
"""

import os
from typing import Union
from functools import lru_cache
from typing import Callable, Tuple, Union
from urllib.parse import urlparse

from pyarrow.fs import FileInfo, FileSystem, FileType
from pyarrow.fs import (
FileInfo,
FileSystem,
FileType,
S3FileSystem,
)

from pyiceberg.io import (
FileIO,
Expand All @@ -35,6 +41,7 @@
OutputFile,
OutputStream,
)
from pyiceberg.typedef import EMPTY_DICT, Properties


class PyArrowFile(InputFile, OutputFile):
Expand All @@ -59,12 +66,9 @@ class PyArrowFile(InputFile, OutputFile):
>>> # output_file.create().write(b'foobytes')
"""

def __init__(self, location: str):
parsed_location = urlparse(location) # Create a ParseResult from the URI
if not parsed_location.scheme: # If no scheme, assume the path is to a local file
self._filesystem, self._path = FileSystem.from_uri(os.path.abspath(location))
else:
self._filesystem, self._path = FileSystem.from_uri(location) # Infer the proper filesystem
def __init__(self, location: str, path: str, fs: FileSystem):
self._filesystem = fs
self._path = path
super().__init__(location=location)

def _file_info(self) -> FileInfo:
Expand Down Expand Up @@ -165,6 +169,24 @@ def to_input_file(self) -> "PyArrowFile":


class PyArrowFileIO(FileIO):
def __init__(self, properties: Properties = EMPTY_DICT):
self.get_fs_and_path: Callable = lru_cache(self._get_fs_and_path)
super().__init__(properties=properties)

def _get_fs_and_path(self, location: str) -> Tuple[FileSystem, str]:
uri = urlparse(location) # Create a ParseResult from the URI
if not uri.scheme: # If no scheme, assume the path is to a local file
return FileSystem.from_uri(os.path.abspath(location))
elif uri.scheme in {"s3", "s3a", "s3n"}:
client_kwargs = {
"endpoint_override": self.properties.get("s3.endpoint"),
"access_key": self.properties.get("s3.access-key-id"),
"secret_key": self.properties.get("s3.secret-access-key"),
}
return (S3FileSystem(**client_kwargs), uri.netloc + uri.path)
else:
return FileSystem.from_uri(location) # Infer the proper filesystem

def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given location
Expand All @@ -174,7 +196,8 @@ def new_input(self, location: str) -> PyArrowFile:
Returns:
PyArrowFile: A PyArrowFile instance for the given location
"""
return PyArrowFile(location)
fs, path = self.get_fs_and_path(location)
return PyArrowFile(fs=fs, location=location, path=path)

def new_output(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to write bytes to the file at the given location
Expand All @@ -185,7 +208,8 @@ def new_output(self, location: str) -> PyArrowFile:
Returns:
PyArrowFile: A PyArrowFile instance for the given location
"""
return PyArrowFile(location)
fs, path = self.get_fs_and_path(location)
return PyArrowFile(fs=fs, location=location, path=path)

def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"""Delete the file at the given location
Expand All @@ -201,9 +225,10 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
an AWS error code 15
"""
str_path = location.location if isinstance(location, (InputFile, OutputFile)) else location
filesystem, path = FileSystem.from_uri(str_path) # Infer the proper filesystem
fs, path = self.get_fs_and_path(str_path)

try:
filesystem.delete_file(path)
fs.delete_file(path)
except FileNotFoundError:
raise
except PermissionError:
Expand Down
56 changes: 32 additions & 24 deletions python/tests/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
load_file_io,
)
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.io.pyarrow import PyArrowFile, PyArrowFileIO
from pyiceberg.io.pyarrow import PyArrowFileIO


class LocalInputFile(InputFile):
Expand Down Expand Up @@ -133,8 +133,8 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path}") from e


@pytest.mark.parametrize("CustomInputFile", [LocalInputFile, PyArrowFile])
def test_custom_local_input_file(CustomInputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_custom_local_input_file(CustomFileIO):
"""Test initializing an InputFile implementation to read a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
Expand All @@ -146,7 +146,7 @@ def test_custom_local_input_file(CustomInputFile):

# Instantiate the input file
absolute_file_location = os.path.abspath(file_location)
input_file = CustomInputFile(location=f"{absolute_file_location}")
input_file = CustomFileIO().new_input(location=f"{absolute_file_location}")

# Test opening and reading the file
f = input_file.open()
Expand All @@ -155,15 +155,15 @@ def test_custom_local_input_file(CustomInputFile):
assert len(input_file) == 3


@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
def test_custom_local_output_file(CustomOutputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_custom_local_output_file(CustomFileIO):
"""Test initializing an OutputFile implementation to write to a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")

# Instantiate the output file
absolute_file_location = os.path.abspath(file_location)
output_file = CustomOutputFile(location=f"{absolute_file_location}")
output_file = CustomFileIO().new_output(location=f"{absolute_file_location}")

# Create the output file and write to it
f = output_file.create()
Expand All @@ -176,8 +176,8 @@ def test_custom_local_output_file(CustomOutputFile):
assert len(output_file) == 3


@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
def test_custom_local_output_file_with_overwrite(CustomOutputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_custom_local_output_file_with_overwrite(CustomFileIO):
"""Test initializing an OutputFile implementation to overwrite a local file"""
with tempfile.TemporaryDirectory() as tmpdirname:
output_file_location = os.path.join(tmpdirname, "foo.txt")
Expand All @@ -187,7 +187,7 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
f.write(b"foo")

# Instantiate an output file
output_file = CustomOutputFile(location=f"{output_file_location}")
output_file = CustomFileIO().new_output(location=f"{output_file_location}")

# Confirm that a FileExistsError is raised when overwrite=False
with pytest.raises(FileExistsError):
Expand All @@ -201,8 +201,8 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
assert f.read() == b"bar"


@pytest.mark.parametrize("CustomFile", [LocalInputFile, LocalOutputFile, PyArrowFile, PyArrowFile])
def test_custom_file_exists(CustomFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_custom_file_exists(CustomFileIO):
"""Test that the exists property returns the proper value for existing and non-existing files"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_location = os.path.join(tmpdirname, "foo.txt")
Expand All @@ -218,23 +218,31 @@ def test_custom_file_exists(CustomFile):
absolute_file_location = os.path.abspath(file_location)
non_existent_absolute_file_location = os.path.abspath(nonexistent_file_location)

# Create File instances
file = CustomFile(location=f"{absolute_file_location}")
non_existent_file = CustomFile(location=f"{non_existent_absolute_file_location}")
# Create InputFile instances
file = CustomFileIO().new_input(location=f"{absolute_file_location}")
non_existent_file = CustomFileIO().new_input(location=f"{non_existent_absolute_file_location}")

# Test opening and reading the file
assert file.exists()
assert not non_existent_file.exists()

# Create OutputFile instances
file = CustomFileIO().new_output(location=f"{absolute_file_location}")
non_existent_file = CustomFileIO().new_output(location=f"{non_existent_absolute_file_location}")

# Test opening and reading the file
assert file.exists()
assert not non_existent_file.exists()


@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
def test_output_file_to_input_file(CustomOutputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_output_file_to_input_file(CustomFileIO):
"""Test initializing an InputFile using the `to_input_file()` method on an OutputFile instance"""
with tempfile.TemporaryDirectory() as tmpdirname:
output_file_location = os.path.join(tmpdirname, "foo.txt")

# Create an output file instance
output_file = CustomOutputFile(location=f"{output_file_location}")
output_file = CustomFileIO().new_output(location=f"{output_file_location}")

# Create the output file and write to it
f = output_file.create()
Expand Down Expand Up @@ -334,8 +342,8 @@ def test_raise_file_not_found_error_for_fileio_delete(CustomFileIO):
assert not os.path.exists(output_file_location)


@pytest.mark.parametrize("CustomFileIO, CustomInputFile", [(LocalFileIO, LocalInputFile), (PyArrowFileIO, PyArrowFile)])
def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_deleting_local_file_using_file_io_input_file(CustomFileIO):
"""Test deleting a local file by passing an InputFile instance to FileIO.delete(...)"""
with tempfile.TemporaryDirectory() as tmpdirname:
# Write to the temporary file
Expand All @@ -350,7 +358,7 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
assert os.path.exists(file_location)

# Instantiate the custom InputFile
input_file = CustomInputFile(location=f"{file_location}")
input_file = CustomFileIO().new_input(location=f"{file_location}")

# Delete the file using the file-io implementations delete method
file_io.delete(input_file)
Expand All @@ -359,8 +367,8 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF
assert not os.path.exists(file_location)


@pytest.mark.parametrize("CustomFileIO, CustomOutputFile", [(LocalFileIO, LocalOutputFile), (PyArrowFileIO, PyArrowFile)])
def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutputFile):
@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO])
def test_deleting_local_file_using_file_io_output_file(CustomFileIO):
"""Test deleting a local file by passing an OutputFile instance to FileIO.delete(...)"""
with tempfile.TemporaryDirectory() as tmpdirname:
# Write to the temporary file
Expand All @@ -375,7 +383,7 @@ def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutpu
assert os.path.exists(file_location)

# Instantiate the custom OutputFile
output_file = CustomOutputFile(location=f"{file_location}")
output_file = CustomFileIO().new_output(location=f"{file_location}")

# Delete the file using the file-io implementations delete method
file_io.delete(output_file)
Expand Down
Loading

0 comments on commit f8aecf6

Please sign in to comment.