Commit

Merge remote-tracking branch 'apache/master'
bolkedebruin committed Jul 14, 2016
2 parents 3079da0 + c32452a commit aea1fa2
Showing 40 changed files with 288 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -85,6 +85,7 @@ Currently **officially** using Airflow:
* [allegro.pl](http://allegro.tech/) [[@kretes](https://github.com/kretes)]
* [Bellhops](https://github.com/bellhops)
* BlueApron [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)]
* [Blue Yonder](http://www.blue-yonder.com) [[@blue-yonder](https://github.com/blue-yonder)]
* [Clairvoyant] (https://clairvoyantsoft.com) [@shekharv](https://github.com/shekharv)
* [Clover Health] (https://www.cloverhealth.com) [[@gwax](https://github.com/gwax) & [@vansivallab](https://github.com/vansivallab)]
* Chartboost [[@cgelman](https://github.com/cgelman) & [@dclubb](https://github.com/dclubb)]
1 change: 1 addition & 0 deletions airflow/bin/cli.py
@@ -1026,5 +1026,6 @@ def get_parser(cls, dag_parser=False):
            sp.set_defaults(func=sub['func'])
        return parser


def get_parser():
    return CLIFactory.get_parser()
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -37,6 +37,7 @@
warnings.filterwarnings(
    action='default', category=PendingDeprecationWarning, module='airflow')


class AirflowConfigException(Exception):
    pass

1 change: 1 addition & 0 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -31,6 +31,7 @@

_log = logging.getLogger(__name__)


def get_config_param(param):
    return str(configuration.get('github_enterprise', param))

1 change: 1 addition & 0 deletions airflow/contrib/auth/backends/ldap_auth.py
@@ -44,6 +44,7 @@
class AuthenticationError(Exception):
    pass


class LdapException(Exception):
    pass

1 change: 1 addition & 0 deletions airflow/contrib/auth/backends/password_auth.py
@@ -43,6 +43,7 @@
LOG = logging.getLogger(__name__)
PY3 = version_info[0] == 3


class AuthenticationError(Exception):
    pass

1 change: 1 addition & 0 deletions airflow/contrib/hooks/datastore_hook.py
@@ -16,6 +16,7 @@
from apiclient.discovery import build
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class DatastoreHook(GoogleCloudBaseHook):
    """
    Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform
1 change: 1 addition & 0 deletions airflow/contrib/hooks/qubole_hook.py
@@ -56,6 +56,7 @@
    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause', 'parallelism', 'extract_query', 'boundary_query', 'split_column', 'tags', 'name']
}


class QuboleHook(BaseHook):
    def __init__(self, *args, **kwargs):
        conn = self.get_connection(kwargs['qubole_conn_id'])
149 changes: 149 additions & 0 deletions airflow/contrib/hooks/spark_sql_hook.py
@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import subprocess

from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException

log = logging.getLogger(__name__)


class SparkSqlHook(BaseHook):
    """
    This hook is a wrapper around the spark-sql binary. It requires that the
    "spark-sql" binary is in the PATH.

    :param sql: The SQL query to execute
    :type sql: str
    :param conf: arbitrary Spark configuration property
    :type conf: str (format: PROP=VALUE)
    :param conn_id: connection_id string
    :type conn_id: str
    :param executor_cores: Number of cores per executor
    :type executor_cores: int
    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
    :type executor_memory: str
    :param keytab: Full path to the file that contains the keytab
    :type keytab: str
    :param master: spark://host:port, mesos://host:port, yarn, or local
    :type master: str
    :param name: Name of the job
    :type name: str
    :param num_executors: Number of executors to launch
    :type num_executors: int
    :param verbose: Whether to pass the verbose flag to spark-sql
    :type verbose: bool
    :param yarn_queue: The YARN queue to submit to (Default: "default")
    :type yarn_queue: str
    """
    def __init__(self,
                 sql,
                 conf=None,
                 conn_id='spark_sql_default',
                 executor_cores=None,
                 executor_memory=None,
                 keytab=None,
                 master='yarn',
                 name='default-name',
                 num_executors=None,
                 verbose=True,
                 yarn_queue='default'
                 ):
        self._sql = sql
        self._conf = conf
        self._conn = self.get_connection(conn_id)
        self._executor_cores = executor_cores
        self._executor_memory = executor_memory
        self._keytab = keytab
        self._master = master
        self._name = name
        self._num_executors = num_executors
        self._verbose = verbose
        self._yarn_queue = yarn_queue
        self._sp = None

    def get_conn(self):
        pass

    def _prepare_command(self, cmd):
        """
        Construct the spark-sql command to execute. Verbose output is enabled
        by default.

        :param cmd: command to append to the spark-sql command
        :type cmd: str
        :return: full command to be executed
        """
        connection_cmd = ["spark-sql"]
        if self._conf:
            for conf_el in self._conf.split(","):
                connection_cmd += ["--conf", conf_el]
        if self._executor_cores:
            connection_cmd += ["--executor-cores", str(self._executor_cores)]
        if self._executor_memory:
            connection_cmd += ["--executor-memory", self._executor_memory]
        if self._keytab:
            connection_cmd += ["--keytab", self._keytab]
        if self._num_executors:
            connection_cmd += ["--num-executors", str(self._num_executors)]
        if self._sql:
            if self._sql.endswith('.sql'):
                connection_cmd += ["-f", self._sql]
            else:
                connection_cmd += ["-e", self._sql]
        if self._master:
            connection_cmd += ["--master", self._master]
        if self._name:
            connection_cmd += ["--name", self._name]
        if self._verbose:
            connection_cmd += ["--verbose"]
        if self._yarn_queue:
            connection_cmd += ["--queue", self._yarn_queue]

        connection_cmd += cmd
        logging.debug("Spark-Sql cmd: {}".format(connection_cmd))

        return connection_cmd

    def run_query(self, cmd="", **kwargs):
        """
        Remote Popen (actually execute the Spark-sql query)

        :param cmd: command to remotely execute
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        """
        prefixed_cmd = self._prepare_command(cmd)
        self._sp = subprocess.Popen(prefixed_cmd,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    **kwargs)
        # using two iterators here to support 'real-time' logging
        for line in iter(self._sp.stdout.readline, b''):
            line = line.decode('utf-8').strip()
            logging.info(line)
        for line in iter(self._sp.stderr.readline, b''):
            line = line.decode('utf-8').strip()
            logging.info(line)
        output, stderr = self._sp.communicate()

        if self._sp.returncode:
            raise AirflowException("Cannot execute {} on {}. Error code is: "
                                   "{}. Output: {}, Stderr: {}"
                                   .format(cmd, self._conn.host,
                                           self._sp.returncode, output, stderr))

    def kill(self):
        if self._sp and self._sp.poll() is None:
            logging.info("Killing the Spark-Sql job")
            self._sp.kill()
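
For orientation, a minimal usage sketch of the new hook follows; the connection id, table name, and query are illustrative assumptions, not part of this commit.

```python
# Hypothetical usage sketch for the new SparkSqlHook (assumes a
# 'spark_sql_default' connection exists in the Airflow metadata DB;
# the query and table are placeholders).
from airflow.contrib.hooks.spark_sql_hook import SparkSqlHook

hook = SparkSqlHook(
    sql="SELECT COUNT(*) FROM my_table",  # a value ending in '.sql' is passed to spark-sql with -f instead of -e
    conn_id='spark_sql_default',
    master='yarn',
    yarn_queue='default',
)
hook.run_query()  # builds the spark-sql command line and streams stdout/stderr into the Airflow log
```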
1 change: 1 addition & 0 deletions airflow/contrib/hooks/vertica_hook.py
@@ -17,6 +17,7 @@

from airflow.hooks.dbapi_hook import DbApiHook


class VerticaHook(DbApiHook):
    '''
    Interact with Vertica.
1 change: 1 addition & 0 deletions airflow/contrib/operators/bigquery_operator.py
@@ -18,6 +18,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryOperator(BaseOperator):
    """
    Executes BigQuery SQL queries in a specific BigQuery database
1 change: 1 addition & 0 deletions airflow/contrib/operators/bigquery_to_bigquery.py
@@ -18,6 +18,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToBigQueryOperator(BaseOperator):
    """
    Copy a BigQuery table to another BigQuery table.
1 change: 1 addition & 0 deletions airflow/contrib/operators/bigquery_to_gcs.py
@@ -18,6 +18,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToCloudStorageOperator(BaseOperator):
    """
    Transfers a BigQuery table to a Google Cloud Storage bucket.
1 change: 1 addition & 0 deletions airflow/contrib/operators/fs_operator.py
@@ -20,6 +20,7 @@
from airflow.contrib.hooks.fs_hook import FSHook
from airflow.utils.decorators import apply_defaults


class FileSensor(BaseSensorOperator):
    """
    Waits for a file or folder to land in a filesystem
1 change: 1 addition & 0 deletions airflow/contrib/operators/gcs_download_operator.py
@@ -19,6 +19,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageDownloadOperator(BaseOperator):
    """
    Downloads a file from Google Cloud Storage.
1 change: 1 addition & 0 deletions airflow/contrib/operators/gcs_to_bq.py
@@ -20,6 +20,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageToBigQueryOperator(BaseOperator):
    """
    Loads files from Google cloud storage into BigQuery.
1 change: 1 addition & 0 deletions airflow/contrib/operators/mysql_to_gcs.py
@@ -26,6 +26,7 @@
from MySQLdb.constants import FIELD_TYPE
from tempfile import NamedTemporaryFile


class MySqlToGoogleCloudStorageOperator(BaseOperator):
    """
    Copy data from MySQL to Google cloud storage in JSON format.
91 changes: 91 additions & 0 deletions airflow/contrib/operators/spark_sql_operator.py
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.spark_sql_hook import SparkSqlHook


class SparkSqlOperator(BaseOperator):
    """
    Execute Spark SQL query

    :param sql: The SQL query to execute
    :type sql: str
    :param conf: arbitrary Spark configuration property
    :type conf: str (format: PROP=VALUE)
    :param conn_id: connection_id string
    :type conn_id: str
    :param executor_cores: Number of cores per executor
    :type executor_cores: int
    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
    :type executor_memory: str
    :param keytab: Full path to the file that contains the keytab
    :type keytab: str
    :param master: spark://host:port, mesos://host:port, yarn, or local
    :type master: str
    :param name: Name of the job
    :type name: str
    :param num_executors: Number of executors to launch
    :type num_executors: int
    :param verbose: Whether to pass the verbose flag to spark-sql
    :type verbose: bool
    :param yarn_queue: The YARN queue to submit to (Default: "default")
    :type yarn_queue: str
    """
    @apply_defaults
    def __init__(self,
                 sql,
                 conf=None,
                 conn_id='spark_sql_default',
                 executor_cores=None,
                 executor_memory=None,
                 keytab=None,
                 master='yarn',
                 name='default-name',
                 num_executors=None,
                 yarn_queue='default',
                 *args,
                 **kwargs):
        super(SparkSqlOperator, self).__init__(*args, **kwargs)
        self._sql = sql
        self._conf = conf
        self._conn_id = conn_id
        self._executor_cores = executor_cores
        self._executor_memory = executor_memory
        self._keytab = keytab
        self._master = master
        self._name = name
        self._num_executors = num_executors
        self._yarn_queue = yarn_queue
        self._hook = None

    def execute(self, context):
        """
        Call the SparkSqlHook to run the provided sql query
        """
        self._hook = SparkSqlHook(sql=self._sql,
                                  conf=self._conf,
                                  conn_id=self._conn_id,
                                  executor_cores=self._executor_cores,
                                  executor_memory=self._executor_memory,
                                  keytab=self._keytab,
                                  name=self._name,
                                  num_executors=self._num_executors,
                                  master=self._master,
                                  yarn_queue=self._yarn_queue
                                  )
        self._hook.run_query()

    def on_kill(self):
        self._hook.kill()
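
Similarly, a minimal sketch of wiring the new operator into a DAG; the dag_id, schedule, task_id, and SQL statement are illustrative placeholders, not part of this commit.

```python
# Hypothetical DAG snippet using the new SparkSqlOperator; every name and
# value below is a placeholder chosen for illustration.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator

dag = DAG(dag_id='example_spark_sql',
          start_date=datetime(2016, 7, 1),
          schedule_interval='@daily')

count_events = SparkSqlOperator(
    task_id='count_events',
    sql="SELECT COUNT(*) FROM events",  # executed via spark-sql -e; on_kill() terminates the spark-sql process
    master='yarn',
    dag=dag,
)
```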
1 change: 1 addition & 0 deletions airflow/contrib/operators/vertica_to_hive.py
@@ -23,6 +23,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class VerticaToHiveTransfer(BaseOperator):
    """
    Moves data from Vertica to Hive. The operator runs
@@ -31,6 +31,7 @@
# alternating runs
dag = DAG(dag_id='example_branch_dop_operator_v3',schedule_interval='*/1 * * * *', default_args=args)


def should_run(ds, **kwargs):

    print("------------- exec dttm = {} and minute = {}".format(kwargs['execution_date'], kwargs['execution_date'].minute))
@@ -26,6 +26,7 @@
    dagrun_timeout=timedelta(minutes=4)
)


def my_py_command(ds, **kwargs):
    # Print out the "foo" param passed in via
    # `airflow test example_passing_params_via_test_command run_this <date>