Skip to content

Commit

Permalink
Compare k8s executor against alias, not full ExecutorName repr (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
o-nikolas authored Dec 17, 2024
1 parent bccd720 commit 89fdc03
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non
from airflow.models.taskinstance import TaskInstance

hybrid_executor_enabled = hasattr(TaskInstance, "executor")
default_executor = None
default_executor_alias = None
if hybrid_executor_enabled:
from airflow.executors.executor_loader import ExecutorLoader

default_executor = str(ExecutorLoader.get_default_executor_name())
default_executor = default_executor.strip(":")
default_executor_name = ExecutorLoader.get_default_executor_name()
default_executor_alias = default_executor_name.alias

with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
self.log.debug("Clearing tasks that have not been launched")
Expand All @@ -254,7 +254,10 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non
)
if self.kubernetes_queue:
query = query.where(TaskInstance.queue == self.kubernetes_queue)
elif hybrid_executor_enabled and default_executor == KUBERNETES_EXECUTOR:
# KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this
# module. The ExecutorName for "core" executors always contains an alias and cannot be modified
# to be different from the constant (in this case KUBERNETES_EXECUTOR).
elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR:
query = query.where(
or_(
TaskInstance.executor == KUBERNETES_EXECUTOR,
Expand Down

0 comments on commit 89fdc03

Please sign in to comment.