Skip to content

Commit

Permalink
Allow xcom sidecar container image to be configurable in KPO (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bdsoha authored Nov 9, 2022
1 parent 409a4de commit aefadb8
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 3 deletions.
7 changes: 7 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def get_connection_form_widgets() -> dict[str, Any]:
"cluster_context": StringField(lazy_gettext("Cluster context"), widget=BS3TextFieldWidget()),
"disable_verify_ssl": BooleanField(lazy_gettext("Disable SSL")),
"disable_tcp_keepalive": BooleanField(lazy_gettext("Disable TCP keepalive")),
"xcom_sidecar_container_image": StringField(
lazy_gettext("XCom sidecar image"), widget=BS3TextFieldWidget()
),
}

@staticmethod
Expand Down Expand Up @@ -341,6 +344,10 @@ def _get_namespace(self) -> str | None:
return self._get_field("namespace")
return None

def get_xcom_sidecar_container_image(self):
"""Returns the xcom sidecar image that defined in the connection"""
return self._get_field("xcom_sidecar_container_image")

def get_pod_log_stream(
self,
pod_name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,9 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
pod = xcom_sidecar.add_xcom_sidecar(
pod, sidecar_container_image=self.hook.get_xcom_sidecar_container_image()
)

labels = self._get_ti_pod_labels(context)
self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels)
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ class PodDefaults:
)


def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
def add_xcom_sidecar(pod: k8s.V1Pod, *, sidecar_container_image=None) -> k8s.V1Pod:
"""Adds sidecar"""
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
sidecar = copy.deepcopy(PodDefaults.SIDECAR_CONTAINER)
sidecar.image = sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image
pod_cp.spec.containers.append(sidecar)

return pod_cp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ Disable TCP keepalive
TCP keepalive is a feature (enabled by default) that tries to keep long-running connections
alive. Set this parameter to True to disable this feature.

Xcom sidecar image
Define the ``image`` used by the ``PodDefaults.SIDECAR_CONTAINER`` (defaults to ``"alpine"``) to allow private
repositories, as well as custom image overrides.

Example storing connection in env var using URI format:

.. code-block:: bash
Expand Down
1 change: 1 addition & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,7 @@ def test_pod_template_file(
# todo: This isn't really a system test
await_xcom_sidecar_container_start_mock.return_value = None
hook_mock.return_value.is_in_cluster = False
hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
extract_xcom_mock.return_value = "{}"
path = sys.path[0] + "/tests/kubernetes/pod.yaml"
k = KubernetesPodOperator(
Expand Down
13 changes: 13 additions & 0 deletions tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def setup_class(cls) -> None:
("disable_verify_ssl_empty", {"disable_verify_ssl": ""}),
("disable_tcp_keepalive", {"disable_tcp_keepalive": True}),
("disable_tcp_keepalive_empty", {"disable_tcp_keepalive": ""}),
("sidecar_container_image", {"xcom_sidecar_container_image": "private.repo.com/alpine:3.16"}),
("sidecar_container_image_empty", {"xcom_sidecar_container_image": ""}),
]:
db.merge_conn(Connection(conn_type="kubernetes", conn_id=conn_id, extra=json.dumps(extra)))

Expand Down Expand Up @@ -316,6 +318,17 @@ def test_get_namespace(self, conn_id, expected):
"and rename _get_namespace to get_namespace."
)

@pytest.mark.parametrize(
"conn_id, expected",
(
pytest.param("sidecar_container_image", "private.repo.com/alpine:3.16", id="sidecar-with-image"),
pytest.param("sidecar_container_image_empty", None, id="sidecar-without-image"),
),
)
def test_get_xcom_sidecar_container_image(self, conn_id, expected):
hook = KubernetesHook(conn_id=conn_id)
assert hook.get_xcom_sidecar_container_image() == expected

@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
def test_client_types(self, mock_kube_config_merger, mock_kube_config_loader):
Expand Down
22 changes: 22 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,28 @@ def test_omitted_namespace_no_conn_not_in_k8s(self, mock_find, mock_path):
)
mock_find.assert_called_once_with("default", context=context)

@patch(HOOK_CLASS)
def test_xcom_sidecar_container_image_default(self, hook_mock):
hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
k = KubernetesPodOperator(
name="test",
task_id="task",
do_xcom_push=True,
)
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[1].image == "alpine"

@patch(HOOK_CLASS)
def test_xcom_sidecar_container_image_custom(self, hook_mock):
hook_mock.return_value.get_xcom_sidecar_container_image.return_value = "private.repo/alpine:3.13"
k = KubernetesPodOperator(
name="test",
task_id="task",
do_xcom_push=True,
)
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[1].image == "private.repo/alpine:3.13"

def test_image_pull_policy_correctly_set(self):
k = KubernetesPodOperator(
task_id="task",
Expand Down

0 comments on commit aefadb8

Please sign in to comment.