From b6bf25306243e78bf12528f9a080ea100a575641 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 25 Dec 2020 12:18:09 +0000 Subject: [PATCH] Respect LogFormat when using ES logging with Json Format (#13310) This was a log standing bug / behaviour where Timestamps, log level, line number etc were not shown when using ElasticSearch Task Handler (Elasticsearch as remote logging) with json_format=True. --- .../elasticsearch/log/es_task_handler.py | 25 ++++++++++++++++++- .../elasticsearch/log/test_es_task_handler.py | 25 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 35c99ea4f7453..064b796afd435 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -194,12 +194,26 @@ def _read( # to prevent it from showing in the UI. def concat_logs(lines): log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark.strip() else len(lines) - return '\n'.join([lines[i].message for i in range(log_range)]) + return '\n'.join([self._format_msg(lines[i]) for i in range(log_range)]) message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host] return message, metadata + def _format_msg(self, log_line): + """Format ES Record to match settings.LOG_FORMAT when used with json_format""" + # Using formatter._style.format makes it future proof i.e. + # if we change the formatter style from '%' to '{' or '$', this will still work + if self.json_format: + try: + # pylint: disable=protected-access + return self.formatter._style.format(_ESJsonLogFmt(**log_line.to_dict())) + except Exception: # noqa pylint: disable=broad-except + pass + + # Just a safe-guard to preserve backwards-compatibility + return log_line.message + def es_read(self, log_id: str, offset: str, metadata: dict) -> list: """ Returns the logs matching log_id in Elasticsearch and next offset. @@ -246,6 +260,7 @@ def set_context(self, ti: TaskInstance) -> None: if self.json_format: self.formatter = JSONFormatter( + fmt=self.formatter._fmt, # pylint: disable=protected-access json_fields=self.json_fields, extras={ 'dag_id': str(ti.dag_id), @@ -328,3 +343,11 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> ) url = 'https://' + self.frontend.format(log_id=quote(log_id)) return url + + +class _ESJsonLogFmt: + """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT""" + + # A separate class is needed because 'self.formatter._style.format' uses '.__dict__' + def __init__(self, **kwargs): + self.__dict__.update(kwargs) diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 5f34ab2fe94e9..3531c09611b70 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -251,6 +251,31 @@ def test_set_context_w_json_format_and_write_stdout(self): self.es_task_handler.json_format = True self.es_task_handler.set_context(self.ti) + def test_read_with_json_format(self): + ts = pendulum.now() + formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s') + self.es_task_handler.formatter = formatter + self.es_task_handler.json_format = True + + self.body = { + 'message': self.test_message, + 'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1', + 'offset': 1, + 'asctime': '2020-12-24 19:25:00,962', + 'filename': 'taskinstance.py', + 'lineno': 851, + 'levelname': 'INFO', + } + self.es_task_handler.set_context(self.ti) + self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id) + + logs, _ = self.es_task_handler.read( + self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False} + ) + self.assertEqual( + "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff", logs[0][0][1] + ) + def test_close(self): formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') self.es_task_handler.formatter = formatter