Skip to content

Commit

Permalink
Use kubernetes queue in kubernetes hybrid executors (apache#23048)
Browse files Browse the repository at this point in the history
When using "hybrid" executors (`CeleryKubernetesExecutor` or `LocalKubernetesExecutor`),
then the `clear_not_launched_queued_tasks` mechnism in the `KubernetesExecutor` can
reset the queued tasks, that were given to the other executor. 

`KuberneterExecutor` should limit itself to the configured queue when working in the
"hybrid" mode.
  • Loading branch information
tanelk authored May 5, 2022
1 parent 627b569 commit ae19eab
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 4 deletions.
1 change: 1 addition & 0 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
self._job_id: Optional[int] = None
self.celery_executor = celery_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

@property
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
Expand Down
9 changes: 6 additions & 3 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def __init__(self):
self.scheduler_job_id: Optional[str] = None
self.event_scheduler: Optional[EventScheduler] = None
self.last_handled: Dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: Optional[str] = None
super().__init__(parallelism=self.kube_config.parallelism)

@provide_session
Expand All @@ -456,9 +457,11 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
self.log.debug("Clearing tasks that have not been launched")
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
queued_tis: List[TaskInstance] = (
session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
)

query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
if self.kubernetes_queue:
query = query.filter(TaskInstance.queue == self.kubernetes_queue)
queued_tis: List[TaskInstance] = query.all()
self.log.info('Found %s queued task instances', len(queued_tis))

# Go through the "last seen" dictionary and clean out old entries
Expand Down
1 change: 1 addition & 0 deletions airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
self._job_id: Optional[str] = None
self.local_executor = local_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

@property
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
Expand Down
10 changes: 10 additions & 0 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from parameterized import parameterized

from airflow.configuration import conf
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor
Expand Down Expand Up @@ -213,3 +214,12 @@ def test_job_id_setter(self):
job_id = 'this-job-id'
cel_k8s_exec.job_id = job_id
assert cel_exec.job_id == k8s_exec.job_id == cel_k8s_exec.job_id == job_id

def test_kubernetes_executor_knows_its_queue(self):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)

assert k8s_executor_mock.kubernetes_queue == conf.get(
'celery_kubernetes_executor', 'kubernetes_queue'
)
38 changes: 37 additions & 1 deletion tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,17 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du
),
)

def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
@pytest.mark.parametrize(
'task_queue, kubernetes_queue',
[
pytest.param('default', None),
pytest.param('kubernetes', None),
pytest.param('kubernetes', 'kubernetes'),
],
)
def test_clear_not_launched_queued_tasks_launched(
self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
):
"""Leave the state alone if a pod already exists"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
Expand All @@ -732,9 +742,11 @@ def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_
ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.queue = task_queue
session.flush()

executor = self.kubernetes_executor
executor.kubernetes_queue = kubernetes_queue
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

Expand Down Expand Up @@ -800,6 +812,30 @@ def list_namespaced_pod(*args, **kwargs):
any_order=True,
)

def test_clear_not_launched_queued_tasks_not_launched_other_queue(
self, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])

create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
session.flush()

executor = self.kubernetes_executor
executor.kubernetes_queue = 'kubernetes'
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0


class TestKubernetesJobWatcher(unittest.TestCase):
def setUp(self):
Expand Down
7 changes: 7 additions & 0 deletions tests/executors/test_local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ def test_slots_available(self):

# Should be equal to Local Executor default parallelism.
assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM')

def test_kubernetes_executor_knows_its_queue(self):
local_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)

assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue')

0 comments on commit ae19eab

Please sign in to comment.