Skip to content

Commit

Permalink
Add support for a subdir in GitDagBundle (apache#44582)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedcunningham authored Dec 2, 2024
1 parent f1f6499 commit aac8098
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
18 changes: 12 additions & 6 deletions airflow/dag_processing/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,21 @@ class GitDagBundle(BaseDagBundle):
:param repo_url: URL of the git repository
:param head: Branch or tag for this DAG bundle
:param subdir: Subdirectory within the repository where the DAGs are stored (Optional)
"""

supports_versioning = True

def __init__(self, *, repo_url: str, head: str, **kwargs) -> None:
def __init__(self, *, repo_url: str, head: str, subdir: str | None = None, **kwargs) -> None:
super().__init__(**kwargs)
self.repo_url = repo_url
self.head = head
self.subdir = subdir

self.bare_repo_path = self._dag_bundle_root_storage_path / "git" / self.name
self.repo_path = (
self._dag_bundle_root_storage_path / "git" / (self.name + f"+{self.version or self.head}")
)
self._clone_bare_repo_if_required()
self._ensure_version_in_bare_repo()
self._clone_repo_if_required()
Expand All @@ -64,12 +69,12 @@ def __init__(self, *, repo_url: str, head: str, **kwargs) -> None:
self.refresh()

def _clone_repo_if_required(self) -> None:
# Clone the bare repository into the working checkout location if that
# location does not exist on disk yet, then open the checkout as ``self.repo``.
#
# NOTE(review): this is a rendered diff whose +/- markers and indentation
# were lost. Each ``self.path`` line below is immediately followed by an
# otherwise identical ``self.repo_path`` line; these look like the
# removed/added pairs of this commit -- confirm against the real file.
# The switch matters because the ``path`` property shown in this diff can
# now point at a subdirectory of the checkout rather than its root.
if not os.path.exists(self.path):
if not os.path.exists(self.repo_path):
Repo.clone_from(
url=self.bare_repo_path,
to_path=self.path,
to_path=self.repo_path,
)
self.repo = Repo(self.path)
self.repo = Repo(self.repo_path)

def _clone_bare_repo_if_required(self) -> None:
if not os.path.exists(self.bare_repo_path):
Expand All @@ -96,8 +101,9 @@ def get_current_version(self) -> str:

@property
def path(self) -> Path:
# Location of this bundle's files on disk.
#
# NOTE(review): rendered diff with +/- markers and indentation lost. The
# next two lines appear to be the pre-change body (path built from the
# bundle name plus version-or-head); the three lines after them appear to
# be the post-change body -- confirm against the real file.
location = self.version or self.head
return self._dag_bundle_root_storage_path / "git" / f"{self.name}+{location}"
# Post-change body: when ``subdir`` is set (truthy), return that subdirectory
# inside the checkout; otherwise return the checkout root ``repo_path``.
if self.subdir:
return self.repo_path / self.subdir
return self.repo_path

@staticmethod
def _has_version(repo: Repo, version: str) -> bool:
Expand Down
19 changes: 19 additions & 0 deletions tests/dag_processing/test_dag_bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,22 @@ def test_version_not_found(self, git_repo):

with pytest.raises(AirflowException, match="Version not_found not found in the repository"):
GitDagBundle(name="test", version="not_found", repo_url=repo_path, head="master")

def test_subdir(self, git_repo):
    """A bundle created with ``subdir`` exposes that subdirectory as its path.

    Commits one file inside a new subdirectory of the fixture repository,
    builds a bundle pointing at that subdirectory, and checks that
    ``bundle.path`` ends with the subdirectory name and contains exactly
    the committed file.
    """
    repo_path, repo = git_repo

    subdir = "somesubdir"
    nested_dir = repo_path / subdir
    nested_dir.mkdir()

    # Create and commit a single file inside the subdirectory.
    dag_file = nested_dir / "some_new_file.py"
    dag_file.write_text("hello world")
    repo.index.add([dag_file])
    repo.index.commit("Initial commit")

    bundle = GitDagBundle(name="test", repo_url=repo_path, head="master", subdir=subdir)

    # Only regular files directly under the bundle path are considered.
    found_names = {entry.name for entry in bundle.path.iterdir() if entry.is_file()}
    assert str(bundle.path).endswith(subdir)
    assert {"some_new_file.py"} == found_names

0 comments on commit aac8098

Please sign in to comment.