Skip to content

Commit

Permalink
Improve handling of backwards compat for airflow.io (apache#36199)
Browse files Browse the repository at this point in the history
Older providers do not have a get_fs method that takes
storage_options as arguments. If we encounter such provider
and storage_options are passed we should error out instead
if silently ignoring.

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
bolkedebruin and uranusjr authored Dec 13, 2023
1 parent 97e8f58 commit 6c94ddf
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
7 changes: 7 additions & 0 deletions airflow/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,18 @@ def get_fs(
raise ValueError(f"No filesystem registered for scheme {scheme}") from None

options = storage_options or {}

# MyPy does not recognize dynamic parameters inspection when we call the method, and we have to do
# it for compatibility reasons with already released providers, that's why we need to ignore
# mypy errors here
parameters = inspect.signature(fs).parameters
if len(parameters) == 1:
if options:
raise AttributeError(
f"Filesystem {scheme} does not support storage options, but options were passed."
f"This most likely means that you are using an old version of the provider that does not "
f"support storage options. Please upgrade the provider if possible."
)
return fs(conn_id) # type: ignore[call-arg]
return fs(conn_id, options) # type: ignore[call-arg]

Expand Down
21 changes: 21 additions & 0 deletions tests/io/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from fsspec.implementations.local import LocalFileSystem
from fsspec.utils import stringify_path

from airflow.io import _register_filesystems, get_fs
from airflow.io.path import ObjectStoragePath
from airflow.io.store import _STORE_CACHE, ObjectStore, attach
from airflow.utils.module_loading import qualname
Expand All @@ -51,6 +52,10 @@ def _strip_protocol(cls, path) -> str:
return path[i + 3 :] if i > 0 else path


def get_fs_no_storage_options(_: str):
return LocalFileSystem()


class TestFs:
def setup_class(self):
self._store_cache = _STORE_CACHE.copy()
Expand Down Expand Up @@ -285,3 +290,19 @@ def test_serde_store(self):
assert s["conn_id"] is None
assert s["filesystem"] == qualname(LocalFileSystem)
assert store == d

def test_backwards_compat(self):
_register_filesystems.cache_clear()
from airflow.io import _BUILTIN_SCHEME_TO_FS as SCHEMES

try:
SCHEMES["file"] = get_fs_no_storage_options # type: ignore[call-arg]

assert get_fs("file")

with pytest.raises(AttributeError):
get_fs("file", storage_options={"foo": "bar"})

finally:
# Reset the cache to avoid side effects
_register_filesystems.cache_clear()

0 comments on commit 6c94ddf

Please sign in to comment.