Commit

criccomini committed Aug 14, 2017
2 parents 5ff7e83 + 0d0cc62 commit 9660293
Showing 21 changed files with 896 additions and 334 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -94,7 +94,7 @@ Currently **officially** using Airflow:
1. [Bellhops](https://github.com/bellhops)
1. [BlaBlaCar](https://www.blablacar.com) [[@puckel](https://github.com/puckel) & [@wmorin](https://github.com/wmorin)]
1. [Bloc](https://www.bloc.io) [[@dpaola2](https://github.com/dpaola2)]
-1. BlueApron [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)]
+1. [BlueApron](https://www.blueapron.com) [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)]
1. [Blue Yonder](http://www.blue-yonder.com) [[@blue-yonder](https://github.com/blue-yonder)]
1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) & [@chadcelect](https://github.com/chadcelect)]
1. [Change.org](https://www.change.org) [[@change](https://github.com/change), [@vijaykramesh](https://github.com/vijaykramesh)]
@@ -103,7 +103,7 @@ Currently **officially** using Airflow:
1. [City of San Diego](http://sandiego.gov) [[@MrMaksimize](https://github.com/mrmaksimize), [@andrell81](https://github.com/andrell81) & [@arnaudvedy](https://github.com/arnaudvedy)]
1. [Clairvoyant](https://clairvoyantsoft.com) [@shekharv](https://github.com/shekharv)
1. [Clover Health](https://www.cloverhealth.com) [[@gwax](https://github.com/gwax) & [@vansivallab](https://github.com/vansivallab)]
-1. Chartboost [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)]
+1. [Chartboost](https://www.chartboost.com) [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)]
1. [Cotap](https://github.com/cotap/) [[@maraca](https://github.com/maraca) & [@richardchew](https://github.com/richardchew)]
1. [Credit Karma](https://www.creditkarma.com/) [[@preete-dixit-ck](https://github.com/preete-dixit-ck) & [@harish-gaggar-ck](https://github.com/harish-gaggar-ck) & [@greg-finley-ck](https://github.com/greg-finley-ck)]
1. [Digital First Media](http://www.digitalfirstmedia.com/) [[@duffn](https://github.com/duffn) & [@mschmo](https://github.com/mschmo) & [@seanmuth](https://github.com/seanmuth)]
@@ -164,7 +164,7 @@ Currently **officially** using Airflow:
1. [SmartNews](https://www.smartnews.com/) [[@takus](https://github.com/takus)]
1. [Spotify](https://github.com/spotify) [[@znichols](https://github.com/znichols)]
1. [Stackspace](https://beta.stackspace.io/)
-1. Stripe [[@jbalogh](https://github.com/jbalogh)]
+1. [Stripe](https://stripe.com) [[@jbalogh](https://github.com/jbalogh)]
1. [Tails.com](https://tails.com/) [[@alanmcruickshank](https://github.com/alanmcruickshank)]
1. [Thumbtack](https://www.thumbtack.com/) [[@natekupp](https://github.com/natekupp)]
1. [Tictail](https://tictail.com/)
@@ -176,9 +176,9 @@ Currently **officially** using Airflow:
1. [WeTransfer](https://github.com/WeTransfer) [[@jochem](https://github.com/jochem)]
1. [Whistle Labs](http://www.whistle.com) [[@ananya77041](https://github.com/ananya77041)]
1. [WiseBanyan](https://wisebanyan.com/)
-1. Wooga
-1. Xoom [[@gepser](https://github.com/gepser) & [@omarvides](https://github.com/omarvides)]
-1. Yahoo!
+1. [Wooga](https://www.wooga.com/)
+1. [Xoom](https://www.xoom.com/india/send-money) [[@gepser](https://github.com/gepser) & [@omarvides](https://github.com/omarvides)]
+1. [Yahoo!](https://www.yahoo.com/)
1. [Zapier](https://www.zapier.com) [[@drknexus](https://github.com/drknexus) & [@statwonk](https://github.com/statwonk)]
1. [Zendesk](https://www.github.com/zendesk)
1. [Zenly](https://zen.ly) [[@cerisier](https://github.com/cerisier) & [@jbdalido](https://github.com/jbdalido)]
29 changes: 16 additions & 13 deletions UPDATING.md
@@ -9,8 +9,11 @@ assists people when migrating to a new version.
SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution used previously (<1.9.0), so this is backward incompatible.
- update SSHHook constructor
- use the SSHOperator class in place of SSHExecuteOperator, which has been removed. Refer to test_ssh_operator.py for usage info.
- SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer to test_sftp_operator.py for usage info (a usage sketch follows this list).
- No updates are required if you are using ftpHook; it will continue to work as is.
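
Below is a minimal usage sketch for the new operators, assuming the contrib import paths; the connection id, file paths and DAG settings are placeholders, and test_ssh_operator.py / test_sftp_operator.py remain the authoritative references.

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.operators.sftp_operator import SFTPOperator

dag = DAG('ssh_sftp_example', start_date=datetime(2017, 8, 1), schedule_interval=None)

# Run a command on the remote host over the Paramiko-based SSH connection.
run_remote = SSHOperator(
    task_id='run_remote_command',
    ssh_conn_id='my_ssh_connection',  # placeholder connection id
    command='echo "hello from serverB"',
    dag=dag)

# Push a local file to the remote host (operation='get' would download instead).
upload_file = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='my_ssh_connection',
    local_filepath='/tmp/local_copy.txt',   # placeholder paths
    remote_filepath='/tmp/remote_copy.txt',
    operation='put',
    dag=dag)

run_remote >> upload_file
```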

### Logging update
Logs are now stored in the log folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``.
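
As a quick illustration (the DAG id, task id and execution date below are made up), the per-try path renders like this:

```
# Illustrative only: how the new per-try log path is rendered.
template = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
print(template.format(
    dag_id='example_dag',
    task_id='example_task',
    execution_date='2017-08-14T00:00:00',
    try_number=1))
# -> example_dag/example_task/2017-08-14T00:00:00/1.log
```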

### New Features

@@ -61,8 +64,8 @@ interfere.
Please read through these options; defaults have changed since 1.7.1.

#### child_process_log_directory
In order to increase the robustness of the scheduler, DAGs are now processed in their own process. Therefore each
DAG has its own log file for the scheduler. These are placed in `child_process_log_directory`, which defaults to
`<AIRFLOW_HOME>/scheduler/latest`. You will need to make sure these log files are removed.

> DAG logs or processor logs ignore any command line settings for log file locations.
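
A housekeeping sketch for the clean-up mentioned above, assuming the default location quoted here and a 7-day retention window; both are placeholders, so point it at your actual `child_process_log_directory`:

```
import os
import time

# Placeholder values; adjust to match child_process_log_directory in airflow.cfg.
scheduler_log_dir = os.path.expanduser('~/airflow/scheduler')
cutoff = time.time() - 7 * 24 * 60 * 60

for root, _dirs, files in os.walk(scheduler_log_dir):
    for name in files:
        path = os.path.join(root, name)
        if os.path.getmtime(path) < cutoff:
            os.remove(path)  # delete per-DAG scheduler logs older than the cutoff
```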
@@ -72,7 +75,7 @@ Previously the command line option `num_runs` was used to let the scheduler term
loops. This is now time-bound and defaults to `-1`, which means run continuously. See also `num_runs`.

#### num_runs
Previously `num_runs` was used to let the scheduler terminate after a certain number of loops. Now `num_runs` specifies
the number of times to try to schedule each DAG file within `run_duration` time. Defaults to `-1`, which means try
indefinitely. This is only available on the command line.

@@ -85,7 +88,7 @@ dags are not being picked up, have a look at this number and decrease it when ne

#### catchup_by_default
By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date.
This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as
`catchup = False / True`. Command line backfills will still work.
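
A minimal per-DAG sketch of that flag (dag id and dates are illustrative):

```
from datetime import datetime, timedelta

from airflow import DAG

# With catchup=False, only the most recent interval is scheduled for this DAG,
# regardless of the global catchup_by_default setting.
dag = DAG(
    dag_id='latest_only_example',
    start_date=datetime(2017, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False)
```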

### Faulty DAGs do not show an error in the Web UI
@@ -109,33 +112,33 @@ convenience variables to the config. In case you run a secure Hadoop setup it m
required to whitelist these variables by adding the following to your configuration:

```
<property>
  <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
  <value>airflow\.ctx\..*</value>
</property>
```
### Google Cloud Operator and Hook alignment

All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection
type for all kinds of Google Cloud Operators.

If you experience problems connecting with your operator, make sure you set the connection type to "Google Cloud Platform".

Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service
account.

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0.

- Hooks and operators must be imported from their respective submodules

`airflow.operators.PigOperator` is no longer supported; `from airflow.operators.pig_operator import PigOperator` is.
(AIRFLOW-31, AIRFLOW-200)
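
An illustrative before/after for the import change; PigOperator is just one example and the same pattern applies to the other operators and hooks:

```
# Old style, no longer supported:
#     from airflow.operators import PigOperator
# New style, import from the operator's submodule:
from airflow.operators.pig_operator import PigOperator
from airflow.operators.bash_operator import BashOperator
```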

- Operators no longer accept arbitrary arguments

Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without
complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285)
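
A sketch of the kind of call that is now flagged; whether it merely warns or hard-fails depends on the Airflow version, and the keyword below is deliberately bogus:

```
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('strict_args_example', start_date=datetime(2017, 1, 1))

# Previously the unknown keyword was silently swallowed by **kwargs;
# now it is rejected rather than ignored.
task = DummyOperator(
    task_id='no_extra_kwargs',
    not_a_real_argument='oops',
    dag=dag)
```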

### Known Issues
106 changes: 17 additions & 89 deletions airflow/bin/cli.py
@@ -22,7 +22,6 @@
import socket
import subprocess
import textwrap
import warnings
from importlib import import_module

import argparse
@@ -54,8 +53,6 @@

from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.file import mkdirs
from airflow.www.app import cached_app

from sqlalchemy import func
@@ -357,61 +354,22 @@ def run(args, dag=None):
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()

logging.root.handlers = []
logger = logging.getLogger('airflow.task')
if args.raw:
# Output to STDOUT for the parent process to read and log
logging.basicConfig(
stream=sys.stdout,
level=settings.LOGGING_LEVEL,
format=settings.LOG_FORMAT)
else:
# Setting up logging to a file.

# To handle log writing when tasks are impersonated, the log files need to
# be writable by the user that runs the Airflow command and the user
# that is impersonated. This is mainly to handle corner cases with the
# SubDagOperator. When the SubDagOperator is run, all of the operators
# run under the impersonated user and create appropriate log files
# as the impersonated user. However, if the user manually runs tasks
# of the SubDagOperator through the UI, then the log files are created
# by the user that runs the Airflow command. For example, the Airflow
# run command may be run by the `airflow_sudoable` user, but the Airflow
# tasks may be run by the `airflow` user. If the log files are not
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
try_number = ti.try_number
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
args.execution_date)
directory = os.path.join(log_base, log_relative_dir)
# Create the log file and give it group writable permissions
# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
if not os.path.isdir(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o775)
log_relative = logging_utils.get_log_filename(
args.dag_id, args.task_id, args.execution_date, try_number)
filename = os.path.join(log_base, log_relative)

if not os.path.exists(filename):
open(filename, "a").close()
os.chmod(filename, 0o666)

logging.basicConfig(
filename=filename,
level=settings.LOGGING_LEVEL,
format=settings.LOG_FORMAT)
logger = logging.getLogger('airflow.task.raw')

for handler in logger.handlers:
try:
handler.set_context(ti)
except AttributeError:
# Not all handlers need to have context passed in so we ignore
# the error when handlers do not have set_context defined.
pass

hostname = socket.getfqdn()
logging.info("Running on host {}".format(hostname))

if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
@@ -469,43 +427,13 @@ def run(args, dag=None):
if args.raw:
return

# Force the log to flush, and set the handler to go back to normal so we
# don't continue logging to the task's log file. The flush is important
# because we subsequently read from the log to insert into S3 or Google
# cloud storage.
logging.root.handlers[0].flush()
logging.root.handlers = []

# store logs remotely
remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

# deprecated as of March 2016
if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
warnings.warn(
'The S3_LOG_FOLDER conf key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
'update airflow.cfg to ensure future compatibility.',
DeprecationWarning)
remote_base = conf.get('core', 'S3_LOG_FOLDER')

if os.path.exists(filename):
# read log and remove old logs to get just the latest additions

with open(filename, 'r') as logfile:
log = logfile.read()

remote_log_location = os.path.join(remote_base, log_relative)
logging.debug("Uploading to remote log location {}".format(remote_log_location))
# S3
if remote_base.startswith('s3:/'):
logging_utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
logging_utils.GCSLog().write(log, remote_log_location)
# Other
elif remote_base and remote_base != 'None':
logging.error(
'Unsupported remote log location: {}'.format(remote_base))
# Force the log to flush. The flush is important because we
# might subsequently read from the log to insert into S3 or
# Google cloud storage. Explicitly closing the handler is
# needed in order to upload to remote storage services.
for handler in logger.handlers:
handler.flush()
handler.close()


def task_failed_deps(args):
13 changes: 13 additions & 0 deletions airflow/config_templates/__init__.py
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
7 changes: 7 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -122,6 +122,13 @@ security =
# values at runtime)
unit_test_mode = False

# User defined logging configuration file path.
logging_config_path =

# Name of the handler used to read task instance logs.
# Defaults to the file task handler.
task_log_reader = file.task

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
94 changes: 94 additions & 0 deletions airflow/config_templates/default_airflow_logging.py
@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from airflow import configuration as conf

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

# TODO: REMOTE_BASE_LOG_FOLDER should be deprecated and
# directly specify in the handler definitions. This is to
# provide compatibility to older remote log folder settings.
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
S3_LOG_FOLDER = ''
GCS_LOG_FOLDER = ''
if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
    S3_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
    GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER

FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'

DEFAULT_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task.raw': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
    }
}
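
A minimal sketch of how this dictionary can be applied with the standard library's `logging.config`. Airflow wires this up itself at startup, so this is purely illustrative; note that the task handlers still expect `set_context(task_instance)` before they write anything (see the cli.py changes above).

```
import logging
import logging.config

from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG

# Install the config; afterwards 'airflow.task' resolves to the file.task handler
# and 'airflow.task.raw' resolves to the console handler defined above.
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
task_logger = logging.getLogger('airflow.task')
```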
1 change: 0 additions & 1 deletion airflow/dag/__init__.py
@@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#