Skip to content

Commit

Permalink
Ensure the log messages from operators during parsing go somewhere (a…
Browse files Browse the repository at this point in the history
…pache#26779)

* Ensure the log messages from operators during parsing go somewhere

While investigating apache#26599 and the change from AIP-45, I noticed that
these warning messages weren't new! The only thing that was new was that
we started seeing them.

This is because the logger for BaseOperator and all subclasses is
`airflow.task.operators`, and the `airflow.task` logger is not
configured (with `set_context()`) until we have a TaskInstance, so it
just dropped all messages on the floor!

This changes it so that log messages are propagated to parent loggers by
default, but when we configure a context (and thus have a file to write
to) we stop that. A similar change was made for the `airflow.processor`
(but that is unlikely to suffer the same fate).

* Give a real row count value so logs don't fail

The ArangoDB sensor test was logging a mock object, which previously was
getting dropped before emitting, but with this change now fails with
"Mock is not an integer" when attempting the `%d` interpolation.

To avoid making the mock overly specific (`arangodb_client_for_test.db.`
`return_value.aql.execute.return_value.count.return_value`!) I have
changed the test to mock the hook entirely (which is already tested).
  • Loading branch information
ashb authored Sep 30, 2022
1 parent af36824 commit 7363e35
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 14 deletions.
6 changes: 4 additions & 2 deletions airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@
'airflow.processor': {
'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
'level': LOG_LEVEL,
'propagate': False,
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
'propagate': True,
},
'airflow.task': {
'handlers': ['task'],
'level': LOG_LEVEL,
'propagate': False,
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
'propagate': True,
'filters': ['mask_secrets'],
},
'flask_appbuilder': {
Expand Down
3 changes: 3 additions & 0 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from airflow import settings
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler


Expand Down Expand Up @@ -64,6 +65,8 @@ def set_context(self, filename):
self._symlink_latest_log_directory()
self._cur_date = datetime.today()

return DISABLE_PROPOGATE

def emit(self, record):
if self.handler is not None:
self.handler.emit(record)
Expand Down
3 changes: 3 additions & 0 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
from airflow.utils.state import State
Expand Down Expand Up @@ -73,6 +74,8 @@ def set_context(self, ti: TaskInstance):
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)

return DISABLE_PROPOGATE

def emit(self, record):
if self.handler:
self.handler.emit(record)
Expand Down
18 changes: 10 additions & 8 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')

# Private: a sentinel a handler's set_context() can return to make set_context() turn off propagation on that logger
DISABLE_PROPOGATE = object()


def remove_escape_codes(text: str) -> str:
"""
Expand Down Expand Up @@ -179,15 +182,14 @@ def set_context(logger, value):
:param logger: logger
:param value: value to set
"""
_logger = logger
while _logger:
for handler in _logger.handlers:
while logger:
for handler in logger.handlers:
# Not all handlers need to have context passed in so we ignore
# the error when handlers do not have set_context defined.
set_context = getattr(handler, 'set_context', None)
if set_context:
set_context(value)
if _logger.propagate is True:
_logger = _logger.parent
if set_context and set_context(value) is DISABLE_PROPOGATE:
logger.propagate = False
if logger.propagate is True:
logger = logger.parent
else:
_logger = None
break
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,14 @@ def _delete_log_template():
request.addfinalizer(_delete_log_template)

return _create_log_template


@pytest.fixture()
def reset_logging_config():
    """Re-apply the logging configuration named by ``settings.LOGGING_CLASS_PATH``.

    Tests that depend on the configured logger setup (handlers, levels,
    ``propagate`` flags) request this fixture so that logger state left
    behind by earlier tests does not affect them.
    """
    # Imports stay local so they are only resolved when the fixture runs.
    import logging.config

    from airflow import settings
    from airflow.utils.module_loading import import_string

    # Load the configured logging dict and hand it straight to dictConfig.
    logging.config.dictConfig(import_string(settings.LOGGING_CLASS_PATH))
10 changes: 10 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,16 @@ def test_weight_rule_override(self):
op = BaseOperator(task_id="test_task", weight_rule="upstream")
assert WeightRule.UPSTREAM == op.weight_rule

# ensure the default logging config is used for this test, no matter what ran before
@pytest.mark.usefixtures('reset_logging_config')
def test_logging_propogated_by_default(self, caplog):
    """Log records from an operator are emitted when set_context has not been called."""
    operator = BaseOperator(task_id="test")
    operator.log.warning("test")
    # This looks like "how could it fail", but it actually checks that the
    # handler called `emit`. Testing the other case (that once set_context
    # has been called the record goes to the file) is harder to achieve
    # without leaking a lot of state.
    expected_messages = ["test"]
    assert caplog.messages == expected_messages


def test_init_subclass_args():
class InitSubclassOp(BaseOperator):
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/arangodb/sensors/test_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.utils import db, timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)
arangodb_client_mock = Mock(name="arangodb_client_for_test")
arangodb_hook_mock = Mock(name="arangodb_hook_for_test", **{'query.return_value.count.return_value': 1})


class TestAQLSensor(unittest.TestCase):
Expand All @@ -46,9 +46,9 @@ def setUp(self):
)

@patch(
"airflow.providers.arangodb.hooks.arangodb.ArangoDBClient",
"airflow.providers.arangodb.sensors.arangodb.ArangoDBHook",
autospec=True,
return_value=arangodb_client_mock,
return_value=arangodb_hook_mock,
)
def test_arangodb_document_created(self, arangodb_mock):
query = "FOR doc IN students FILTER doc.name == 'judy' RETURN doc"
Expand All @@ -62,4 +62,4 @@ def test_arangodb_document_created(self, arangodb_mock):
)

arangodb_tag_sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert arangodb_mock.return_value.db.called
assert arangodb_hook_mock.query.return_value.count.called

0 comments on commit 7363e35

Please sign in to comment.