[AIRFLOW-880] Make webserver serve logs in a sane way for remote logs
There are two major problems with remote logs in
Airflow right now:
1. Lack of complete logs: the remote log should be the
default source rather than a fallback that is read only
when no local log is present, because the remote log
contains the output of every try of a task, whereas the
local log is only guaranteed to contain the most recent
try.
2. Lack of consistency: every webserver should return the
same log. Today a webserver that happens to hold a local
copy returns a different log than one that does not, and
webservers that do hold local copies can disagree with
each other.

This PR addresses these issues by ALWAYS reading from the
remote logs and, if the task is currently running, also
reading from the worker host to capture in-flight logs.
The one remaining issue is that if a task is running on a
worker it already ran on, the logs of its previously
completed runs will appear twice, delimited by markers
such as "*** Getting remote logs" and "*** Getting logs on
local worker". This can be fixed later, either by
streaming logs to the log server or by creating a proper
abstraction for multiple task instance runs, and it is
still better than the current behavior: duplicated
information beats omitting previous task instance logs
from the webserver log.
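
A minimal sketch of that ordering, not the actual view
code: the remote store is always consulted first, and the
worker-side copy is appended while the task is still
running or when no remote log could be loaded. The helper
callables and the assemble_log name are illustrative
assumptions; the real logic is in the log() hunk of the
diff below.

import os

def assemble_log(ti, remote_log_base, log_relative, read_remote, read_worker):
    # Sketch only: read_remote stands in for the S3Log/GCSLog reads and
    # read_worker for the local-file/worker-host fetch shown in the diff.
    log, remote_loaded = "", False
    if remote_log_base:
        remote_path = os.path.join(remote_log_base, log_relative)
        remote = read_remote(remote_path)
        if remote:
            log += "*** Reading remote log from {}.\n{}\n".format(
                remote_path, remote)
            remote_loaded = True
    # The worker copy is still needed while the task runs (in-flight
    # output) or when nothing has reached the remote store yet.
    if not remote_loaded or ti.state == "running":
        log += read_worker(ti, log_relative)
    return log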

Testing Done:
Tested on a staging cluster against these scenarios (the
running cases exercise the worker-side fetch sketched
after this list):
- Task instance doesn't exist
- Task instance is running and has previous remote
log
- Task instance is running for first time
- Task instance has completed and has remote log
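
For the running scenarios above, the in-flight portion is
fetched from the worker's log-serving port whenever the
file is not local to the webserver. A hedged sketch of
that fetch, using the same config keys the diff reads
(celery WORKER_LOG_SERVER_PORT and webserver
log_fetch_timeout_sec); the function name and the 8793
default are assumptions for illustration.

import requests

def fetch_worker_log(hostname, log_relative, port=8793, timeout=None):
    # port mirrors celery WORKER_LOG_SERVER_PORT; timeout mirrors
    # webserver log_fetch_timeout_sec (None means wait indefinitely).
    url = "http://{}:{}/log/{}".format(hostname, port, log_relative)
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return "*** Fetching here: {}\n{}\n".format(url, response.text)
    except requests.RequestException:
        return "*** Failed to fetch log file from worker at {}.\n".format(url)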

Closes apache#2086 from aoen/ddavydov/fix_s3_logging
aoen committed Feb 21, 2017
1 parent 50702d0 commit 974b75e
Showing 1 changed file with 97 additions and 81 deletions.
178 changes: 97 additions & 81 deletions airflow/www/views.py
@@ -711,79 +711,94 @@ def log(self):
loc = loc.format(**locals())
log = ""
TI = models.TaskInstance
session = Session()
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
session = Session()
ti = session.query(TI).filter(
TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == dttm).first()
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})

if ti:
host = ti.hostname
log_loaded = False

if os.path.exists(loc):
try:
f = open(loc)
log += "".join(f.readlines())
f.close()
log_loaded = True
except:
log = "*** Failed to load local log file: {0}.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
url = os.path.join(
"http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
).format(**locals())
log += "*** Log file isn't local.\n"
log += "*** Fetching here: {url}\n".format(**locals())
try:
import requests
timeout = None # No timeout
try:
timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
except (AirflowConfigException, ValueError):
pass

response = requests.get(url, timeout=timeout)
response.raise_for_status()
log += '\n' + response.text
log_loaded = True
except:
log += "*** Failed to fetch log file from worker.\n".format(
**locals())

if not log_loaded:
# load remote logs
remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_log = os.path.join(remote_log_base, log_relative)
log += '\n*** Reading remote logs...\n'
if ti is None:
log = "*** Task instance did not exist in the DB\n"
else:
# load remote logs
remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_log_loaded = False
if remote_log_base:
remote_log_path = os.path.join(remote_log_base, log_relative)
remote_log = ""

# Only display errors reading the log if the task completed or ran at least
# once before (otherwise there won't be any remote log stored).
ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
ti_ran_more_than_once = ti.try_number > 1
surface_log_retrieval_errors = (
ti_execution_completed or ti_ran_more_than_once)

# S3
if remote_log.startswith('s3:/'):
log += log_utils.S3Log().read(remote_log, return_error=True)

if remote_log_path.startswith('s3:/'):
remote_log += log_utils.S3Log().read(
remote_log_path, return_error=surface_log_retrieval_errors)
remote_log_loaded = True
# GCS
elif remote_log.startswith('gs:/'):
log += log_utils.GCSLog().read(remote_log, return_error=True)

elif remote_log_path.startswith('gs:/'):
remote_log += log_utils.GCSLog().read(
remote_log_path, return_error=surface_log_retrieval_errors)
remote_log_loaded = True
# unsupported
elif remote_log:
log += '*** Unsupported remote log location.'

session.commit()
session.close()
else:
remote_log += '*** Unsupported remote log location.'

if remote_log:
log += ('*** Reading remote log from {}.\n{}\n'.format(
remote_log_path, remote_log))

# We only want to display the
# local logs while the task is running if a remote log configuration is set up
# since the logs will be transfered there after the run completes.
# TODO(aoen): One problem here is that if a task is running on a worker it
# already ran on, then duplicate logs will be printed for all of the previous
# runs of the task that already completed since they will have been printed as
# part of the remote log section above. This can be fixed either by streaming
# logs to the log servers as tasks are running, or by creating a proper
# abstraction for multiple task instance runs).
if not remote_log_loaded or ti.state == State.RUNNING:
if os.path.exists(loc):
try:
f = open(loc)
log += "*** Reading local log.\n" + "".join(f.readlines())
f.close()
except:
log = "*** Failed to load local log file: {0}.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
url = os.path.join(
"http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
).format(**locals())
log += "*** Log file isn't local.\n"
log += "*** Fetching here: {url}\n".format(**locals())
try:
import requests
timeout = None # No timeout
try:
timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
except (AirflowConfigException, ValueError):
pass

response = requests.get(url, timeout=timeout)
response.raise_for_status()
log += '\n' + response.text
except:
log += "*** Failed to fetch log file from worker.\n".format(
**locals())

if PY2 and not isinstance(log, unicode):
log = log.decode('utf-8')

title = "Log"

return self.render(
'airflow/ti_code.html',
code=log, dag=dag, title=title, task_id=task_id,
code=log, dag=dag, title="Log", task_id=task_id,
execution_date=execution_date, form=form)

@expose('/task')
@@ -824,7 +839,7 @@ def task(self):
if not attr_name.startswith('_'):
attr = getattr(task, attr_name)
if type(attr) != type(self.task) and \
attr_name not in attr_renderer:
attr_name not in attr_renderer:
task_attrs.append((attr_name, str(attr)))

# Color coding the special attributes that are code
@@ -1172,7 +1187,7 @@ def tree(self):
max_date = max(dates) if dates else None

tis = dag.get_task_instances(
session, start_date=min_date, end_date=base_date)
session, start_date=min_date, end_date=base_date)
task_instances = {}
for ti in tis:
tid = alchemy_to_dict(ti)
@@ -1213,10 +1228,10 @@ def set_duration(tid):
return {
'name': task.task_id,
'instances': [
set_duration(task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
set_duration(task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
for d in dates],
children_key: children,
'num_dep': len(task.upstream_list),
@@ -1445,9 +1460,9 @@ def duration(self):
chart.buildcontent()
cum_chart.buildcontent()
s_index = cum_chart.htmlcontent.rfind('});')
cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index]
+ "$( document ).trigger('chartload')"
+ cum_chart.htmlcontent[s_index:])
cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
"$( document ).trigger('chartload')" +
cum_chart.htmlcontent[s_index:])

return self.render(
'airflow/duration_chart.html',
@@ -1577,7 +1592,7 @@ def landing_times(self):
y=scale_time_units(y[task.task_id], y_unit))

tis = dag.get_task_instances(
session, start_date=min_date, end_date=base_date)
session, start_date=min_date, end_date=base_date)
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if dates else None

@@ -1687,7 +1702,7 @@ def gantt(self):
'status': ti.state,
'executionDate': ti.execution_date.isoformat(),
})
states = {ti.state:ti.state for ti in tis}
states = {ti.state: ti.state for ti in tis}
data = {
'taskNames': [ti.task_id for ti in tis],
'tasks': tasks,
@@ -1765,6 +1780,7 @@ def varimport(self):
flash("{} variable(s) successfully updated.".format(len(d)))
return redirect('/admin/variable')


class HomeView(AdminIndexView):
@expose("/")
@login_required
@@ -2058,9 +2074,9 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
(c.conn_id, c.conn_id)
for c in (
Session().query(models.Connection.conn_id)
.group_by(models.Connection.conn_id)
.group_by(models.Connection.conn_id)
)
]
]
}

def on_model_change(self, form, model, is_created=True):
@@ -2243,8 +2259,8 @@ class DagRunModelView(ModelViewOnly):
def action_new_delete(self, ids):
session = settings.Session()
deleted = set(session.query(models.DagRun)
.filter(models.DagRun.id.in_(ids))
.all())
.filter(models.DagRun.id.in_(ids))
.all())
session.query(models.DagRun)\
.filter(models.DagRun.id.in_(ids))\
.delete(synchronize_session='fetch')
@@ -2465,7 +2481,7 @@ def on_model_change(self, form, model, is_created):
formdata = form.data
if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
extra = {
key:formdata[key]
key: formdata[key]
for key in self.form_extra_fields.keys() if key in formdata}
model.extra = json.dumps(extra)

@@ -2496,7 +2512,7 @@ def is_secure(cls):
def on_form_prefill(self, form, id):
try:
d = json.loads(form.data.get('extra', '{}'))
except Exception as e:
except Exception:
d = {}

for field in list(self.form_extra_fields.keys()):
@@ -2609,9 +2625,9 @@ def get_query(self):
"""
return (
super(DagModelView, self)
.get_query()
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
.filter(~models.DagModel.is_subdag)
.get_query()
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
.filter(~models.DagModel.is_subdag)
)

def get_count_query(self):
@@ -2620,7 +2636,7 @@ def get_count_query(self):
"""
return (
super(DagModelView, self)
.get_count_query()
.filter(models.DagModel.is_active)
.filter(~models.DagModel.is_subdag)
.get_count_query()
.filter(models.DagModel.is_active)
.filter(~models.DagModel.is_subdag)
)
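
One subtle point in the remote-log branch of the log() hunk above:
read errors are surfaced only when a remote log can reasonably be
expected to exist, i.e. once a try has finished. A condensed,
hedged restatement of that check, assuming the usual lowercase
State.SUCCESS/State.FAILED string values:

def should_surface_remote_errors(ti_state, try_number):
    # Mirrors surface_log_retrieval_errors from the diff: a remote log
    # only exists after a try completes, so suppress "not found" noise
    # for a first attempt that is still running.
    ti_execution_completed = ti_state in {"success", "failed"}
    ti_ran_more_than_once = try_number > 1
    return ti_execution_completed or ti_ran_more_than_once

# First attempt still running -> errors suppressed; failed run -> shown.
assert not should_surface_remote_errors("running", 1)
assert should_surface_remote_errors("failed", 1)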
