Skip to content

Commit

Permalink
Fix starting dag processor when running as a daemon (apache#22720)
Browse files Browse the repository at this point in the history
+ move reading [scheduler]standalone_dag_processor outside of the loop

See
apache#22305 (comment)
  • Loading branch information
mhenc authored Apr 4, 2022
1 parent ba313cd commit 215993b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
1 change: 1 addition & 0 deletions airflow/cli/commands/dag_processor_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def dag_processor(args):
with ctx:
try:
manager.register_exit_signals()
manager.start()
finally:
manager.terminate()
manager.end()
Expand Down
4 changes: 2 additions & 2 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ def _run_parsing_loop(self):
self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
Expand Down Expand Up @@ -578,7 +578,7 @@ def _run_parsing_loop(self):
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)

if conf.getboolean("scheduler", "standalone_dag_processor"):
if standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._deactivate_stale_dags()
self._refresh_dag_dir()
Expand Down

0 comments on commit 215993b

Please sign in to comment.