
Kubernetes worker pod doesn't use docker container entrypoint (apache#12766)

* Kubernetes worker pod doesn't use docker container entrypoint

Fixes issue on openshift caused by KubernetesExecutor pods not running
via the entrypoint script

* fix

* Update UPGRADING_TO_2.0.md

Co-authored-by: Ash Berlin-Taylor <[email protected]>

* fix UPDGRADING

* @ashb comments

Co-authored-by: Ash Berlin-Taylor <[email protected]>
dimberman and ashb authored Dec 7, 2020
1 parent c2411e3 commit 190066c
Showing 12 changed files with 38 additions and 19 deletions.
22 changes: 22 additions & 0 deletions UPGRADING_TO_2.0.md
@@ -41,6 +41,7 @@ assists users migrating to a new version.
- [Step 6: Upgrade KubernetesExecutor settings](#step-6-upgrade-kubernetesexecutor-settings)
- [The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations](#the-kubernetesexecutor-will-no-longer-read-from-the-airflowcfg-for-base-pod-configurations)
- [The `executor_config` Will Now Expect a `kubernetes.client.models.V1Pod` Class When Launching Tasks](#the-executor_config-will-now-expect-a-kubernetesclientmodelsv1pod-class-when-launching-tasks)
- [The "airflow run" Command Will Row run through the `args` argument Instead of the `command` argument](#the-airflow-run-command-will-row-run-through-the-args-argument-instead-of-the-command-argument)
- [Appendix](#appendix)
- [Changed Parameters for the KubernetesPodOperator](#changed-parameters-for-the-kubernetespodoperator)
- [Migration Guide from Experimental API to Stable API v1](#migration-guide-from-experimental-api-to-stable-api-v1)
@@ -493,6 +494,27 @@ second_task = PythonOperator(
For Airflow 2.0, the traditional `executor_config` will continue to work with a deprecation warning,
but will be removed in a future version.

### The "airflow run" Command Will Row run through the `args` argument Instead of the `command` argument

To ensure that the KubernetesExecutor can work with entrypoint scripts (which are critical to running on OpenShift),
KubernetesExecutor pods will run the `airflow run` command via the `args` argument instead of the `command`
argument. This information is only relevant if you modify the `command` or `args` arguments in your
`pod_mutation_hook`. For example, if you previously had a `pod_mutation_hook` that looked like:

```python
def pod_mutation_hook(pod):
    if "my_task_name" in pod.spec.containers[0].command:
        ...
```

You would need to change it to:

```python
def pod_mutation_hook(pod):
    if "my_task_name" in pod.spec.containers[0].args:
        ...
```
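
For additional context, here is a minimal sketch (not part of the upgrade guide itself, assuming the `kubernetes` Python client and placeholder image and task values) of the worker container the executor now builds: `command` is left unset so the image entrypoint script runs, and the task invocation is carried in `args`:

```python
from kubernetes.client import models as k8s

# Sketch of the worker container spec after this change: `command` is left
# unset, so the Docker image ENTRYPOINT (e.g. the OpenShift-compatible
# entrypoint script) runs and receives the task command through `args`.
worker_container = k8s.V1Container(
    name="base",
    image="apache/airflow:2.0.0",  # hypothetical image tag
    args=["airflow", "tasks", "run", "my_dag", "my_task_name", "2020-12-07T00:00:00+00:00"],  # placeholder task
    env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
)
```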

## Appendix

### Changed Parameters for the KubernetesPodOperator
2 changes: 1 addition & 1 deletion airflow/cli/commands/kubernetes_command.py
@@ -49,7 +49,7 @@ def generate_pod_yaml(args):
try_number=ti.try_number,
kube_image=kube_config.kube_image,
date=ti.execution_date,
command=ti.command_as_list(),
args=ti.command_as_list(),
pod_override_object=PodGenerator.from_obj(ti.executor_config),
scheduler_job_id="worker-config",
namespace=kube_config.executor_namespace,
2 changes: 1 addition & 1 deletion airflow/executors/kubernetes_executor.py
@@ -297,7 +297,7 @@ def run_next(self, next_job: KubernetesJobType) -> None:
kube_image=self.kube_config.kube_image,
try_number=try_number,
date=execution_date,
command=command,
args=command,
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
)
4 changes: 2 additions & 2 deletions airflow/kubernetes/pod_generator.py
@@ -351,7 +351,7 @@ def construct_pod(  # pylint: disable=too-many-arguments
try_number: int,
kube_image: str,
date: datetime.datetime,
command: List[str],
args: List[str],
pod_override_object: Optional[k8s.V1Pod],
base_worker_pod: k8s.V1Pod,
namespace: str,
@@ -394,7 +394,7 @@ def construct_pod(  # pylint: disable=too-many-arguments
containers=[
k8s.V1Container(
name="base",
command=command,
args=args,
image=image,
env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
)
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
@@ -1693,7 +1693,7 @@ def render_k8s_pod_yaml(self) -> Optional[dict]:
try_number=self.try_number,
kube_image=kube_config.kube_image,
date=self.execution_date,
command=self.command_as_list(),
args=self.command_as_list(),
pod_override_object=PodGenerator.from_obj(self.executor_config),
scheduler_job_id="worker-config",
namespace=kube_config.executor_namespace,
2 changes: 1 addition & 1 deletion chart/templates/scheduler/scheduler-deployment.yaml
@@ -107,7 +107,7 @@ spec:
- name: scheduler
image: {{ template "airflow_image" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
args: ["scheduler"]
args: ["airflow", "scheduler"]
envFrom:
{{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }}
env:
2 changes: 1 addition & 1 deletion chart/templates/webserver/webserver-deployment.yaml
@@ -94,7 +94,7 @@ spec:
- name: webserver
image: {{ template "airflow_image" . }}
imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
args: ["webserver"]
args: ["airflow", "webserver"]
resources:
{{ toYaml .Values.webserver.resources | indent 12 }}
volumeMounts:
2 changes: 0 additions & 2 deletions chart/tests/test_basic_helm_chart.py
@@ -115,5 +115,3 @@ def get_k8s_objs_with_image(obj: Union[List[Any], Dict[str, Any]]) -> List[Dict[
if image.startswith(image_repo):
# Make sure that a command is not specified
self.assertNotIn("command", obj)
# Make sure that the first arg is never airflow
self.assertNotEqual(obj["args"][0], "airflow") # pylint: disable=invalid-sequence-index
7 changes: 3 additions & 4 deletions scripts/in_container/prod/entrypoint_prod.sh
@@ -111,6 +111,7 @@ else
verify_db_connection "${AIRFLOW__CORE__SQL_ALCHEMY_CONN}"
fi


# The Bash and python commands still should verify the basic connections so they are run after the
# DB check but before the broker check
if [[ ${AIRFLOW_COMMAND} == "bash" ]]; then
@@ -125,7 +126,7 @@ elif [[ ${AIRFLOW_COMMAND} == "airflow" ]]; then
fi

# Note: the broker backend configuration concerns only a subset of Airflow components
if [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|worker|flower)$ ]]; then
if [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|celery|worker|flower)$ ]]; then
if [[ -n "${AIRFLOW__CELERY__BROKER_URL_CMD=}" ]]; then
verify_db_connection "$(eval "$AIRFLOW__CELERY__BROKER_URL_CMD")"
else
@@ -136,6 +137,4 @@ if [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|worker|flower)$ ]]; then
fi
fi


# Run the command
exec airflow "${@}"
exec "airflow" "${@}"
8 changes: 4 additions & 4 deletions tests/kubernetes/test_pod_generator.py
@@ -425,7 +425,7 @@ def test_construct_pod(self, mock_uuid):
kube_image='airflow_image',
try_number=self.try_number,
date=self.execution_date,
command=['command'],
args=['command'],
pod_override_object=executor_config,
base_worker_pod=worker_config,
namespace='test_namespace',
@@ -437,7 +437,7 @@ def test_construct_pod(self, mock_uuid):
expected.metadata.annotations = self.annotations
expected.metadata.name = 'pod_id-' + self.static_uuid.hex
expected.metadata.namespace = 'test_namespace'
expected.spec.containers[0].command = ['command']
expected.spec.containers[0].args = ['command']
expected.spec.containers[0].image = 'airflow_image'
expected.spec.containers[0].resources = {'limits': {'cpu': '1m', 'memory': '1G'}}
expected.spec.containers[0].env.append(
@@ -465,15 +465,15 @@ def test_construct_pod_empty_executor_config(self, mock_uuid):
kube_image='test-image',
try_number=3,
date=self.execution_date,
command=['command'],
args=['command'],
pod_override_object=executor_config,
base_worker_pod=worker_config,
namespace='namespace',
scheduler_job_id='uuid',
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
worker_config.spec.containers[0].image = "test-image"
worker_config.spec.containers[0].command = ["command"]
worker_config.spec.containers[0].args = ["command"]
worker_config.metadata.annotations = self.annotations
worker_config.metadata.labels = self.labels
worker_config.metadata.labels['app'] = 'myapp'
2 changes: 1 addition & 1 deletion tests/models/test_renderedtifields.py
@@ -272,7 +272,7 @@ def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook):
'spec': {
'containers': [
{
'command': [
'args': [
'airflow',
'tasks',
'run',
2 changes: 1 addition & 1 deletion tests/models/test_taskinstance.py
@@ -1844,7 +1844,7 @@ def test_get_rendered_k8s_spec(self):
'spec': {
'containers': [
{
'command': [
'args': [
'airflow',
'tasks',
'run',
