diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8c9b2df4b98c5..0b1db61b65f4c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -711,79 +711,94 @@ def log(self):
         loc = loc.format(**locals())
         log = ""
         TI = models.TaskInstance
-        session = Session()
         dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        session = Session()
         ti = session.query(TI).filter(
             TI.dag_id == dag_id, TI.task_id == task_id,
             TI.execution_date == dttm).first()
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-
-        if ti:
-            host = ti.hostname
-            log_loaded = False
-            if os.path.exists(loc):
-                try:
-                    f = open(loc)
-                    log += "".join(f.readlines())
-                    f.close()
-                    log_loaded = True
-                except:
-                    log = "*** Failed to load local log file: {0}.\n".format(loc)
-            else:
-                WORKER_LOG_SERVER_PORT = \
-                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-                url = os.path.join(
-                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
-                ).format(**locals())
-                log += "*** Log file isn't local.\n"
-                log += "*** Fetching here: {url}\n".format(**locals())
-                try:
-                    import requests
-                    timeout = None  # No timeout
-                    try:
-                        timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                    except (AirflowConfigException, ValueError):
-                        pass
-
-                    response = requests.get(url, timeout=timeout)
-                    response.raise_for_status()
-                    log += '\n' + response.text
-                    log_loaded = True
-                except:
-                    log += "*** Failed to fetch log file from worker.\n".format(
-                        **locals())
-
-            if not log_loaded:
-                # load remote logs
-                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-                remote_log = os.path.join(remote_log_base, log_relative)
-                log += '\n*** Reading remote logs...\n'
+        if ti is None:
+            log = "*** Task instance did not exist in the DB\n"
+        else:
+            # load remote logs
+            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+            remote_log_loaded = False
+            if remote_log_base:
+                remote_log_path = os.path.join(remote_log_base, log_relative)
+                remote_log = ""
+
+                # Only display errors reading the log if the task completed or ran at least
+                # once before (otherwise there won't be any remote log stored).
+                ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
+                ti_ran_more_than_once = ti.try_number > 1
+                surface_log_retrieval_errors = (
+                    ti_execution_completed or ti_ran_more_than_once)

                 # S3
-                if remote_log.startswith('s3:/'):
-                    log += log_utils.S3Log().read(remote_log, return_error=True)
-
+                if remote_log_path.startswith('s3:/'):
+                    remote_log += log_utils.S3Log().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
                 # GCS
-                elif remote_log.startswith('gs:/'):
-                    log += log_utils.GCSLog().read(remote_log, return_error=True)
-
+                elif remote_log_path.startswith('gs:/'):
+                    remote_log += log_utils.GCSLog().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
                 # unsupported
-                elif remote_log:
-                    log += '*** Unsupported remote log location.'
-
-        session.commit()
-        session.close()
+                else:
+                    remote_log += '*** Unsupported remote log location.'
+
+                if remote_log:
+                    log += ('*** Reading remote log from {}.\n{}\n'.format(
+                        remote_log_path, remote_log))
+
+            # We only want to display the local logs while the task is running
+            # if a remote log configuration is set up, since the logs will be
+            # transferred there after the run completes.
+            # TODO(aoen): One problem here is that if a task is running on a worker it
+            # already ran on, then duplicate logs will be printed for all of the previous
+            # runs of the task that already completed since they will have been printed as
+            # part of the remote log section above. This can be fixed either by streaming
+            # logs to the log servers as tasks are running, or by creating a proper
+            # abstraction for multiple task instance runs.
+            if not remote_log_loaded or ti.state == State.RUNNING:
+                if os.path.exists(loc):
+                    try:
+                        f = open(loc)
+                        log += "*** Reading local log.\n" + "".join(f.readlines())
+                        f.close()
+                    except:
+                        log = "*** Failed to load local log file: {0}.\n".format(loc)
+                else:
+                    WORKER_LOG_SERVER_PORT = \
+                        conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+                    url = os.path.join(
+                        "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+                    ).format(**locals())
+                    log += "*** Log file isn't local.\n"
+                    log += "*** Fetching here: {url}\n".format(**locals())
+                    try:
+                        import requests
+                        timeout = None  # No timeout
+                        try:
+                            timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                        except (AirflowConfigException, ValueError):
+                            pass
+
+                        response = requests.get(url, timeout=timeout)
+                        response.raise_for_status()
+                        log += '\n' + response.text
+                    except:
+                        log += "*** Failed to fetch log file from worker.\n".format(
+                            **locals())

         if PY2 and not isinstance(log, unicode):
             log = log.decode('utf-8')

-        title = "Log"
-
         return self.render(
             'airflow/ti_code.html',
-            code=log, dag=dag, title=title, task_id=task_id,
+            code=log, dag=dag, title="Log", task_id=task_id,
             execution_date=execution_date, form=form)

 @expose('/task')
@@ -824,7 +839,7 @@ def task(self):
             if not attr_name.startswith('_'):
                 attr = getattr(task, attr_name)
                 if type(attr) != type(self.task) and \
-                    attr_name not in attr_renderer:
+                        attr_name not in attr_renderer:
                     task_attrs.append((attr_name, str(attr)))

         # Color coding the special attributes that are code
@@ -1172,7 +1187,7 @@ def tree(self):
         max_date = max(dates) if dates else None

         tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
+            session, start_date=min_date, end_date=base_date)
         task_instances = {}
         for ti in tis:
             tid = alchemy_to_dict(ti)
@@ -1213,10 +1228,10 @@ def set_duration(tid):
             return {
                 'name': task.task_id,
                 'instances': [
-                        set_duration(task_instances.get((task.task_id, d))) or {
-                            'execution_date': d.isoformat(),
-                            'task_id': task.task_id
-                        }
+                    set_duration(task_instances.get((task.task_id, d))) or {
+                        'execution_date': d.isoformat(),
+                        'task_id': task.task_id
+                    }
                     for d in dates],
                 children_key: children,
                 'num_dep': len(task.upstream_list),
@@ -1445,9 +1460,9 @@ def duration(self):
         chart.buildcontent()
         cum_chart.buildcontent()
         s_index = cum_chart.htmlcontent.rfind('});')
-        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index]
-                                 + "$( document ).trigger('chartload')"
-                                 + cum_chart.htmlcontent[s_index:])
+        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
+                                 "$( document ).trigger('chartload')" +
+                                 cum_chart.htmlcontent[s_index:])

         return self.render(
             'airflow/duration_chart.html',
@@ -1577,7 +1592,7 @@ def landing_times(self):
                 y=scale_time_units(y[task.task_id], y_unit))

         tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
+            session, start_date=min_date, end_date=base_date)
         dates = sorted(list({ti.execution_date for ti in tis}))
         max_date = max([ti.execution_date for ti in tis]) if dates else None

@@ -1687,7 +1702,7 @@ def gantt(self):
                 'status': ti.state,
                 'executionDate': ti.execution_date.isoformat(),
             })
-        states = {ti.state:ti.state for ti in tis}
+        states = {ti.state: ti.state for ti in tis}
         data = {
             'taskNames': [ti.task_id for ti in tis],
             'tasks': tasks,
@@ -1765,6 +1780,7 @@ def varimport(self):
         flash("{} variable(s) successfully updated.".format(len(d)))
         return redirect('/admin/variable')

+
 class HomeView(AdminIndexView):
     @expose("/")
     @login_required
@@ -2058,9 +2074,9 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
             (c.conn_id, c.conn_id)
             for c in (
                 Session().query(models.Connection.conn_id)
-                .group_by(models.Connection.conn_id)
+                    .group_by(models.Connection.conn_id)
             )
-            ]
+        ]
     }

     def on_model_change(self, form, model, is_created=True):
@@ -2243,8 +2259,8 @@ class DagRunModelView(ModelViewOnly):
     def action_new_delete(self, ids):
         session = settings.Session()
         deleted = set(session.query(models.DagRun)
-            .filter(models.DagRun.id.in_(ids))
-            .all())
+                      .filter(models.DagRun.id.in_(ids))
+                      .all())
         session.query(models.DagRun)\
             .filter(models.DagRun.id.in_(ids))\
             .delete(synchronize_session='fetch')
@@ -2465,7 +2481,7 @@ def on_model_change(self, form, model, is_created):
         formdata = form.data
         if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
             extra = {
-                key:formdata[key]
+                key: formdata[key]
                 for key in self.form_extra_fields.keys() if key in formdata}
             model.extra = json.dumps(extra)

@@ -2496,7 +2512,7 @@ def is_secure(cls):
     def on_form_prefill(self, form, id):
         try:
             d = json.loads(form.data.get('extra', '{}'))
-        except Exception as e:
+        except Exception:
             d = {}

         for field in list(self.form_extra_fields.keys()):
@@ -2609,9 +2625,9 @@ def get_query(self):
         """
         return (
             super(DagModelView, self)
-            .get_query()
-            .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
-            .filter(~models.DagModel.is_subdag)
+                .get_query()
+                .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
+                .filter(~models.DagModel.is_subdag)
         )

     def get_count_query(self):
@@ -2620,7 +2636,7 @@ def get_count_query(self):
         """
         return (
             super(DagModelView, self)
-            .get_count_query()
-            .filter(models.DagModel.is_active)
-            .filter(~models.DagModel.is_subdag)
+                .get_count_query()
+                .filter(models.DagModel.is_active)
+                .filter(~models.DagModel.is_subdag)
        )
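
Reviewer note: the substantive change in this patch is the first hunk, which reorders the log view's source precedence: bail out if the TaskInstance row is missing, read the remote (S3/GCS) copy when REMOTE_BASE_LOG_FOLDER is configured, and fall back to the local file or worker HTTP fetch only when no remote source was consulted or the task is still running. The sketch below distills that branching into a self-contained function so it can be read without the surrounding view code. It is illustrative only: SimpleTaskInstance, read_remote_log, and read_local_log are hypothetical stand-ins, the plain state strings stand in for the State constants, and remote_log_loaded is collapsed to a single assignment where the view sets it per scheme.

# Illustrative sketch of the new log-source precedence; names are stand-ins,
# not Airflow APIs.

REMOTE_BASE_LOG_FOLDER = 's3://my-bucket/airflow/logs'  # assumed config value


class SimpleTaskInstance(object):
    """Minimal stand-in for models.TaskInstance."""
    def __init__(self, state, try_number):
        self.state = state
        self.try_number = try_number


def read_remote_log(path, return_error):
    """Stub for log_utils.S3Log/GCSLog; returns '' when nothing is stored."""
    return ''


def read_local_log():
    """Stub for the local-file / worker-HTTP fallback in the view."""
    return '*** Reading local log.\n...'


def build_log_text(ti, log_relative):
    """Mirror the precedence used by the rewritten `log` view."""
    if ti is None:
        return '*** Task instance did not exist in the DB\n'

    log = ''
    remote_log_loaded = False
    if REMOTE_BASE_LOG_FOLDER:
        remote_log_path = '/'.join([REMOTE_BASE_LOG_FOLDER, log_relative])
        # Surface remote read errors only if a remote log can exist at all,
        # i.e. the task finished or already ran at least once.
        surface_errors = ti.state in ('success', 'failed') or ti.try_number > 1
        remote_log = read_remote_log(remote_log_path, return_error=surface_errors)
        remote_log_loaded = True  # the view sets this per scheme; collapsed here
        if remote_log:
            log += '*** Reading remote log from {}.\n{}\n'.format(
                remote_log_path, remote_log)

    # Fall back to (or supplement with) local logs while the task is still
    # running, since the remote copy only appears after the run completes.
    if not remote_log_loaded or ti.state == 'running':
        log += read_local_log()
    return log


print(build_log_text(SimpleTaskInstance('running', 1), 'dag/task/2016-01-01'))

The remote-first ordering is also what the TODO(aoen) comment in the hunk is about: when a task re-runs on a worker it already used, the completed attempts show up once via the remote copy and again via the local file.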