Skip to content

Commit

Permalink
[AIRFLOW-3281] Fix Kubernetes operator with git-sync (apache#3770)
Browse files Browse the repository at this point in the history
* Refactor Kubernetes operator with git-sync

Currently the implementation of git-sync is broken because:
- git-sync clones the repository in /tmp and not in airflow-dags volume
- git-sync add a link to point to the revision required but it is not
taken into account in AIRFLOW__CORE__DAGS_FOLDER

Dags/logs hostPath volume has been added (needed if airflow run in
kubernetes in local environment)

To avoid false positive in CI `load_examples` is set to `False`
otherwise DAGs from `airflow/example_dags` are always loaded. In this
way is possible to test `import` in DAGs

Remove `worker_dags_folder` config:
`worker_dags_folder` is redundant and can lead to confusion.
In WorkerConfiguration `self.kube_config.dags_folder` defines the path of
the dags and can be set in the worker using airflow_configmap
Refactor worker_configuration.py
Use a docker container to run setup.py
Compile web assets
Fix codecov application path

* Fix kube_config.dags_in_image
  • Loading branch information
odracci authored and Tao Feng committed Dec 31, 2018
1 parent b50ffa0 commit 9de9721
Show file tree
Hide file tree
Showing 26 changed files with 547 additions and 128 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ omit =
scripts/*
dev/*
airflow/migrations/*
airflow/www_rbac/node_modules/**
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ node_modules/*
coverage/*
git_version
flake8_diff.sh
coverage.xml

rat-results.txt
apache-airflow-.*\+incubating-source.tar.gz.*
Expand Down
11 changes: 9 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,18 @@ install:
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
- pip install --upgrade pip
- if [ ! -z "$KUBERNETES_VERSION" ]; then ./scripts/ci/kubernetes/setup_kubernetes.sh; fi
script:
- if [ -z "$KUBERNETES_VERSION" ]; then docker-compose --log-level ERROR -f scripts/ci/docker-compose.yml run airflow-testing /app/scripts/ci/run-ci.sh; fi
- if [ ! -z "$KUBERNETES_VERSION" ]; then
./scripts/ci/kubernetes/kube/deploy.sh &&
./scripts/ci/kubernetes/minikube/stop_minikube.sh &&
./scripts/ci/kubernetes/setup_kubernetes.sh &&
./scripts/ci/kubernetes/kube/deploy.sh -d persistent_mode &&
MINIKUBE_IP=$(minikube ip) docker-compose --log-level ERROR -f scripts/ci/docker-compose.yml -f scripts/ci/docker-compose-kubernetes.yml run airflow-testing /app/scripts/ci/run-ci.sh;
fi
- if [ ! -z "$KUBERNETES_VERSION" ]; then
./scripts/ci/kubernetes/minikube/stop_minikube.sh &&
./scripts/ci/kubernetes/setup_kubernetes.sh &&
./scripts/ci/kubernetes/kube/deploy.sh -d git_mode &&
MINIKUBE_IP=$(minikube ip) docker-compose --log-level ERROR -f scripts/ci/docker-compose.yml -f scripts/ci/docker-compose-kubernetes.yml run airflow-testing /app/scripts/ci/run-ci.sh;
fi
before_cache:
Expand Down
18 changes: 15 additions & 3 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ hostname_callable = socket:getfqdn
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor

# The SqlAlchemy connection string to the metadata database.
Expand Down Expand Up @@ -594,7 +594,6 @@ elasticsearch_end_of_log_mark = end_of_log
worker_container_repository =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent
worker_dags_folder =

# If True (default), worker pods will be deleted upon termination
delete_worker_pods = True
Expand All @@ -612,7 +611,7 @@ dags_in_image = False
# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
dags_volume_subpath =

# For DAGs mounted via a volume claim (mutually exclusive with volume claim)
# For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path)
dags_volume_claim =

# For volume mounted logs, the worker will look in this subpath for logs
Expand All @@ -621,12 +620,25 @@ logs_volume_subpath =
# A shared volume claim for the logs
logs_volume_claim =

# For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync)
# Useful in local environment, discouraged in production
dags_volume_host =

# A hostPath volume for the logs
# Useful in local environment, discouraged in production
logs_volume_host =

# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim)
git_repo =
git_branch =
git_user =
git_password =
git_subpath =
git_sync_root = /git
git_sync_dest = repo
# Mount point of the volume if git-sync is being used.
# i.e. {AIRFLOW_HOME}/dags
git_dags_folder_mount_point =

# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from libs.helper import print_stuff
from airflow.models import DAG
import os

Expand All @@ -33,10 +34,6 @@
)


def print_stuff():
print("annotated!")


def test_volume_mount():
with open('/foo/volume_mount_test.txt', 'w') as foo:
foo.write('Hello')
Expand All @@ -62,14 +59,14 @@ def test_volume_mount():
"KubernetesExecutor": {
"volumes": [
{
"name": "test-volume",
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "test-volume",
"name": "example-kubernetes-test-volume",
},
]
}
Expand Down
18 changes: 18 additions & 0 deletions airflow/contrib/example_dags/libs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
22 changes: 22 additions & 0 deletions airflow/contrib/example_dags/libs/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


def print_stuff():
print("annotated!")
29 changes: 23 additions & 6 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ def __init__(self):
self.kubernetes_section, 'worker_container_repository')
self.worker_container_tag = configuration.get(
self.kubernetes_section, 'worker_container_tag')
self.worker_dags_folder = configuration.get(
self.kubernetes_section, 'worker_dags_folder')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
self.kube_image_pull_policy = configuration.get(
Expand All @@ -148,6 +146,14 @@ def __init__(self):
self.git_branch = conf.get(self.kubernetes_section, 'git_branch')
# Optionally, the directory in the git repository containing the dags
self.git_subpath = conf.get(self.kubernetes_section, 'git_subpath')
# Optionally, the root directory for git operations
self.git_sync_root = conf.get(self.kubernetes_section, 'git_sync_root')
# Optionally, the name at which to publish the checked-out files under --root
self.git_sync_dest = conf.get(self.kubernetes_section, 'git_sync_dest')
# Optionally, if git_dags_folder_mount_point is set the worker will use
# {git_dags_folder_mount_point}/{git_sync_dest}/{git_subpath} as dags_folder
self.git_dags_folder_mount_point = conf.get(self.kubernetes_section,
'git_dags_folder_mount_point')

# Optionally a user may supply a `git_user` and `git_password` for private
# repositories
Expand All @@ -171,6 +177,12 @@ def __init__(self):
self.logs_volume_subpath = conf.get(
self.kubernetes_section, 'logs_volume_subpath')

# Optionally, hostPath volume containing DAGs
self.dags_volume_host = conf.get(self.kubernetes_section, 'dags_volume_host')

# Optionally, write logs to a hostPath Volume
self.logs_volume_host = conf.get(self.kubernetes_section, 'logs_volume_host')

# This prop may optionally be set for PV Claims and is used to write logs
self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')

Expand Down Expand Up @@ -208,12 +220,17 @@ def __init__(self):
self._validate()

def _validate(self):
if not self.dags_volume_claim and not self.dags_in_image \
and (not self.git_repo or not self.git_branch):
# TODO: use XOR for dags_volume_claim and git_dags_folder_mount_point
if not self.dags_volume_claim \
and not self.dags_volume_host \
and not self.dags_in_image \
and (not self.git_repo or not self.git_branch or not self.git_dags_folder_mount_point):
raise AirflowConfigException(
'In kubernetes mode the following must be set in the `kubernetes` '
'config section: `dags_volume_claim` or `git_repo and git_branch` '
'or `dags_in_image`')
'config section: `dags_volume_claim` '
'or `dags_volume_host` '
'or `dags_in_image` '
'or `git_repo and git_branch and git_dags_folder_mount_point`')


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
Expand Down
Loading

0 comments on commit 9de9721

Please sign in to comment.