merging with master
msumit committed Jan 15, 2016
1 parent 3790db0 commit d0c32c9
Showing 46 changed files with 1,347 additions and 622 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -1,5 +1,7 @@
# Airflow

[![Join the chat at https://gitter.im/airbnb/airflow](https://badges.gitter.im/airbnb/airflow.svg)](https://gitter.im/airbnb/airflow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

[![Build Status](https://travis-ci.org/airbnb/airflow.svg)](https://travis-ci.org/airbnb/airflow)
[![Coverage Status](https://coveralls.io/repos/airbnb/airflow/badge.svg?service=github)](https://coveralls.io/github/airbnb/airflow)
[![pypi downloads](https://img.shields.io/pypi/dm/airflow.svg)](https://pypi.python.org/pypi/airflow/)
@@ -51,16 +53,20 @@ Currently **officially** using Airflow:

* Airbnb [@mistercrunch]
* Agari [@r39132](https://github.com/r39132)
* [Bellhops](https://github.com/bellhops)
* BlueApron [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)]
* Chartboost [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)]
* [Cotap](https://github.com/cotap/) [[@maraca](https://github.com/maraca) & [@richardchew](https://github.com/richardchew)]
* Easy Taxi [[@caique-lima](https://github.com/caique-lima)]
* [FreshBooks](https://github.com/freshbooks) [[@DinoCow](https://github.com/DinoCow)]
* [Holimetrix](http://holimetrix.com/) [[@thibault-ketterer](https://github.com/thibault-ketterer)]
* [Handy](http://www.handy.com/careers/73115?gh_jid=73115&gh_src=o5qcxn) [[@marcintustin](https://github.com/marcintustin) / [@mtustin-handy](https://github.com/mtustin-handy)]
* ING
* [Jampp](https://github.com/jampp)
* [LingoChamp](http://www.liulishuo.com/) [[@haitaoyao](https://github.com/haitaoyao)]
* Lyft
* [Sense360](https://github.com/Sense360) [[@kamilmroczek](https://github.com/KamilMroczek)]
* [Sidecar](https://hello.getsidecar.com/) [[@getsidecar](https://github.com/getsidecar)]
* Stripe [@jbalogh]
* [WeTransfer](https://github.com/WeTransfer) [[@jochem](https://github.com/jochem)]
* Wooga
@@ -77,3 +83,4 @@ Currently **officially** using Airflow:
* [Airflow: Tips, Tricks, and Pitfalls @ Handy](https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb#.o2snqeoz7)
* [Airflow at Agari Blog Post](http://agari.com/blog/airflow-agari)
* Airflow Chef recipe (community contributed) [github](https://github.com/bahchis/airflow-cookbook) [chef](https://supermarket.chef.io/cookbooks/airflow)
* [Gitter (live chat) Channel](https://gitter.im/airbnb/airflow)
17 changes: 14 additions & 3 deletions TODO.md
@@ -1,19 +1,30 @@
#### Roadmap items
* UI page answering "Why isn't this task instance running?"
* Attempt removing DagBag caching for the web server
* Distributed scheduler (supervisors)
* Get the supervisors to run sensors (as opposed to each sensor taking a slot)
* Improve DagBag differential refresh
* Pickle all the THINGS! Supervisors maintain fresh, versioned pickles in the database as they monitor for changes
* Pre-prod running off of master
* Containment / YarnExecutor / Docker?
* Get s3 logs
* Test and migrate to use beeline instead of the Hive CLI
* Run Hive / Hadoop / HDFS tests in Travis-CI

#### UI
* Backfill form
* Better task filtering in duration and landing time charts (operator toggle, task regex, uncheck all button)
* Add templating to adhoc queries

#### Backend
* Add a run_only_latest flag to BaseOperator that runs only the most recent task instance where deps are met
* Pickle all the THINGS!
* Distributed scheduler
* Raise errors when setting dependencies on task in foreign DAGs
* Add an is_test flag to the run context
* Add operator to task_instance table

#### Wishlist
* Pause flag at the task level
* Increase unit test coverage
* Stats logging interface with support for stats and sqlalchemy to collect detailed information from the scheduler and dag processing times

#### Other
* deprecate TimeSensor
2 changes: 1 addition & 1 deletion airflow/__init__.py
@@ -5,7 +5,7 @@
`airflow.www.login`
"""
from builtins import object
__version__ = "1.6.1"
__version__ = "1.6.2"

import logging
import os
9 changes: 6 additions & 3 deletions airflow/bin/cli.py
@@ -84,14 +84,15 @@ def trigger_dag(args):
session = settings.Session()
# TODO: verify dag_id
execution_date = datetime.now()
run_id = args.run_id or "manual__{0}".format(execution_date.isoformat())
dr = session.query(DagRun).filter(
DagRun.dag_id==args.dag_id, DagRun.run_id==args.run_id).first()
DagRun.dag_id==args.dag_id, DagRun.run_id==run_id).first()
if dr:
logging.error("This run_id already exists")
else:
trigger = DagRun(
dag_id=args.dag_id,
run_id=args.run_id,
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
external_trigger=True)
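
The hunk above makes `trigger_dag` fall back to a generated run id when none is supplied, and then uses that same value both for the duplicate check and for the new `DagRun`. A standalone sketch of the fallback (the `default_run_id` helper name is mine, not Airflow's):

```python
from datetime import datetime


def default_run_id(run_id=None, execution_date=None):
    # Mirrors the fallback added to cli.py: when no run_id is passed,
    # derive one from the execution date as "manual__<ISO timestamp>".
    execution_date = execution_date or datetime.now()
    return run_id or "manual__{0}".format(execution_date.isoformat())
```

Before this change, a `None` run_id was compared against `DagRun.run_id` and then stored verbatim; generating it up front keeps the lookup and the insert consistent.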
@@ -427,7 +428,8 @@ def kerberos(args):

def get_parser():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help='sub-command help')
subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')
subparsers.required = True

ht = "Run subsections of a DAG for a specified date range"
parser_backfill = subparsers.add_parser('backfill', help=ht)
@@ -495,6 +497,7 @@ def get_parser():
parser_clear.add_argument(
"-sd", "--subdir", help=subdir_help,
default=DAGS_FOLDER)
ht = "Do not request confirmation"
parser_clear.add_argument(
"-c", "--no_confirm", help=ht, action="store_true")
parser_clear.set_defaults(func=clear)
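
The `dest='subcommand'` plus `subparsers.required = True` change above restores mandatory subcommands: in Python 3, argparse subparsers became optional by default, so a bare `airflow` invocation would previously fall through instead of printing usage. A minimal reproduction of the pattern:

```python
import argparse

parser = argparse.ArgumentParser(prog='airflow')
# Naming the dest and flipping required=True makes argparse error out
# with a usage message when no subcommand is given, matching the old
# Python 2 behaviour.
subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')
subparsers.required = True
subparsers.add_parser('backfill', help='Run subsections of a DAG')

args = parser.parse_args(['backfill'])
print(args.subcommand)  # prints: backfill
```

Calling `parser.parse_args([])` here raises `SystemExit` after printing the missing-subcommand error, rather than returning an empty namespace.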
17 changes: 17 additions & 0 deletions airflow/configuration.py
@@ -56,6 +56,7 @@ class AirflowConfigException(Exception):
's3_log_folder': '',
'dag_concurrency': 16,
'max_active_runs_per_dag': 16,
'executor': 'SequentialExecutor',
},
'webserver': {
'base_url': 'http://localhost:8080',
@@ -93,6 +94,9 @@ class AirflowConfigException(Exception):
'reinit_frequency': '3600',
'kinit_path': 'kinit',
'keytab': 'airflow.keytab',
},
'github_enterprise': {
'api_rev': 'v3'
}
}

@@ -322,6 +326,15 @@ class ConfigParserWithDefaults(ConfigParser):
def __init__(self, defaults, *args, **kwargs):
self.defaults = defaults
ConfigParser.__init__(self, *args, **kwargs)
self.is_validated = False

def _validate(self):
if self.get("core", "executor") != 'SequentialExecutor' \
and "sqlite" in self.get('core', 'sql_alchemy_conn'):
raise AirflowConfigException("error: cannot use sqlite with the {}".
format(self.get('core', 'executor')))

self.is_validated = True

def get(self, section, key, **kwargs):
section = str(section).lower()
@@ -367,6 +380,10 @@ def getint(self, section, key):
def getfloat(self, section, key):
return float(self.get(section, key))

def read(self, filenames):
ConfigParser.read(self, filenames)
self._validate()


def mkdir_p(path):
try:
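
The new `_validate` hook, wired into `read`, rejects SQLite whenever a non-sequential executor is configured, since SQLite permits only one writer at a time. The check extracted as a standalone sketch (freestanding function rather than the `ConfigParser` method in the diff):

```python
class AirflowConfigException(Exception):
    pass


def validate(executor, sql_alchemy_conn):
    # Same rule as ConfigParserWithDefaults._validate: any parallel
    # executor (Local, Celery, ...) needs a real database backend.
    if executor != 'SequentialExecutor' \
            and 'sqlite' in sql_alchemy_conn:
        raise AirflowConfigException(
            "error: cannot use sqlite with the {}".format(executor))
    return True
```

Because `read` now calls `_validate`, a bad executor/backend pairing fails at config-load time instead of surfacing later as mysterious database lock errors.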
221 changes: 221 additions & 0 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -0,0 +1,221 @@
# Copyright 2015 Matthew Pelland ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import flask_login
from flask_login import (
login_user, current_user,
logout_user, login_required
)

from flask import url_for, redirect, request

from flask_oauthlib.client import OAuth

from airflow import models, configuration, settings
from airflow.configuration import AirflowConfigException

_log = logging.getLogger(__name__)

def get_config_param(param):
return str(configuration.get('github_enterprise', param))


class GHEUser(models.User):

def __init__(self, user):
self.user = user

def is_active(self):
'''Required by flask_login'''
return True

def is_authenticated(self):
'''Required by flask_login'''
return True

def is_anonymous(self):
'''Required by flask_login'''
return False

def get_id(self):
'''Returns the current user id as required by flask_login'''
return self.user.get_id()

def data_profiling(self):
'''Provides access to data profiling tools'''
return True

def is_superuser(self):
'''Access all the things'''
return True


class AuthenticationError(Exception):
pass


class GHEAuthBackend(object):

def __init__(self):
self.ghe_host = get_config_param('host')
self.login_manager = flask_login.LoginManager()
self.login_manager.login_view = 'airflow.login'
self.flask_app = None
self.ghe_oauth = None
self.api_rev = None

def ghe_api_route(self, leaf):
if not self.api_rev:
self.api_rev = get_config_param('api_rev')

return '/'.join(['https:/',
self.ghe_host,
'api',
self.api_rev,
leaf.strip('/')])

def init_app(self, flask_app):
self.flask_app = flask_app

self.login_manager.init_app(self.flask_app)

self.ghe_oauth = OAuth(self.flask_app).remote_app(
'ghe',
consumer_key=get_config_param('client_id'),
consumer_secret=get_config_param('client_secret'),
# need read:org to get team member list
request_token_params={'scope': 'user,read:org'},
base_url=self.ghe_host,
request_token_url=None,
access_token_method='POST',
access_token_url=''.join(['https://',
self.ghe_host,
'/login/oauth/access_token']),
authorize_url=''.join(['https://',
self.ghe_host,
'/login/oauth/authorize']))

self.login_manager.user_loader(self.load_user)

self.flask_app.add_url_rule(get_config_param('oauth_callback_route'),
'ghe_oauth_callback',
self.oauth_callback)

def login(self, request):
_log.debug('Redirecting user to GHE login')
return self.ghe_oauth.authorize(callback=url_for(
'ghe_oauth_callback',
_external=True,
next=request.args.get('next') or request.referrer or None))

def get_ghe_user_profile_info(self, ghe_token):
resp = self.ghe_oauth.get(self.ghe_api_route('/user'),
token=(ghe_token, ''))

if not resp or resp.status != 200:
raise AuthenticationError(
'Failed to fetch user profile, status ({0})'.format(
resp.status if resp else 'None'))

return resp.data['login'], resp.data['email']

def ghe_team_check(self, username, ghe_token):
try:
teams = [team.strip()
for team in
get_config_param('allowed_teams').split(',')]
except AirflowConfigException:
# No allowed teams defined, let anyone in GHE in.
return True

resp = self.ghe_oauth.get(self.ghe_api_route('/user/teams'),
token=(ghe_token, ''))

if not resp or resp.status != 200:
raise AuthenticationError(
'Bad response from GHE ({0})'.format(
resp.status if resp else 'None'))

for team in resp.data:
# team json object has a slug cased team name field aptly named
# 'slug'
if team['slug'] in teams:
return True

_log.debug('Denying access for user "%s", not a member of "%s"',
username,
str(teams))

return False

def load_user(self, userid):
if not userid or userid == 'None':
return None

session = settings.Session()
user = session.query(models.User).filter(
models.User.id == int(userid)).first()
session.expunge_all()
session.commit()
session.close()
return GHEUser(user)

def oauth_callback(self):
_log.debug('GHE OAuth callback called')

next_url = request.args.get('next') or url_for('admin.index')

resp = self.ghe_oauth.authorized_response()

try:
if resp is None:
raise AuthenticationError(
'Null response from GHE, denying access.'
)

ghe_token = resp['access_token']

username, email = self.get_ghe_user_profile_info(ghe_token)

if not self.ghe_team_check(username, ghe_token):
return redirect(url_for('airflow.noaccess'))

except AuthenticationError:
_log.exception('')
return redirect(url_for('airflow.noaccess'))

session = settings.Session()

user = session.query(models.User).filter(
models.User.username == username).first()

if not user:
user = models.User(
username=username,
email=email,
is_superuser=False)

session.merge(user)
session.commit()
login_user(GHEUser(user))
session.commit()
session.close()

return redirect(next_url)

login_manager = GHEAuthBackend()

def login(self, request):
return login_manager.login(request)
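
One subtle detail in the new backend is `ghe_api_route`: joining the parts with `'/'` turns the single-slash `'https:/'` prefix into a full `https://` URL. A standalone version of that construction (the host and revision values below are illustrative, not from the source):

```python
def ghe_api_route(ghe_host, api_rev, leaf):
    # Joining with '/' supplies the second slash after 'https:/',
    # yielding https://<host>/api/<rev>/<leaf>; strip() normalises
    # leading/trailing slashes on the leaf.
    return '/'.join(['https:/',
                     ghe_host,
                     'api',
                     api_rev,
                     leaf.strip('/')])


print(ghe_api_route('github.example.com', 'v3', '/user/teams'))
# prints: https://github.example.com/api/v3/user/teams
```

The backend uses this to hit the `/user` and `/user/teams` endpoints of a GitHub Enterprise instance, with `api_rev` defaulting to `v3` via the new `github_enterprise` config section.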
1 change: 1 addition & 0 deletions airflow/contrib/hooks/__init__.py
@@ -7,6 +7,7 @@
'vertica_hook': ['VerticaHook'],
'ssh_hook': ['SSHHook'],
'qubole_hook': ['QuboleHook'],
'bigquery_hook': ['BigQueryHook'],
}

_import_module_attrs(globals(), _hooks)
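
`_import_module_attrs` is an Airflow-internal helper, but the pattern the dict above drives is a plain attribute re-export: pull named attributes out of each submodule into the package namespace. A hypothetical standalone equivalent, demonstrated with stdlib modules:

```python
import importlib


def import_module_attrs(namespace, module_attr_map):
    # For each module name, import it and copy the listed attributes
    # into the given namespace (Airflow passes globals() of the
    # package's __init__.py).
    for module_name, attrs in module_attr_map.items():
        module = importlib.import_module(module_name)
        for attr in attrs:
            namespace[attr] = getattr(module, attr)
```

With this pattern, `from airflow.contrib.hooks import BigQueryHook` works even though the class lives in `bigquery_hook.py`; note the dict entries must all be comma-separated, since a missing comma is a syntax error at import time.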