Skip to content

Commit

Permalink
Introduce BaseExecutor.validate_command to avoid duplication (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek authored Jul 28, 2020
1 parent 8de5ea3 commit 21371b6
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 11 deletions.
6 changes: 6 additions & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,9 @@ def terminate(self):
This method is called when the daemon receives a SIGTERM
"""
raise NotImplementedError()

@staticmethod
def validate_command(command: List[str]) -> None:
"""Check if the command to execute is airflow comnand"""
if command[0:3] != ["airflow", "tasks", "run"]:
raise ValueError('The command must start with ["airflow", "tasks", "run"].')
4 changes: 1 addition & 3 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@
@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""Executes command."""
if command_to_exec[0:3] != ["airflow", "tasks", "run"]:
raise ValueError('The command must start with ["airflow", "tasks", "run"].')

BaseExecutor.validate_command(command_to_exec)
log.info("Executing command in Celery: %s", command_to_exec)
env = os.environ.copy()
try:
Expand Down
3 changes: 1 addition & 2 deletions airflow/executors/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ def execute_async(self,
queue: Optional[str] = None,
executor_config: Optional[Any] = None) -> None:

if command[0:3] != ["airflow", "tasks", "run"]:
raise ValueError('The command must start with ["airflow", "tasks", "run"].')
self.validate_command(command)

def airflow_run():
return subprocess.check_call(command, close_fds=True)
Expand Down
3 changes: 1 addition & 2 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ def execute_async(self, key: TaskInstanceKey,
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)

if command[0:3] != ["airflow", "tasks", "run"]:
raise ValueError('The command must start with ["airflow", "tasks", "run"].')
self.validate_command(command)

self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

Expand Down
5 changes: 1 addition & 4 deletions airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ def execute_async(self,
command: CommandType,
queue: Optional[str] = None,
executor_config: Optional[Any] = None) -> None:

if command[0:3] != ["airflow", "tasks", "run"]:
raise ValueError('The command must start with ["airflow", "tasks", "run"].')

self.validate_command(command)
self.commands_to_run.append((key, command))

def sync(self) -> None:
Expand Down

0 comments on commit 21371b6

Please sign in to comment.