Respect LogFormat when using ES logging with Json Format (apache#13310)
This fixes a long-standing bug where timestamps, log level, line number, etc. were not
shown when using the Elasticsearch Task Handler (Elasticsearch as remote logging)
with json_format=True.
kaxil authored Dec 25, 2020
1 parent 69d6d02 commit b6bf253
Showing 2 changed files with 49 additions and 1 deletion.
25 changes: 24 additions & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
@@ -194,12 +194,26 @@ def _read(
             # to prevent it from showing in the UI.
             def concat_logs(lines):
                 log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark.strip() else len(lines)
-                return '\n'.join([lines[i].message for i in range(log_range)])
+                return '\n'.join([self._format_msg(lines[i]) for i in range(log_range)])
 
             message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host]
 
         return message, metadata
 
+    def _format_msg(self, log_line):
+        """Format ES Record to match settings.LOG_FORMAT when used with json_format"""
+        # Using formatter._style.format makes it future proof i.e.
+        # if we change the formatter style from '%' to '{' or '$', this will still work
+        if self.json_format:
+            try:
+                # pylint: disable=protected-access
+                return self.formatter._style.format(_ESJsonLogFmt(**log_line.to_dict()))
+            except Exception:  # noqa pylint: disable=broad-except
+                pass
+
+        # Just a safe-guard to preserve backwards-compatibility
+        return log_line.message
+
     def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
         """
         Returns the logs matching log_id in Elasticsearch and next offset.
@@ -246,6 +260,7 @@ def set_context(self, ti: TaskInstance) -> None:
 
         if self.json_format:
             self.formatter = JSONFormatter(
+                fmt=self.formatter._fmt,  # pylint: disable=protected-access
                 json_fields=self.json_fields,
                 extras={
                     'dag_id': str(ti.dag_id),
@@ -328,3 +343,11 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->
         )
         url = 'https://' + self.frontend.format(log_id=quote(log_id))
         return url
+
+
+class _ESJsonLogFmt:
+    """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT"""
+
+    # A separate class is needed because 'self.formatter._style.format' uses '.__dict__'
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
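
For context: the read-side trick here is that logging.Formatter exposes its style object as formatter._style, and each style's format() method renders from the record's __dict__ (for the default PercentStyle it is essentially self._fmt % record.__dict__). That is why _ESJsonLogFmt only needs to copy the Elasticsearch document's fields into its own __dict__. A minimal stand-alone sketch of the idea, using stdlib logging only and a hypothetical ES document whose field values mirror the test added below:

    import logging

    # Hypothetical Elasticsearch hit, i.e. roughly what log_line.to_dict() would return.
    es_doc = {
        'message': 'some random stuff',
        'asctime': '2020-12-24 19:25:00,962',
        'filename': 'taskinstance.py',
        'lineno': 851,
        'levelname': 'INFO',
    }

    class RecordLike:
        # Same trick as _ESJsonLogFmt: expose the document fields via __dict__.
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')

    # The protected call _format_msg() relies on; it works for '%', '{' and '$'
    # styles alike because every style object implements format().
    print(formatter._style.format(RecordLike(**es_doc)))
    # [2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff
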
25 changes: 25 additions & 0 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -251,6 +251,31 @@ def test_set_context_w_json_format_and_write_stdout(self):
         self.es_task_handler.json_format = True
         self.es_task_handler.set_context(self.ti)
 
+    def test_read_with_json_format(self):
+        ts = pendulum.now()
+        formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+        self.es_task_handler.formatter = formatter
+        self.es_task_handler.json_format = True
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+            'offset': 1,
+            'asctime': '2020-12-24 19:25:00,962',
+            'filename': 'taskinstance.py',
+            'lineno': 851,
+            'levelname': 'INFO',
+        }
+        self.es_task_handler.set_context(self.ti)
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        self.assertEqual(
+            "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff", logs[0][0][1]
+        )
+
     def test_close(self):
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
         self.es_task_handler.formatter = formatter

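On the write side, json_format=True makes the handler index structured documents rather than pre-rendered strings, which is why fields such as asctime, filename, lineno and levelname are available to be reassembled on read. A rough stand-in for that behaviour is sketched below. This is not the actual airflow.utils.log.json_formatter.JSONFormatter, only an illustration of the shape of the documents it produces; the field list, extras and logger name are assumptions:

    import json
    import logging

    class SketchJSONFormatter(logging.Formatter):
        """Rough stand-in for Airflow's JSONFormatter: keep the original fmt around
        (as set_context now does via fmt=self.formatter._fmt) and emit one JSON
        document per record containing the selected fields plus static extras."""

        def __init__(self, fmt=None, json_fields=None, extras=None):
            super().__init__(fmt)
            self.json_fields = json_fields or []
            self.extras = extras or {}

        def format(self, record):
            # Populate record.message / record.asctime the way
            # logging.Formatter.format() normally would.
            record.message = record.getMessage()
            record.asctime = self.formatTime(record)
            doc = {field: getattr(record, field, None) for field in self.json_fields}
            doc.update(self.extras)
            return json.dumps(doc)

    handler = logging.StreamHandler()
    handler.setFormatter(
        SketchJSONFormatter(
            fmt='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
            json_fields=['asctime', 'filename', 'lineno', 'levelname', 'message'],
            extras={'dag_id': 'example_dag', 'task_id': 'example_task'},
        )
    )

    logger = logging.getLogger('demo')
    logger.addHandler(handler)
    logger.propagate = False
    logger.warning('some random stuff')
    # Emits a JSON document with asctime/filename/lineno/levelname/message plus the extras,
    # which the read path can then format back into settings.LOG_FORMAT.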