✨ Stream pyarrow.dataset.dataset supported formats in Artifact.open()
Koncopd authored Oct 28, 2024
1 parent 6e3d8b4 commit e764dcb
Showing 6 changed files with 112 additions and 17 deletions.
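
The change in one picture: `Artifact.open()` can now return a `pyarrow.dataset.Dataset` for any format that `pyarrow.dataset.dataset` supports, for a single file as well as for a folder of same-suffix files. A minimal usage sketch, distilled from the new tests further down (file names are illustrative):

```python
import lamindb as ln
import pandas as pd

df = pd.DataFrame({"feat1": [0, 0, 1, 1], "feat2": [6, 7, 8, 9]})
df.to_parquet("save_df.parquet", engine="pyarrow")

artifact = ln.Artifact("save_df.parquet", description="parquet example")
artifact.save()

ds = artifact.open()  # previously raised ValueError; now a pyarrow.dataset.Dataset
print(ds.to_table().to_pandas())  # stream, then materialize only when needed
```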

lamindb/_artifact.py (12 additions, 5 deletions)

@@ -58,6 +58,7 @@
     save_feature_set_links,
     save_feature_sets,
 )
+from .core.storage._pyarrow_dataset import PYARROW_SUFFIXES
 from .core.storage.objects import _mudata_is_installed
 from .core.storage.paths import AUTO_KEY_PREFIX

@@ -72,6 +73,7 @@ def zarr_is_adata(storepath):  # type: ignore
 if TYPE_CHECKING:
     from lamindb_setup.core.types import UPathStr
     from mudata import MuData
+    from pyarrow.dataset import Dataset as PyArrowDataset
     from tiledbsoma import Collection as SOMACollection
     from tiledbsoma import Experiment as SOMAExperiment

@@ -905,14 +907,19 @@ def replace(
 # docstring handled through attach_func_to_class_method
 def open(
     self, mode: str = "r", is_run_input: bool | None = None
-) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
+) -> (
+    AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment | PyArrowDataset
+):
     # ignore empty suffix for now
-    suffixes = (".h5", ".hdf5", ".h5ad", ".zarr", ".tiledbsoma", "")
+    suffixes = ("", ".h5", ".hdf5", ".h5ad", ".zarr", ".tiledbsoma") + PYARROW_SUFFIXES
     if self.suffix not in suffixes:
         raise ValueError(
-            "Artifact should have a zarr, h5 or tiledbsoma object as the underlying data, please"
-            " use one of the following suffixes for the object name:"
-            f" {', '.join(suffixes[:-1])}."
+            "Artifact should have a zarr, h5, tiledbsoma object"
+            " or a compatible `pyarrow.dataset.dataset` directory"
+            " as the underlying data, please use one of the following suffixes"
+            f" for the object name: {', '.join(suffixes[1:])}."
+            f" Or no suffix for a folder with {', '.join(PYARROW_SUFFIXES)} files"
+            " (no mixing allowed)."
         )
     if self.suffix != ".tiledbsoma" and self.key != "soma" and mode != "r":
         raise ValueError("Only a tiledbsoma store can be openened with `mode!='r'`.")

lamindb/core/storage/_backed_access.py (16 additions, 8 deletions)

@@ -7,11 +7,13 @@
 from lnschema_core import Artifact

 from ._anndata_accessor import AnnDataAccessor, StorageType, registry
+from ._pyarrow_dataset import _is_pyarrow_dataset, _open_pyarrow_dataset
 from ._tiledbsoma import _open_tiledbsoma
 from .paths import filepath_from_artifact

 if TYPE_CHECKING:
     from fsspec.core import OpenFile
+    from pyarrow.dataset import Dataset as PyArrowDataset
     from tiledbsoma import Collection as SOMACollection
     from tiledbsoma import Experiment as SOMAExperiment
     from upath import UPath
@@ -67,22 +69,28 @@ def backed_access(
     artifact_or_filepath: Artifact | UPath,
     mode: str = "r",
     using_key: str | None = None,
-) -> AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment:
+) -> (
+    AnnDataAccessor | BackedAccessor | SOMACollection | SOMAExperiment | PyArrowDataset
+):
     if isinstance(artifact_or_filepath, Artifact):
-        filepath, _ = filepath_from_artifact(artifact_or_filepath, using_key=using_key)
+        objectpath, _ = filepath_from_artifact(
+            artifact_or_filepath, using_key=using_key
+        )
     else:
-        filepath = artifact_or_filepath
-    name = filepath.name
-    suffix = filepath.suffix
+        objectpath = artifact_or_filepath
+    name = objectpath.name
+    suffix = objectpath.suffix

     if name == "soma" or suffix == ".tiledbsoma":
         if mode not in {"r", "w"}:
             raise ValueError("`mode` should be either 'r' or 'w' for tiledbsoma.")
-        return _open_tiledbsoma(filepath, mode=mode)  # type: ignore
+        return _open_tiledbsoma(objectpath, mode=mode)  # type: ignore
     elif suffix in {".h5", ".hdf5", ".h5ad"}:
-        conn, storage = registry.open("h5py", filepath, mode=mode)
+        conn, storage = registry.open("h5py", objectpath, mode=mode)
     elif suffix == ".zarr":
-        conn, storage = registry.open("zarr", filepath, mode=mode)
+        conn, storage = registry.open("zarr", objectpath, mode=mode)
+    elif _is_pyarrow_dataset(objectpath):
+        return _open_pyarrow_dataset(objectpath)
     else:
         raise ValueError(
             "object should have .h5, .hdf5, .h5ad, .zarr, .tiledbsoma suffix, not"
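
Note the dispatch order: the pyarrow branch is consulted only after the tiledbsoma, HDF5, and zarr branches, so those formats keep precedence and the (potentially remote) directory listing inside `_is_pyarrow_dataset` is skipped for known suffixes. A rough sketch of the fall-through, assuming a local partitioned parquet folder like the one the tests below create:

```python
from upath import UPath

from lamindb.core.storage._backed_access import backed_access

# "save_df" is a hypothetical folder of .parquet fragments; no .h5/.zarr/
# .tiledbsoma branch matches, so it falls through to _open_pyarrow_dataset
ds = backed_access(UPath("save_df"))
print(ds.files)  # the parquet fragments backing the dataset
```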

lamindb/core/storage/_pyarrow_dataset.py (new file, 31 additions)

@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pyarrow.dataset
+from lamindb_setup.core.upath import LocalPathClasses
+
+if TYPE_CHECKING:
+    from upath import UPath
+
+
+PYARROW_SUFFIXES = (".parquet", ".csv", ".json", ".orc", ".arrow", ".feather")
+
+
+def _is_pyarrow_dataset(path: UPath) -> bool:
+    # it is assumed here that path exists
+    if path.is_file():
+        return path.suffix in PYARROW_SUFFIXES
+    else:
+        objects = path.rglob("*")
+        suffixes = {object.suffix for object in objects if object.suffix != ""}
+        return len(suffixes) == 1 and suffixes.pop() in PYARROW_SUFFIXES
+
+
+def _open_pyarrow_dataset(path: UPath) -> pyarrow.dataset.Dataset:
+    if isinstance(path, LocalPathClasses):
+        path_str, filesystem = path.as_posix(), None
+    else:
+        path_str, filesystem = path.path, path.fs
+
+    return pyarrow.dataset.dataset(path_str, filesystem=filesystem)
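
The folder branch of `_is_pyarrow_dataset` is what enforces the "no mixing allowed" clause of the new error message: every suffixed object under the directory must share a single suffix, and that suffix must be one of `PYARROW_SUFFIXES`. A small illustration with throwaway local files (names are made up):

```python
from pathlib import Path

from upath import UPath

from lamindb.core.storage._pyarrow_dataset import _is_pyarrow_dataset

d = Path("demo_dir")
d.mkdir()
(d / "part-0.parquet").touch()
(d / "part-1.parquet").touch()
print(_is_pyarrow_dataset(UPath(d)))  # True: one suffix, and it is supported

(d / "extra.csv").touch()
print(_is_pyarrow_dataset(UPath(d)))  # False: .parquet and .csv are mixed
```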

sub/lnschema-core (submodule: 1 addition, 1 deletion)

tests/core/test_artifact.py (3 additions, 3 deletions)

@@ -267,9 +267,9 @@ def test_create_from_dataframe(df):
     assert artifact.type == "dataset"
     assert hasattr(artifact, "_local_filepath")
     artifact.save()
-    # can't do backed
-    with pytest.raises(ValueError):
-        artifact.open()
+    # can do backed now, tested in test_storage.py
+    ds = artifact.open()
+    assert len(ds.files) == 1
     # check that the local filepath has been cleared
     assert not hasattr(artifact, "_local_filepath")
     artifact.delete(permanent=True, storage=True)

tests/storage/test_storage.py (49 additions)

@@ -335,3 +335,52 @@ def test_write_read_tiledbsoma(storage):

     if storage is not None:
         ln.settings.storage = previous_storage
+
+
+def test_backed_pyarrow():
+    previous_storage = ln.setup.settings.storage.root_as_str
+    ln.settings.storage = "s3://lamindb-test/storage"
+
+    df = pd.DataFrame({"feat1": [0, 0, 1, 1], "feat2": [6, 7, 8, 9]})
+    # check as non-partitioned file
+    df.to_parquet("save_df.parquet", engine="pyarrow")
+    artifact_file = ln.Artifact(
+        "save_df.parquet", description="Test non-partitioned parquet"
+    )
+    artifact_file.save()
+    # cached after saving
+    ds = artifact_file.open()
+    assert ds.to_table().to_pandas().equals(df)
+    # remove cache
+    artifact_file.cache().unlink()
+    ds = artifact_file.open()
+    assert ds.to_table().to_pandas().equals(df)
+    # check as partitioned folder
+    df.to_parquet("save_df", engine="pyarrow", partition_cols=["feat1"])
+    assert Path("save_df").is_dir()
+    artifact_folder = ln.Artifact("save_df", description="Test partitioned parquet")
+    artifact_folder.save()
+    # cached after saving
+    ds = artifact_folder.open()
+    assert ds.to_table().to_pandas().equals(df[["feat2"]])
+    # remove cache
+    shutil.rmtree(artifact_folder.cache())
+    ds = artifact_folder.open()
+    assert ds.to_table().to_pandas().equals(df[["feat2"]])
+
+    artifact_file.delete(permanent=True)
+    artifact_folder.delete(permanent=True)
+
+    ln.settings.storage = previous_storage
+
+
+def test_backed_wrong_suffix():
+    fp = Path("test_file.txt")
+    fp.write_text("test open with wrong suffix")
+
+    artifact = ln.Artifact(fp, description="Test open wrong suffix")
+    # do not save here, it just tries to open the local path
+    with pytest.raises(ValueError):
+        artifact.open()
+
+    fp.unlink()