[AIRFLOW-1325] Add ElasticSearch log handler and reader
Closes apache#3214 from yrqls21/kevin_yang_add_es_task_handler
KevinYang21 authored and Fokko Driesprong committed Apr 13, 2018
1 parent 34f827f commit ec38ba9
Showing 25 changed files with 1,339 additions and 282 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -130,3 +130,6 @@ ENV/

# Spark
rat-results.txt

# Git stuff
.gitattributes
1 change: 1 addition & 0 deletions LICENSE
@@ -225,6 +225,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(MIT License) Underscorejs (http://underscorejs.org)
(MIT License) Bootstrap Toggle (http://www.bootstraptoggle.com)
(MIT License) normalize.css (http://necolas.github.io/normalize.css/)
(MIT License) ElasticMock (https://github.com/vrcmarcos/elasticmock)

========================================================================
BSD 2-Clause licenses
4 changes: 1 addition & 3 deletions airflow/bin/cli.py
@@ -457,10 +457,8 @@ def run(args, dag=None):
     if args.interactive:
         _run(args, dag, ti)
     else:
-        with redirect_stdout(ti.log, logging.INFO),\
-                redirect_stderr(ti.log, logging.WARN):
+        with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
             _run(args, dag, ti)
-    logging.shutdown()


@cli_utils.action_logging
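A note on the change above: the two stacked context managers become a single with statement, and the trailing logging.shutdown() call goes away; the Elasticsearch handler below guards against double-closing instead. Below is a minimal sketch of the stdout-redirect pattern, with a hypothetical StreamToLogger helper standing in for Airflow's own redirect_stdout implementation, which may differ:

import logging
import sys
from contextlib import contextmanager


class StreamToLogger(object):
    # File-like shim that forwards writes to a logger (illustrative only).
    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message.strip():
            self.logger.log(self.level, message.rstrip())

    def flush(self):
        # Nothing to flush; the logger's handlers own any buffering.
        pass


@contextmanager
def redirect_stdout(logger, level):
    # Swap sys.stdout for the shim, restoring it even on error.
    original, sys.stdout = sys.stdout, StreamToLogger(logger, level)
    try:
        yield
    finally:
        sys.stdout = original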
24 changes: 22 additions & 2 deletions airflow/config_templates/airflow_local_settings.py
@@ -37,12 +37,19 @@

PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
-# wasb buckets should start with "wasb" just to help Airflow select correct handler
+# wasb buckets should start with "wasb"
+# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')

END_OF_LOG_MARK = 'end_of_log'

DEFAULT_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
@@ -145,7 +152,18 @@
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
             'delete_local_copy': False,
         },
-    }
+    },
+    'elasticsearch': {
+        'task': {
+            'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'log_id_template': LOG_ID_TEMPLATE,
+            'filename_template': FILENAME_TEMPLATE,
+            'end_of_log_mark': END_OF_LOG_MARK,
+            'host': ELASTICSEARCH_HOST,
+        },
+    },
 }

REMOTE_LOGGING = conf.get('core', 'remote_logging')
@@ -156,3 +174,5 @@
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
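For illustration, this is how the new LOG_ID_TEMPLATE resolves for a hypothetical task instance (all values below are made up):

LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'

log_id = LOG_ID_TEMPLATE.format(
    dag_id='example_dag',
    task_id='example_task',
    execution_date='2018-04-13T00:00:00+00:00',
    try_number=1)
# log_id == 'example_dag-example_task-2018-04-13T00:00:00+00:00-1'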
7 changes: 5 additions & 2 deletions airflow/config_templates/default_airflow.cfg
@@ -35,8 +35,8 @@ dags_folder = {AIRFLOW_HOME}/dags
# This path must be absolute
base_log_folder = {AIRFLOW_HOME}/logs

-# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
-# must supply an Airflow connection id that provides access to the storage
+# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch.
+# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = False
@@ -486,3 +486,6 @@ api_rev = v3
[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True

[elasticsearch]
elasticsearch_host =
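A sample configuration enabling the new handler might look like the following; localhost:9200 is a placeholder, and remote_logging must be set for the elasticsearch branch in airflow_local_settings.py to be taken:

[core]
remote_logging = True

[elasticsearch]
elasticsearch_host = localhost:9200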
4 changes: 2 additions & 2 deletions airflow/models.py
@@ -857,7 +857,7 @@ def __init__(self, task, execution_date, state=None):
         self.init_on_load()
         # Is this TaskInstance being currently running within `airflow run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.is_raw = False
+        self.raw = False

     @reconstructor
     def init_on_load(self):
@@ -1956,8 +1956,8 @@ def init_run_context(self, raw=False):
"""
Sets the log context.
"""
self._set_context(self)
self.raw = raw
self._set_context(self)


class TaskFail(Base):
9 changes: 9 additions & 0 deletions airflow/utils/helpers.py
@@ -31,6 +31,8 @@
import sys
import warnings

from jinja2 import Template

from airflow import configuration
from airflow.exceptions import AirflowException

@@ -223,6 +225,13 @@ def on_terminate(p):
        log.error("Process %s (%s) could not be killed. Giving up.", p, p.pid)


def parse_template_string(template_string):
    if "{{" in template_string:  # jinja mode
        return None, Template(template_string)
    else:
        return template_string, None


class AirflowImporter(object):
    """
    Importer that dynamically loads a class and module from its parent. This
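A quick illustration of both branches of parse_template_string (the template strings are hypothetical):

from jinja2 import Template

from airflow.utils.helpers import parse_template_string

# str.format-style string: returned as-is, with no Jinja template.
plain, jinja = parse_template_string('{dag_id}-{task_id}')
assert plain == '{dag_id}-{task_id}' and jinja is None

# Any "{{" switches to Jinja mode: only the Template object is returned.
plain, jinja = parse_template_string('{{ ti.dag_id }}-{{ try_number }}')
assert plain is None and isinstance(jinja, Template)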
183 changes: 183 additions & 0 deletions airflow/utils/log/es_task_handler.py
@@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Using `from elasticsearch import *` would break elasticsearch mocking used in unit tests.
import elasticsearch
import pendulum
from elasticsearch_dsl import Search

from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
    """
    ElasticsearchTaskHandler is a python log handler that
    reads logs from Elasticsearch. Note that logs are not directly
    indexed into Elasticsearch. Instead, the handler flushes logs
    into local files. Additional software setup is required
    to index the logs into Elasticsearch, such as using
    Filebeat and Logstash.

    To efficiently query and sort Elasticsearch results, we assume each
    log message has a field `log_id` composed of the task instance's primary keys:
    `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
    Log messages with a specific log_id are sorted based on `offset`,
    which is a unique integer indicating each log message's order.
    Timestamps here are unreliable because multiple log messages
    might have the same timestamp.
    """

    PAGE = 0
    MAX_LINE_PER_PAGE = 1000

    def __init__(self, base_log_folder, filename_template,
                 log_id_template, end_of_log_mark,
                 host='localhost:9200'):
        """
        :param base_log_folder: base folder to store logs locally
        :param filename_template: template for the local log file name
        :param log_id_template: log id template
        :param end_of_log_mark: mark written at the end of a finished log
        :param host: Elasticsearch host name
        """
        super(ElasticsearchTaskHandler, self).__init__(
            base_log_folder, filename_template)
        self.closed = False

        self.log_id_template, self.log_id_jinja_template = \
            parse_template_string(log_id_template)

        self.client = elasticsearch.Elasticsearch([host])

        self.mark_end_on_close = True
        self.end_of_log_mark = end_of_log_mark

    def _render_log_id(self, ti, try_number):
        if self.log_id_jinja_template:
            jinja_context = ti.get_template_context()
            jinja_context['try_number'] = try_number
            return self.log_id_jinja_template.render(**jinja_context)

        return self.log_id_template.format(
            dag_id=ti.dag_id,
            task_id=ti.task_id,
            execution_date=ti.execution_date.isoformat(),
            try_number=try_number)

    def _read(self, ti, try_number, metadata=None):
        """
        Endpoint for streaming logs.

        :param ti: task instance object
        :param try_number: try_number of the task instance
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
        :return: a tuple of the log message and the updated metadata.
        """
        if not metadata:
            metadata = {'offset': 0}
        if 'offset' not in metadata:
            metadata['offset'] = 0

        offset = metadata['offset']
        log_id = self._render_log_id(ti, try_number)

        logs = self.es_read(log_id, offset)

        next_offset = offset if not logs else logs[-1].offset

        metadata['offset'] = next_offset
        # end_of_log_mark may contain characters like '\n' which are needed to
        # have the log uploaded but will not be stored in Elasticsearch.
        metadata['end_of_log'] = False if not logs \
            else logs[-1].message == self.end_of_log_mark.strip()

        cur_ts = pendulum.now()
        # Assume end of log after not receiving new log for 5 min,
        # as executor heartbeat is 1 min and there might be some
        # delay before Elasticsearch makes the log available.
        if 'last_log_timestamp' in metadata:
            last_log_ts = timezone.parse(metadata['last_log_timestamp'])
            if cur_ts.diff(last_log_ts).in_minutes() >= 5:
                metadata['end_of_log'] = True

        if offset != next_offset or 'last_log_timestamp' not in metadata:
            metadata['last_log_timestamp'] = str(cur_ts)

        message = '\n'.join([log.message for log in logs])

        return message, metadata

    def es_read(self, log_id, offset):
        """
        Returns the logs matching log_id in Elasticsearch, sorted by offset.
        Returns an empty list if no log is found or there was an error.

        :param log_id: the log_id of the log to read.
        :type log_id: str
        :param offset: the offset to start reading the log from.
        :type offset: str
        """
        # Offset is the unique key for sorting logs given log_id.
        s = Search(using=self.client) \
            .query('match', log_id=log_id) \
            .sort('offset')

        s = s.filter('range', offset={'gt': offset})

        logs = []
        if s.count() != 0:
            try:
                logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \
                    .execute()
            except Exception as e:
                msg = 'Could not read log with log_id: {}, ' \
                      'error: {}'.format(log_id, str(e))
                self.log.exception(msg)

        return logs

    def set_context(self, ti):
        super(ElasticsearchTaskHandler, self).set_context(ti)
        self.mark_end_on_close = not ti.raw

    def close(self):
        # When the application exits, the system shuts down all handlers by
        # calling the close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        if not self.mark_end_on_close:
            self.closed = True
            return

        # Case in which the context of the handler was not set.
        if self.handler is None:
            self.closed = True
            return

        # Reopen the file stream, because FileHandler.close() would be called
        # first in logging.shutdown() and the stream in it would be set to None.
        if self.handler.stream is None or self.handler.stream.closed:
            self.handler.stream = self.handler._open()

        # Mark the end of file using the end of log mark,
        # so we know where to stop while auto-tailing.
        self.handler.stream.write(self.end_of_log_mark)

        super(ElasticsearchTaskHandler, self).close()

        self.closed = True
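A hedged usage sketch of the handler's read path: the constructor arguments mirror the elasticsearch handler config above, while the ti task instance and all literal values are assumptions for illustration, not prescribed by this commit.

handler = ElasticsearchTaskHandler(
    base_log_folder='/tmp/airflow/logs',
    filename_template='{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
    log_id_template='{dag_id}-{task_id}-{execution_date}-{try_number}',
    end_of_log_mark='end_of_log',
    host='localhost:9200')

# The first call starts at offset 0; feeding the returned metadata back in
# resumes from the last offset, until the end-of-log mark (or the 5-minute
# timeout) sets metadata['end_of_log'] to True.
message, metadata = handler._read(ti, try_number=1)
while not metadata['end_of_log']:
    message, metadata = handler._read(ti, try_number=1, metadata=metadata)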
10 changes: 3 additions & 7 deletions airflow/utils/log/file_processor_handler.py
@@ -16,9 +16,8 @@
 import logging
 import os
 
-from jinja2 import Template
-
 from airflow import configuration as conf
+from airflow.utils.helpers import parse_template_string
 from datetime import datetime


@@ -38,11 +37,8 @@ def __init__(self, base_log_folder, filename_template):
         self.handler = None
         self.base_log_folder = base_log_folder
         self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-        self.filename_template = filename_template
-        self.filename_jinja_template = None
-
-        if "{{" in self.filename_template:  # jinja mode
-            self.filename_jinja_template = Template(self.filename_template)
+        self.filename_template, self.filename_jinja_template = \
+            parse_template_string(filename_template)

        self._cur_date = datetime.today()
        if not os.path.exists(self._get_log_directory()):