[Airflow-2760] Decouple DAG parsing loop from scheduler loop (apache#…
KevinYang21 authored and kaxil committed Oct 26, 2018
1 parent 62b21d7 commit 75e2288
Showing 13 changed files with 1,111 additions and 431 deletions.
21 changes: 13 additions & 8 deletions UPDATING.md
@@ -5,6 +5,16 @@ assists users migrating to a new version.

## Airflow Master

### New `dag_processor_manager_log_location` config option

The DAG parsing manager log is now written to a file by default; its location is
controlled by the new `dag_processor_manager_log_location` config option in the `core` section.
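
For example, to direct the manager log to a custom location (the path below is illustrative), set the option in `airflow.cfg`:

```
[core]
dag_processor_manager_log_location = /var/log/airflow/dag_processor_manager/dag_processor_manager.log
```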

### New `sync_parallelism` config option in celery section

The new `sync_parallelism` config option controls how many processes the CeleryExecutor will use to
fetch celery task state in parallel. The default value is `max(1, number of cores - 1)`.
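
For example, to pin the number of sync processes explicitly (the value below is illustrative), set it in `airflow.cfg`:

```
[celery]
sync_parallelism = 4
```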

### Rename of BashTaskRunner to StandardTaskRunner

BashTaskRunner has been renamed to StandardTaskRunner. It is the default task runner
@@ -26,11 +36,6 @@ We also provide a new cli command(``sync_perm``) to allow admin to auto sync permissions
The `scheduler.min_file_parsing_loop_time` config option has been temporarily removed due to
some bugs.

### CLI Changes

The ability to manipulate users from the command line has been changed. 'airflow create_user', 'airflow delete_user' and 'airflow list_users' have been grouped into a single command `airflow users` with optional flags `--create`, `--list` and `--delete`.
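
For instance, what used to be `airflow create_user` now looks roughly like the following; the exact set of required flags (name, password, and so on) may differ, so treat this as a sketch:

```
airflow users --create --username jdoe --role Admin --email jdoe@example.com
```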
@@ -185,11 +190,11 @@ With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, `PROCESSOR_FILENAME_TEMPLATE`, `
```
[core]
fab_logging_level = WARN
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
[elasticsearch]
elasticsearch_log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_end_of_log_mark = end_of_log
```
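
The quadrupled braces above are deliberate: the default config template is passed through Python's `str.format()` (which fills in values such as `{AIRFLOW_HOME}`), and each pass through `format()` halves literal braces. A minimal illustration:

```python
# str.format() treats '{{' as an escaped literal '{', so four braces
# collapse to the two that Jinja templating ultimately needs.
assert "{{{{ ti.dag_id }}}}".format() == "{{ ti.dag_id }}"
```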

43 changes: 42 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
@@ -20,6 +20,7 @@
import os

from airflow import configuration as conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
@@ -38,7 +39,11 @@

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
@@ -79,7 +84,7 @@
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        }
    },
    'loggers': {
        'airflow.processor': {
@@ -104,6 +109,26 @@
    }
}

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': LOG_LEVEL,
            'propagate': False,
        }
    }
}
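
With these settings the manager log occupies at most roughly 600 MB on disk: one active 100 MB file plus five rotated backups. A standalone stdlib sketch of the same rotation policy (the path is illustrative):

```python
import logging
from logging.handlers import RotatingFileHandler

# Rotate at 100 MB and keep 5 backups, mirroring the config above.
handler = RotatingFileHandler(
    '/tmp/dag_processor_manager.log', mode='a',
    maxBytes=104857600, backupCount=5)
logger = logging.getLogger('demo.processor_manager')
logger.addHandler(handler)
logger.warning('rotates once the file exceeds maxBytes')
```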

REMOTE_HANDLERS = {
    's3': {
        'task': {
@@ -172,6 +197,22 @@

REMOTE_LOGGING = conf.get('core', 'remote_logging')

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    DEFAULT_LOGGING_CONFIG['handlers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
    DEFAULT_LOGGING_CONFIG['loggers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])

    # Manually create log directory for processor_manager handler as RotatingFileHandler
    # will only create file but not the directory.
    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
        'processor_manager']
    directory = os.path.dirname(processor_manager_handler_config['filename'])
    mkdirs(directory, 0o755)
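
A hedged sketch of how a process opts in to the extra handler; the environment variable must be set before this module is imported, which is why the guard above avoids double-initializing the `RotatingFileHandler` in other processes:

```python
import os

# Opt in before importing the settings module so the guard above fires.
os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'

import logging.config
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
logging.getLogger('airflow.processor_manager').info('manager started')
```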

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
1 change: 1 addition & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -68,6 +68,7 @@ simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# Log filename format
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log

# Hostname by providing a path to a callable, which will resolve the hostname
hostname_callable = socket:getfqdn
1 change: 1 addition & 0 deletions airflow/config_templates/default_test.cfg
@@ -39,6 +39,7 @@ logging_level = INFO
fab_logging_level = WARN
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
executor = SequentialExecutor
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
load_examples = True
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -141,7 +141,7 @@ def heartbeat(self):
                                       queue=queue,
                                       executor_config=ti.executor_config)
                else:
                    self.logger.info(
                    self.log.info(
                        'Task is already running, not sending to '
                        'executor: {}'.format(key))
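
The one-line change above follows the `LoggingMixin` API: `self.log` is the supported logger property, while the older `self.logger` attribute is gone. A hedged sketch of the pattern (the class name is illustrative):

```python
from airflow.utils.log.logging_mixin import LoggingMixin

class MyComponent(LoggingMixin):
    def heartbeat(self):
        # `log` comes from LoggingMixin; `self.logger` would raise
        # AttributeError on this Airflow version.
        self.log.info('Task is already running, not sending to executor')
```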
