From 7363e35c9dbb9860eabf2444307f4d6f8140ab70 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Fri, 30 Sep 2022 14:08:41 +0100 Subject: [PATCH] Ensure the log messages from operators during parsing go somewhere (#26779) * Ensure the log messages from operators during parsing go somewhere While investigating #26599 and the change from AIP-45, I noticed that these warning messages weren't new! The only thing that was new was that we started seeing them. This is because the logger for BaseOperator and all subclasses is `airflow.task.operators`, and the `airflow.task` logger is not configured (with `set_context()`) until we have a TaskInstance, so it just dropped all messages on the floor! This changes it so that log messages are propagated to parent loggers by default, but when we configure a context (and thus have a file to write to) we stop that. A similar change was made for the `airflow.processor` (but that is unlikely to suffer the same fate) * Give a real row count value so logs don't fail The ArangoDB sensor test was logging a mock object, which previously was getting dropped before emitting, but with this change now fails with "Mock is not an integer" when attempting the `%d` interpolation. To avoid making the mock overly specific (`arangodb_client_for_test.db.` `return_value.aql.execute.return_value.count.return_value`!) I have changed the test to mock the hook entirely (which is already tested) --- .../config_templates/airflow_local_settings.py | 6 ++++-- airflow/utils/log/file_processor_handler.py | 3 +++ airflow/utils/log/file_task_handler.py | 3 +++ airflow/utils/log/logging_mixin.py | 18 ++++++++++-------- tests/conftest.py | 11 +++++++++++ tests/models/test_baseoperator.py | 10 ++++++++++ .../arangodb/sensors/test_arangodb.py | 8 ++++---- 7 files changed, 45 insertions(+), 14 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index e08274de31dc1..317544ca7ed87 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -120,12 +120,14 @@ 'airflow.processor': { 'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'], 'level': LOG_LEVEL, - 'propagate': False, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! + 'propagate': True, }, 'airflow.task': { 'handlers': ['task'], 'level': LOG_LEVEL, - 'propagate': False, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! + 'propagate': True, 'filters': ['mask_secrets'], }, 'flask_appbuilder': { diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index e9d29239784ab..11e473ecdfa43 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -24,6 +24,7 @@ from airflow import settings from airflow.utils.helpers import parse_template_string +from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler @@ -64,6 +65,8 @@ def set_context(self, filename): self._symlink_latest_log_directory() self._cur_date = datetime.today() + return DISABLE_PROPOGATE + def emit(self, record): if self.handler is not None: self.handler.emit(record) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 072137b363a26..1c9c559bd1033 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -29,6 +29,7 @@ from airflow.exceptions import RemovedInAirflow3Warning from airflow.utils.context import Context from airflow.utils.helpers import parse_template_string, render_template_to_string +from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler from airflow.utils.session import create_session from airflow.utils.state import State @@ -73,6 +74,8 @@ def set_context(self, ti: TaskInstance): self.handler.setFormatter(self.formatter) self.handler.setLevel(self.level) + return DISABLE_PROPOGATE + def emit(self, record): if self.handler: self.handler.emit(record) diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 89bddb3558fb8..b54a6f3baad37 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -28,6 +28,9 @@ # 7-bit C1 ANSI escape sequences ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') +# Private: A sentinel object +DISABLE_PROPOGATE = object() + def remove_escape_codes(text: str) -> str: """ @@ -179,15 +182,14 @@ def set_context(logger, value): :param logger: logger :param value: value to set """ - _logger = logger - while _logger: - for handler in _logger.handlers: + while logger: + for handler in logger.handlers: # Not all handlers need to have context passed in so we ignore # the error when handlers do not have set_context defined. set_context = getattr(handler, 'set_context', None) - if set_context: - set_context(value) - if _logger.propagate is True: - _logger = _logger.parent + if set_context and set_context(value) is DISABLE_PROPOGATE: + logger.propagate = False + if logger.propagate is True: + logger = logger.parent else: - _logger = None + break diff --git a/tests/conftest.py b/tests/conftest.py index cb5f449eb1b6f..1026dc3c0112f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -840,3 +840,14 @@ def _delete_log_template(): request.addfinalizer(_delete_log_template) return _create_log_template + + +@pytest.fixture() +def reset_logging_config(): + import logging.config + + from airflow import settings + from airflow.utils.module_loading import import_string + + logging_config = import_string(settings.LOGGING_CLASS_PATH) + logging.config.dictConfig(logging_config) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 6dd0e22396826..f1728db9c9fa4 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -685,6 +685,16 @@ def test_weight_rule_override(self): op = BaseOperator(task_id="test_task", weight_rule="upstream") assert WeightRule.UPSTREAM == op.weight_rule + # ensure the default logging config is used for this test, no matter what ran before + @pytest.mark.usefixtures('reset_logging_config') + def test_logging_propogated_by_default(self, caplog): + """Test that when set_context hasn't been called that log records are emitted""" + BaseOperator(task_id="test").log.warning("test") + # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing + # the other case (that when we have set_context it goes to the file is harder to achieve without + # leaking a lot of state) + assert caplog.messages == ["test"] + def test_init_subclass_args(): class InitSubclassOp(BaseOperator): diff --git a/tests/providers/arangodb/sensors/test_arangodb.py b/tests/providers/arangodb/sensors/test_arangodb.py index 103e97129c096..b98bdb4b95930 100644 --- a/tests/providers/arangodb/sensors/test_arangodb.py +++ b/tests/providers/arangodb/sensors/test_arangodb.py @@ -26,7 +26,7 @@ from airflow.utils import db, timezone DEFAULT_DATE = timezone.datetime(2017, 1, 1) -arangodb_client_mock = Mock(name="arangodb_client_for_test") +arangodb_hook_mock = Mock(name="arangodb_hook_for_test", **{'query.return_value.count.return_value': 1}) class TestAQLSensor(unittest.TestCase): @@ -46,9 +46,9 @@ def setUp(self): ) @patch( - "airflow.providers.arangodb.hooks.arangodb.ArangoDBClient", + "airflow.providers.arangodb.sensors.arangodb.ArangoDBHook", autospec=True, - return_value=arangodb_client_mock, + return_value=arangodb_hook_mock, ) def test_arangodb_document_created(self, arangodb_mock): query = "FOR doc IN students FILTER doc.name == 'judy' RETURN doc" @@ -62,4 +62,4 @@ def test_arangodb_document_created(self, arangodb_mock): ) arangodb_tag_sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert arangodb_mock.return_value.db.called + assert arangodb_hook_mock.query.return_value.count.called