Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging"
This reverts commit e6ef06c which
was committed accidentally.
aoen committed Jul 21, 2017
1 parent e6ef06c commit b9576d5
Showing 11 changed files with 177 additions and 416 deletions.
111 changes: 90 additions & 21 deletions airflow/bin/cli.py
@@ -22,6 +22,7 @@
import socket
import subprocess
import textwrap
import warnings
from importlib import import_module

import argparse
@@ -51,6 +52,8 @@
Connection)
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.file import mkdirs
from airflow.www.app import cached_app

from sqlalchemy import func
@@ -324,6 +327,55 @@ def run(args, dag=None):
settings.configure_vars()
settings.configure_orm()

logging.root.handlers = []
if args.raw:
# Output to STDOUT for the parent process to read and log
logging.basicConfig(
stream=sys.stdout,
level=settings.LOGGING_LEVEL,
format=settings.LOG_FORMAT)
else:
# Setting up logging to a file.

# To handle log writing when tasks are impersonated, the log files need to
# be writable by the user that runs the Airflow command and the user
# that is impersonated. This is mainly to handle corner cases with the
# SubDagOperator. When the SubDagOperator is run, all of the operators
# run under the impersonated user and create appropriate log files
# as the impersonated user. However, if the user manually runs tasks
# of the SubDagOperator through the UI, then the log files are created
# by the user that runs the Airflow command. For example, the Airflow
# run command may be run by the `airflow_sudoable` user, but the Airflow
# tasks may be run by the `airflow` user. If the log files are not
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
# Create the log file and give it group writable permissions
# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
if not os.path.exists(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o775)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())

if not os.path.exists(filename):
open(filename, "a").close()
os.chmod(filename, 0o666)

logging.basicConfig(
filename=filename,
level=settings.LOGGING_LEVEL,
format=settings.LOG_FORMAT)

hostname = socket.getfqdn()
logging.info("Running on host {}".format(hostname))

if not args.pickle and not dag:
dag = get_dag(args)
elif not dag:
@@ -339,21 +391,8 @@ def run(args, dag=None):
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()

logger = logging.getLogger('airflow.task')
if args.raw:
logger = logging.getLogger('airflow.task.raw')

for handler in logger.handlers:
try:
print("inside cli, setting up context")
handler.set_context(ti)
except AttributeError:
pass

hostname = socket.getfqdn()
logger.info("Running on host {}".format(hostname))

if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
@@ -411,13 +450,43 @@ def run(args, dag=None):
if args.raw:
return

# Force the log to flush. The flush is important because we
# subsequently read from the log to insert into S3 or Google
# cloud storage. Explicitly closing the handler is needed in order
# to upload to remote storage services.
for handler in logger.handlers:
handler.flush()
handler.close()
# Force the log to flush, and set the handler to go back to normal so we
# don't continue logging to the task's log file. The flush is important
# because we subsequently read from the log to insert into S3 or Google
# cloud storage.
logging.root.handlers[0].flush()
logging.root.handlers = []

# store logs remotely
remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

# deprecated as of March 2016
if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
warnings.warn(
'The S3_LOG_FOLDER conf key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
'update airflow.cfg to ensure future compatibility.',
DeprecationWarning)
remote_base = conf.get('core', 'S3_LOG_FOLDER')

if os.path.exists(filename):
# read log and remove old logs to get just the latest additions

with open(filename, 'r') as logfile:
log = logfile.read()

remote_log_location = filename.replace(log_base, remote_base)
# S3
if remote_base.startswith('s3:/'):
logging_utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
logging_utils.GCSLog().write(log, remote_log_location)
# Other
elif remote_base and remote_base != 'None':
logging.error(
'Unsupported remote log location: {}'.format(remote_base))


def task_failed_deps(args):
"""
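For reference, here is a minimal standalone sketch of the per-task log layout and permissions that the hunk above restores in cli.py. It is not the commit's code: the function name is invented, os.makedirs stands in for Airflow's custom mkdirs helper, and the example values are arbitrary.

import os
from datetime import datetime

def task_log_path(base_log_folder, dag_id, task_id, execution_date):
    # <BASE_LOG_FOLDER>/<dag_id>/<task_id>/<execution_date ISO string>, kept
    # group/world writable so both the impersonated task user and the user
    # running "airflow run" can append to the same file.
    directory = os.path.join(os.path.expanduser(base_log_folder), dag_id, task_id)
    if not os.path.exists(directory):
        os.makedirs(directory, 0o775)  # cli.py uses mkdirs() because os.makedirs may not apply the mode
    filename = os.path.join(directory, execution_date.isoformat())
    if not os.path.exists(filename):
        open(filename, "a").close()
        os.chmod(filename, 0o666)
    return filename

print(task_log_path("/tmp/airflow-logs", "example_dag", "example_task",
                    datetime(2017, 7, 21)))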
13 changes: 0 additions & 13 deletions airflow/config_templates/__init__.py

This file was deleted.

6 changes: 0 additions & 6 deletions airflow/config_templates/default_airflow.cfg
@@ -122,12 +122,6 @@ security =
# values at runtime)
unit_test_mode = False

# Logging configuration path
logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG

# Name of handler to read task instance logs
task_log_reader = airflow.task

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
73 changes: 0 additions & 73 deletions airflow/config_templates/default_airflow_logging.py

This file was deleted.

13 changes: 1 addition & 12 deletions airflow/settings.py
@@ -18,7 +18,6 @@
from __future__ import unicode_literals

import logging
import logging.config
import os
import sys

@@ -163,23 +162,13 @@ def configure_orm(disable_connection_pool=False):
try:
from airflow_local_settings import *
logging.info("Loaded airflow_local_settings.")
except Exception:
except:
pass

configure_logging()
configure_vars()
configure_orm()

# TODO: Merge airflow logging configurations.
logging_config_path = conf.get('core', 'logging_config_path')
try:
from logging_config_path import LOGGING_CONFIG
except Exception:
# Import default logging configuration
from airflow.config_templates.default_airflow_logging import \
DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
logging.config.dictConfig(LOGGING_CONFIG)

# Const stuff

KILOBYTE = 1024
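The settings.py hunk above drops the configurable logging setup. As a side note, the removed literal import (from logging_config_path import LOGGING_CONFIG) imports a module named logging_config_path rather than the configured dotted path, so the except branch effectively always fell back to the default config. A dotted "package.module.CONFIG_NAME" value such as the default airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG is normally resolved along these lines (a hedged sketch, not code from either side of this diff; load_logging_config is an invented name):

import logging.config
from importlib import import_module

def load_logging_config(dotted_path, fallback):
    # Split "package.module.CONFIG_NAME" into module path and attribute,
    # import the module, and hand the resulting dict to dictConfig.
    try:
        module_path, attr = dotted_path.rsplit('.', 1)
        config = getattr(import_module(module_path), attr)
    except (ImportError, AttributeError, ValueError):
        config = fallback  # e.g. the shipped DEFAULT_LOGGING_CONFIG dict
    logging.config.dictConfig(config)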
1 change: 0 additions & 1 deletion airflow/task_runner/base_task_runner.py
@@ -36,7 +36,6 @@ def __init__(self, local_task_job):
:type local_task_job: airflow.jobs.LocalTaskJob
"""
self._task_instance = local_task_job.task_instance
self.set_logger_contexts(self._task_instance)

popen_prepend = []
cfg_path = None
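The line removed from base_task_runner.py is the runner-side counterpart of the handler.set_context(ti) calls dropped from cli.py further up. Purely as an illustration (this class is not part of the commit; the name and emit behaviour are invented), a context-aware handler in that abstraction looks roughly like:

import logging

class TaskLogHandler(logging.Handler):
    # Sketch of the hook the reverted abstraction relies on: callers pass the
    # current TaskInstance so the handler can route records to that task's log.
    def __init__(self):
        super(TaskLogHandler, self).__init__()
        self.task_instance = None

    def set_context(self, task_instance):
        self.task_instance = task_instance

    def emit(self, record):
        prefix = self.task_instance if self.task_instance is not None else "no-task"
        print("[{}] {}".format(prefix, self.format(record)))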
13 changes: 0 additions & 13 deletions airflow/utils/log/__init__.py

This file was deleted.
