diff --git a/airflow/__init__.py b/airflow/__init__.py
index 9185c9e1d5129..03840cab880e6 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -79,6 +79,6 @@ def __init__(self, namespace):
 from airflow import macros
 from airflow import contrib
 
-operators.integrate_plugins()
-hooks.integrate_plugins()
-macros.integrate_plugins()
+operators._integrate_plugins()
+hooks._integrate_plugins()
+macros._integrate_plugins()
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index d7fffd820d7f4..af1978e926d48 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -23,7 +23,8 @@
 # --------------------------------------------------------------------------------
 from airflow import DAG
-from airflow.operators import BashOperator, HiveOperator, PythonOperator
+from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.hive_operator import HiveOperator
 from datetime import datetime, date, timedelta
 # --------------------------------------------------------------------------------
@@ -180,4 +181,3 @@ def transfertodb():
 load_to_hive.set_upstream(load_to_hdfs)
 load_to_hive.set_downstream(hive_to_mysql)
-
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 29bb44a6e6e12..83b505dd49f0b 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -11,7 +11,28 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+
+
+# Contrib hooks are not imported by default. They should be accessed
+# directly: from airflow.contrib.hooks.hook_module import Hook
+
+
+
+
+
+
+
+# ------------------------------------------------------------------------
+#
+# #TODO #FIXME Airflow 2.0
+#
+# Old import machinery below.
+#
+# This is deprecated but should be kept until Airflow 2.0
+# for compatibility.
 #
+# ------------------------------------------------------------------------
 
 # Imports the hooks dynamically while keeping the package API clean,
 # abstracting the underlying modules
@@ -31,4 +52,14 @@
     'fs_hook': ['FSHook']
 }
 
-_import_module_attrs(globals(), _hooks)
+import os as _os
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+    from zope.deprecation import deprecated as _deprecated
+    _imported = _import_module_attrs(globals(), _hooks)
+    for _i in _imported:
+        _deprecated(
+            _i,
+            "Importing {i} directly from 'contrib.hooks' has been "
+            "deprecated. Please import from "
+            "'contrib.hooks.[hook_module]' instead. Support for direct imports "
+            "will be dropped entirely in Airflow 2.0.".format(i=_i))
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index 9f758bfdc82de..5ac2d457b921c 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -1,3 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Contrib operators are not imported by default. They should be accessed
+# directly: from airflow.contrib.operators.operator_module import Operator
+
+
+# ------------------------------------------------------------------------
+#
+# #TODO #FIXME Airflow 2.0
+#
+# Old import machinery below.
+#
+# This is deprecated but should be kept until Airflow 2.0
+# for compatibility.
+#
+# ------------------------------------------------------------------------
+
 # Imports the operators dynamically while keeping the package API clean,
 # abstracting the underlying modules
 from airflow.utils.helpers import import_module_attrs as _import_module_attrs
@@ -10,4 +40,14 @@
     'fs': ['FileSensor']
 }
 
-_import_module_attrs(globals(), _operators)
+import os as _os
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+    from zope.deprecation import deprecated as _deprecated
+    _imported = _import_module_attrs(globals(), _operators)
+    for _i in _imported:
+        _deprecated(
+            _i,
+            "Importing {i} directly from 'contrib.operators' has been "
+            "deprecated. Please import from "
+            "'contrib.operators.[operator_module]' instead. Support for direct "
+            "imports will be dropped entirely in Airflow 2.0.".format(i=_i))
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index c68eed293c982..98db70e49cc1b 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -17,7 +17,7 @@
 import logging
 
 from airflow.operators.sensors import BaseSensorOperator
-from airflow.contrib.hooks import FSHook
+from airflow.contrib.hooks.fs_hook import FSHook
 from airflow.utils.decorators import apply_defaults
 
 class FileSensor(BaseSensorOperator):
@@ -54,4 +54,3 @@ def poke(self, context):
         except:
             return False
         return True
-
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index e740de263f783..89eaecd1e6aa7 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -3,7 +3,7 @@
 import time
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
-from airflow.hooks import MySqlHook
+from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from collections import OrderedDict
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 35a489a9beba3..e261e238e6cfc 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -4,7 +4,7 @@
 import logging
 from tempfile import NamedTemporaryFile
 
-from airflow.hooks import HiveCliHook
+from airflow.hooks.hive_hooks import HiveCliHook
 from airflow.contrib.hooks import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
diff --git a/airflow/contrib/plugins/metastore_browser/main.py b/airflow/contrib/plugins/metastore_browser/main.py
index 4a9d3835e56ba..eb0a60e719bd6 100644
--- a/airflow/contrib/plugins/metastore_browser/main.py
+++ b/airflow/contrib/plugins/metastore_browser/main.py
@@ -5,7 +5,9 @@
 from flask_admin import BaseView, expose
 import pandas as pd
 
-from airflow.hooks import HiveMetastoreHook, MySqlHook, PrestoHook, HiveCliHook
+from airflow.hooks.hive_hooks import HiveMetastoreHook, HiveCliHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.hooks.presto_hook import PrestoHook
 from airflow.plugins_manager import AirflowPlugin
 from airflow.www import utils as wwwutils
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index bbfc17afe58c8..45018252e9e52 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -2,7 +2,8 @@
 ### Example HTTP operator and sensor
 """
 from airflow import DAG
-from airflow.operators import SimpleHttpOperator, HttpSensor
+from airflow.operators import SimpleHttpOperator
+from airflow.operators.sensors import HttpSensor
 from datetime import datetime, timedelta
 import json
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 58fac177e84d8..10fb5a761b2b6 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -1,3 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Only import core Airflow hooks that don't have extra requirements.
+# All other hooks must be imported directly.
+from .base_hook import BaseHook
+from .dbapi_hook import DbApiHook
+from .http_hook import HttpHook
+from .sqlite_hook import SqliteHook
+
+# ------------------------------------------------------------------------
+#
+# #TODO #FIXME Airflow 2.0
+#
+# Old import machinery below.
+#
+# This is deprecated but should be kept until Airflow 2.0
+# for compatibility.
+#
+# ------------------------------------------------------------------------
+
 # Imports the hooks dynamically while keeping the package API clean,
 # abstracting the underlying modules
@@ -17,9 +50,9 @@
     'postgres_hook': ['PostgresHook'],
     'presto_hook': ['PrestoHook'],
     'samba_hook': ['SambaHook'],
-    'sqlite_hook': ['SqliteHook'],
+    # 'sqlite_hook': ['SqliteHook'],
     'S3_hook': ['S3Hook'],
-    'http_hook': ['HttpHook'],
+    # 'http_hook': ['HttpHook'],
     'druid_hook': ['DruidHook'],
     'jdbc_hook': ['JdbcHook'],
     'dbapi_hook': ['DbApiHook'],
@@ -27,11 +60,39 @@
     'oracle_hook': ['OracleHook'],
 }
 
-_import_module_attrs(globals(), _hooks)
+import os as _os
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+    from zope.deprecation import deprecated as _deprecated
+    _imported = _import_module_attrs(globals(), _hooks)
+    for _i in _imported:
+        _deprecated(
+            _i,
+            "Importing {i} directly from 'airflow.hooks' has been "
+            "deprecated. Please import from "
+            "'airflow.hooks.[hook_module]' instead. 
Support for direct imports " + "will be dropped entirely in Airflow 2.0.".format(i=_i)) -def integrate_plugins(): + +def _integrate_plugins(): """Integrate plugins to the context""" + import sys from airflow.plugins_manager import hooks as _hooks - for _h in _hooks: - globals()[_h.__name__] = _h + for _hook_module in _hooks: + sys.modules[_hook_module.__name__] = _hook_module + globals()[_hook_module._name] = _hook_module + + + ########################################################## + # TODO FIXME Remove in Airflow 2.0 + + if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): + from zope.deprecation import deprecated as _deprecated + for _hook in _hook_module._objects: + globals()[_hook.__name__] = _deprecated( + _hook, + "Importing plugin hook '{i}' directly from " + "'airflow.hooks' has been deprecated. Please " + "import from 'airflow.hooks.[plugin_module]' " + "instead. Support for direct imports will be dropped " + "entirely in Airflow 2.0.".format(i=_hook)) diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py index 2a6cb734d7bda..e640b6392fce9 100644 --- a/airflow/hooks/base_hook.py +++ b/airflow/hooks/base_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import absolute_import from __future__ import division from __future__ import print_function diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index e5de92e6e59f7..e48e1383ba14b 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import str from past.builtins import basestring diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index bb6d9fa0e26e8..4fe0020bb340b 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from __future__ import print_function import logging import json diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py index 98e5f97568499..e84595c166f70 100644 --- a/airflow/hooks/hdfs_hook.py +++ b/airflow/hooks/hdfs_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from airflow.hooks.base_hook import BaseHook from airflow import configuration diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py index 5a257b0552633..7cf9a24407b91 100644 --- a/airflow/hooks/http_hook.py +++ b/airflow/hooks/http_hook.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# + from builtins import str import logging diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index 1f9275f4bd70c..510f3869d635b 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import str __author__ = 'janomar' diff --git a/airflow/hooks/mssql_hook.py b/airflow/hooks/mssql_hook.py index 2b6610dab0a8d..ae9d971196358 100644 --- a/airflow/hooks/mssql_hook.py +++ b/airflow/hooks/mssql_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import pymssql from airflow.hooks.dbapi_hook import DbApiHook diff --git a/airflow/hooks/mysql_hook.py b/airflow/hooks/mysql_hook.py index 9f2d951bcf269..e81d7961ccdb8 100644 --- a/airflow/hooks/mysql_hook.py +++ b/airflow/hooks/mysql_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import MySQLdb import MySQLdb.cursors diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py index 82ccddb0fa10c..aa70a56d8b26a 100644 --- a/airflow/hooks/oracle_hook.py +++ b/airflow/hooks/oracle_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import cx_Oracle from airflow.hooks.dbapi_hook import DbApiHook diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py index 5b40e52536c39..7201b9f33fb26 100644 --- a/airflow/hooks/pig_hook.py +++ b/airflow/hooks/pig_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import print_function import logging import subprocess diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py index 0456416cbd3c3..627e97bfce2d4 100644 --- a/airflow/hooks/postgres_hook.py +++ b/airflow/hooks/postgres_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import psycopg2 import psycopg2.extensions diff --git a/airflow/hooks/presto_hook.py b/airflow/hooks/presto_hook.py index da195bbbaa13f..f8f6ac887f217 100644 --- a/airflow/hooks/presto_hook.py +++ b/airflow/hooks/presto_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import str import logging diff --git a/airflow/hooks/samba_hook.py b/airflow/hooks/samba_hook.py index 617c7fbc19c03..6a29982ceee47 100644 --- a/airflow/hooks/samba_hook.py +++ b/airflow/hooks/samba_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from smbclient import SambaClient import os diff --git a/airflow/hooks/sqlite_hook.py b/airflow/hooks/sqlite_hook.py index c6e8f21d032f7..c241c2ddef03c 100644 --- a/airflow/hooks/sqlite_hook.py +++ b/airflow/hooks/sqlite_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import sqlite3 from airflow.hooks.dbapi_hook import DbApiHook diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py index 79a23bc38cee2..f808865986fe1 100644 --- a/airflow/hooks/webhdfs_hook.py +++ b/airflow/hooks/webhdfs_hook.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from airflow.hooks.base_hook import BaseHook from airflow import configuration import logging diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py index 2634075b5554c..da4eb3da5250b 100644 --- a/airflow/macros/__init__.py +++ b/airflow/macros/__init__.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import absolute_import from random import random from datetime import datetime, timedelta @@ -48,8 +62,26 @@ def ds_format(ds, input_format, output_format): return datetime.strptime(ds, input_format).strftime(output_format) -def integrate_plugins(): +def _integrate_plugins(): """Integrate plugins to the context""" + import sys from airflow.plugins_manager import macros as _macros - for _macro in _macros: - globals()[_macro.__name__] = _macro + for _macro_module in _macros: + sys.modules[_macro_module.__name__] = _macro_module + globals()[_macro_module._name] = _macro_module + + + ########################################################## + # TODO FIXME Remove in Airflow 2.0 + + import os as _os + if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): + from zope.deprecation import deprecated as _deprecated + for _macro in _macro_module._objects: + globals()[_macro.__name__] = _deprecated( + _macro, + "Importing plugin macro '{i}' directly from " + "'airflow.macros' has been deprecated. Please " + "import from 'airflow.macros.[plugin_module]' " + "instead. Support for direct imports will be dropped " + "entirely in Airflow 2.0.".format(i=_macro)) diff --git a/airflow/macros/hive.py b/airflow/macros/hive.py index 2f69d6604ddad..044fd5f7d6c49 100644 --- a/airflow/macros/hive.py +++ b/airflow/macros/hive.py @@ -25,7 +25,7 @@ def max_partition( >>> max_partition('airflow.static_babynames_partitioned') '2015-01-01' ''' - from airflow.hooks import HiveMetastoreHook + from airflow.hooks.hive_hooks import HiveMetastoreHook if '.' in table: schema, table = table.split('.') hh = HiveMetastoreHook(metastore_conn_id=metastore_conn_id) @@ -78,7 +78,7 @@ def closest_ds_partition( >>> closest_ds_partition(tbl, '2015-01-02') '2015-01-01' ''' - from airflow.hooks import HiveMetastoreHook + from airflow.hooks.hive_hooks import HiveMetastoreHook if '.' 
in table: schema, table = table.split('.') hh = HiveMetastoreHook(metastore_conn_id=metastore_conn_id) diff --git a/airflow/models.py b/airflow/models.py index 6a10ee8080a01..38359f7644dee 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -596,33 +596,43 @@ def extra(cls): descriptor=property(cls.get_extra, cls.set_extra)) def get_hook(self): - from airflow import hooks - from airflow.contrib import hooks as contrib_hooks try: if self.conn_type == 'mysql': - return hooks.MySqlHook(mysql_conn_id=self.conn_id) + from airflow.hooks.mysql_hook import MySqlHook + return MySqlHook(mysql_conn_id=self.conn_id) elif self.conn_type == 'google_cloud_platform': - return contrib_hooks.BigQueryHook(bigquery_conn_id=self.conn_id) + from airflow.contrib.hooks.bigquery_hook import BigQueryHook + return BigQueryHook(bigquery_conn_id=self.conn_id) elif self.conn_type == 'postgres': - return hooks.PostgresHook(postgres_conn_id=self.conn_id) + from airflow.hooks.postgres_hook import PostgresHook + return PostgresHook(postgres_conn_id=self.conn_id) elif self.conn_type == 'hive_cli': - return hooks.HiveCliHook(hive_cli_conn_id=self.conn_id) + from airflow.hooks.hive_hooks import HiveCliHook + return HiveCliHook(hive_cli_conn_id=self.conn_id) elif self.conn_type == 'presto': - return hooks.PrestoHook(presto_conn_id=self.conn_id) + from airflow.hooks.presto_hook import PrestoHook + return PrestoHook(presto_conn_id=self.conn_id) elif self.conn_type == 'hiveserver2': - return hooks.HiveServer2Hook(hiveserver2_conn_id=self.conn_id) + from airflow.hooks.hive_hooks import HiveServer2Hook + return HiveServer2Hook(hiveserver2_conn_id=self.conn_id) elif self.conn_type == 'sqlite': - return hooks.SqliteHook(sqlite_conn_id=self.conn_id) + from airflow.hooks.sqlite_hook import SqliteHook + return SqliteHook(sqlite_conn_id=self.conn_id) elif self.conn_type == 'jdbc': - return hooks.JdbcHook(jdbc_conn_id=self.conn_id) + from airflow.hooks.jdbc_hook import JdbcHook + return JdbcHook(jdbc_conn_id=self.conn_id) elif self.conn_type == 'mssql': - return hooks.MsSqlHook(mssql_conn_id=self.conn_id) + from airflow.hooks.mssql_hook import MsSqlHook + return MsSqlHook(mssql_conn_id=self.conn_id) elif self.conn_type == 'oracle': - return hooks.OracleHook(oracle_conn_id=self.conn_id) + from airflow.hooks.oracle_hook import OracleHook + return OracleHook(oracle_conn_id=self.conn_id) elif self.conn_type == 'vertica': - return contrib_hooks.VerticaHook(vertica_conn_id=self.conn_id) + from airflow.contrib.hooks.vertica_hook import VerticaHook + return VerticaHook(vertica_conn_id=self.conn_id) elif self.conn_type == 'cloudant': - return contrib_hooks.CloudantHook(cloudant_conn_id=self.conn_id) + from airflow.contrib.hooks.cloudant_hook import CloudantHook + return CloudantHook(cloudant_conn_id=self.conn_id) except: return None diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py index af63aeecf5890..0d2c403ee993c 100644 --- a/airflow/operators/__init__.py +++ b/airflow/operators/__init__.py @@ -1,23 +1,71 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Only import Core Airflow Operators that don't have extra requirements.
+# All other operators must be imported directly.
+from airflow.models import BaseOperator
+from .bash_operator import BashOperator
+from .python_operator import (
+    BranchPythonOperator,
+    PythonOperator,
+    ShortCircuitOperator)
+from .check_operator import (
+    CheckOperator,
+    ValueCheckOperator,
+    IntervalCheckOperator)
+from .dagrun_operator import TriggerDagRunOperator
+from .dummy_operator import DummyOperator
+from .email_operator import EmailOperator
+from .http_operator import SimpleHttpOperator
+import airflow.operators.sensors
+from .subdag_operator import SubDagOperator
+
+
+
+
+# ------------------------------------------------------------------------
+#
+# #TODO #FIXME Airflow 2.0
+#
+# Old import machinery below.
+#
+# This is deprecated but should be kept until Airflow 2.0
+# for compatibility.
+#
+# ------------------------------------------------------------------------
+
 # Imports operators dynamically while keeping the package API clean,
 # abstracting the underlying modules
 from airflow.utils.helpers import import_module_attrs as _import_module_attrs
 
 # These need to be integrated first as other operators depend on them
-_import_module_attrs(globals(), {
-    'check_operator': [
-        'CheckOperator',
-        'ValueCheckOperator',
-        'IntervalCheckOperator',
-    ],
-})
+# _import_module_attrs(globals(), {
+#     'check_operator': [
+#         'CheckOperator',
+#         'ValueCheckOperator',
+#         'IntervalCheckOperator',
+#     ],
+# })
 
 _operators = {
-    'bash_operator': ['BashOperator'],
-    'python_operator': [
-        'PythonOperator',
-        'BranchPythonOperator',
-        'ShortCircuitOperator',
-    ],
+    # 'bash_operator': ['BashOperator'],
+    # 'python_operator': [
+    #     'PythonOperator',
+    #     'BranchPythonOperator',
+    #     'ShortCircuitOperator',
+    # ],
     'hive_operator': ['HiveOperator'],
     'pig_operator': ['PigOperator'],
     'presto_check_operator': [
@@ -25,9 +73,9 @@
         'PrestoValueCheckOperator',
         'PrestoIntervalCheckOperator',
     ],
-    'dagrun_operator': ['TriggerDagRunOperator'],
-    'dummy_operator': ['DummyOperator'],
-    'email_operator': ['EmailOperator'],
+    # 'dagrun_operator': ['TriggerDagRunOperator'],
+    # 'dummy_operator': ['DummyOperator'],
+    # 'email_operator': ['EmailOperator'],
     'hive_to_samba_operator': ['Hive2SambaOperator'],
     'mysql_operator': ['MySqlOperator'],
     'sqlite_operator': ['SqliteOperator'],
@@ -47,13 +95,13 @@
         'TimeSensor',
         'WebHdfsSensor',
     ],
-    'subdag_operator': ['SubDagOperator'],
+    # 'subdag_operator': ['SubDagOperator'],
     'hive_stats_operator': ['HiveStatsCollectionOperator'],
     's3_to_hive_operator': ['S3ToHiveTransfer'],
     'hive_to_mysql': ['HiveToMySqlTransfer'],
     'presto_to_mysql': ['PrestoToMySqlTransfer'],
     's3_file_transform_operator': ['S3FileTransformOperator'],
-    'http_operator': ['SimpleHttpOperator'],
+    # 'http_operator': ['SimpleHttpOperator'],
     'hive_to_druid': ['HiveToDruidTransfer'],
     'jdbc_operator': ['JdbcOperator'],
     'mssql_operator': ['MsSqlOperator'],
@@ -63,12 +111,38 @@
     'oracle_operator': ['OracleOperator']
 }
 
-_import_module_attrs(globals(), _operators)
-from airflow.models import BaseOperator
+import os as _os
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+    from zope.deprecation import deprecated as _deprecated
+    _imported = _import_module_attrs(globals(), _operators)
+    for _i in _imported:
+        _deprecated(
+            _i,
+            "Importing {i} directly from 'airflow.operators' has been "
+            "deprecated. 
Please import from " + "'airflow.operators.[operator_module]' instead. Support for direct " + "imports will be dropped entirely in Airflow 2.0.".format(i=_i)) -def integrate_plugins(): +def _integrate_plugins(): """Integrate plugins to the context""" + import sys from airflow.plugins_manager import operators as _operators - for _operator in _operators: - globals()[_operator.__name__] = _operator + for _operator_module in _operators: + sys.modules[_operator_module.__name__] = _operator_module + globals()[_operator_module._name] = _operator_module + + + ########################################################## + # TODO FIXME Remove in Airflow 2.0 + + if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): + from zope.deprecation import deprecated as _deprecated + for _operator in _operator_module._objects: + globals()[_operator.__name__] = _deprecated( + _operator, + "Importing plugin operator '{i}' directly from " + "'airflow.operators' has been deprecated. Please " + "import from 'airflow.operators.[plugin_module]' " + "instead. Support for direct imports will be dropped " + "entirely in Airflow 2.0.".format(i=_operator)) diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index 3e9c7ef97bae6..a3b1b9284891b 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import bytes import logging diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index 0624d915041d3..e4c826225e09d 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import zip from builtins import str import logging diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index 7f8bb53400ad7..dc42d67575ae0 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from datetime import datetime import logging diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index b01d31ac51042..d17d154d00e61 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import json import logging from airflow.exceptions import AirflowException diff --git a/airflow/operators/dummy_operator.py b/airflow/operators/dummy_operator.py index 1392e7d33cc98..4517a8a0b6a6e 100644 --- a/airflow/operators/dummy_operator.py +++ b/airflow/operators/dummy_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/email_operator.py b/airflow/operators/email_operator.py index 29b18edad0b51..91a8d05e8bb9a 100644 --- a/airflow/operators/email_operator.py +++ b/airflow/operators/email_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from airflow.models import BaseOperator from airflow.utils.email import send_email from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py index eab9d61c0f224..de3bf73824c19 100644 --- a/airflow/operators/generic_transfer.py +++ b/airflow/operators/generic_transfer.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging from airflow.models import BaseOperator diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 9a299e1e02160..9849e9dbb1e6c 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -1,7 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging import re -from airflow.hooks import HiveCliHook +from airflow.hooks.hive_hooks import HiveCliHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index aadca4de28755..b31c6b5fb48c1 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from builtins import str from builtins import zip from collections import OrderedDict @@ -5,7 +19,9 @@ import logging from airflow.exceptions import AirflowException -from airflow.hooks import PrestoHook, HiveMetastoreHook, MySqlHook +from airflow.hooks.mysql_hook import MySqlHook +from airflow.hooks.presto_hook import PrestoHook +from airflow.hooks.hive_hooks import HiveMetastoreHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index 420aeeda1be80..6d73e17fc49fe 100644 --- a/airflow/operators/hive_to_druid.py +++ b/airflow/operators/hive_to_druid.py @@ -1,6 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import HiveCliHook, DruidHook, HiveMetastoreHook +from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook +from airflow.hooks.druid_hook import DruidHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py index 9e27f38516ab5..36c91d4339788 100644 --- a/airflow/operators/hive_to_mysql.py +++ b/airflow/operators/hive_to_mysql.py @@ -1,6 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import HiveServer2Hook, MySqlHook +from airflow.hooks.hive_hooks import HiveServer2Hook +from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py index 63881ab981097..8f18dd9434ffe 100644 --- a/airflow/operators/hive_to_samba_operator.py +++ b/airflow/operators/hive_to_samba_operator.py @@ -1,7 +1,22 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import logging import tempfile -from airflow.hooks import HiveServer2Hook, SambaHook +from airflow.hooks.hive_hooks import HiveServer2Hook +from airflow.hooks.samba_hook import SambaHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py index 87d1415bf625b..ad9bd4f105a7f 100644 --- a/airflow/operators/http_operator.py +++ b/airflow/operators/http_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging from airflow.exceptions import AirflowException diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 5efdaf4e6ba84..28977db0d00b4 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + __author__ = 'janomar' import logging diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py index 1d5273a49105b..ed9bdf44fa898 100644 --- a/airflow/operators/mssql_operator.py +++ b/airflow/operators/mssql_operator.py @@ -1,6 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import MsSqlHook +from airflow.hooks.mssql_hook import MsSqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py index 6a981b43c8d97..6db0cba3f7ccd 100644 --- a/airflow/operators/mssql_to_hive.py +++ b/airflow/operators/mssql_to_hive.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import chr from collections import OrderedDict import unicodecsv as csv @@ -6,7 +20,8 @@ import pymssql -from airflow.hooks import HiveCliHook, MsSqlHook +from airflow.hooks.hive_hooks import HiveCliHook +from airflow.hooks.mssql_hook import MsSqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py index ae6d36f3278af..b3a3c73d1bf4c 100644 --- a/airflow/operators/mysql_operator.py +++ b/airflow/operators/mysql_operator.py @@ -1,6 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import MySqlHook +from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py index 09ec190f77458..2fa2541865ee5 100644 --- a/airflow/operators/mysql_to_hive.py +++ b/airflow/operators/mysql_to_hive.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from builtins import chr from collections import OrderedDict import unicodecsv as csv @@ -5,7 +19,8 @@ from tempfile import NamedTemporaryFile import MySQLdb -from airflow.hooks import HiveCliHook, MySqlHook +from airflow.hooks.hive_hooks import HiveCliHook +from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py index 28182cf5a9572..ab7bdb2f34c5e 100644 --- a/airflow/operators/oracle_operator.py +++ b/airflow/operators/oracle_operator.py @@ -14,7 +14,7 @@ import logging -from airflow.hooks import OracleHook +from airflow.hooks.oracle_hook import OracleHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py index d25795dec73d7..4a21eccbbad65 100644 --- a/airflow/operators/pig_operator.py +++ b/airflow/operators/pig_operator.py @@ -1,7 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging import re -from airflow.hooks import PigCliHook +from airflow.hooks.pig_hook import PigCliHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py index 79fa5e75330de..c4f56a49d8324 100644 --- a/airflow/operators/postgres_operator.py +++ b/airflow/operators/postgres_operator.py @@ -1,6 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import PostgresHook +from airflow.hooks.postgres_hook import PostgresHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py index e857036415e6e..c1ac9cfc1d14e 100644 --- a/airflow/operators/presto_check_operator.py +++ b/airflow/operators/presto_check_operator.py @@ -1,4 +1,18 @@ -from airflow.hooks import PrestoHook +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.hooks.presto_hook import PrestoHook from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py index 29de0c7d86655..7ff2ad6170b01 100644 --- a/airflow/operators/presto_to_mysql.py +++ b/airflow/operators/presto_to_mysql.py @@ -1,6 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from airflow.hooks import PrestoHook, MySqlHook +from airflow.hooks.presto_hook import PrestoHook +from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 290cc65d139e9..b5f63863a7e77 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import str from datetime import datetime import logging diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index 06411239763c3..1cdd0e5e481ab 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -1,9 +1,23 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import logging from tempfile import NamedTemporaryFile import subprocess from airflow.exceptions import AirflowException -from airflow.hooks import S3Hook +from airflow.hooks.S3_hook import S3Hook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 3fc5327a40743..3e01c298241c9 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -1,10 +1,25 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import next from builtins import zip import logging from tempfile import NamedTemporaryFile from airflow.exceptions import AirflowException -from airflow.hooks import HiveCliHook, S3Hook +from airflow.hooks.S3_hook import S3Hook +from airflow.hooks.hive_hooks import HiveCliHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 569b4d7a4fe36..5276f6e40ca3e 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from __future__ import print_function from future import standard_library standard_library.install_aliases() @@ -7,6 +21,7 @@ from urllib.parse import urlparse from time import sleep +import airflow from airflow import hooks, settings from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException from airflow.models import BaseOperator, TaskInstance, Connection as DB @@ -249,7 +264,8 @@ def poke(self, context): 'Poking for table {self.schema}.{self.table}, ' 'partition {self.partition}'.format(**locals())) if not hasattr(self, 'hook'): - self.hook = hooks.HiveMetastoreHook( + import airflow.hooks.hive_hooks + self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook( metastore_conn_id=self.metastore_conn_id) return self.hook.check_for_partition( self.schema, self.table, self.partition) @@ -272,7 +288,8 @@ def __init__( self.hdfs_conn_id = hdfs_conn_id def poke(self, context): - sb = hooks.HDFSHook(self.hdfs_conn_id).get_conn() + import airflow.hooks.hdfs_hook + sb = airflow.hooks.hdfs_hook.HDFSHook(self.hdfs_conn_id).get_conn() logging.getLogger("snakebite").setLevel(logging.WARNING) logging.info( 'Poking for file {self.filepath} '.format(**locals())) @@ -300,7 +317,8 @@ def __init__( self.webhdfs_conn_id = webhdfs_conn_id def poke(self, context): - c = hooks.WebHDFSHook(self.webhdfs_conn_id) + import airflow.hooks.webhdfs_hook + c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id) logging.info( 'Poking for file {self.filepath} '.format(**locals())) return c.check_for_path(hdfs_path=self.filepath) @@ -356,7 +374,8 @@ def __init__( session.close() def poke(self, context): - hook = hooks.S3Hook(s3_conn_id=self.s3_conn_id) + import airflow.hooks.S3_hook + hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) full_url = "s3://" + self.bucket_name + "/" + self.bucket_key logging.info('Poking for key : {full_url}'.format(**locals())) if self.wildcard_match: @@ -408,7 +427,8 @@ def __init__( def poke(self, context): logging.info('Poking for prefix : {self.prefix}\n' 'in bucket s3://{self.bucket_name}'.format(**locals())) - hook = hooks.S3Hook(s3_conn_id=self.s3_conn_id) + import airflow.hooks.S3_hook + hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) return hook.check_for_prefix( prefix=self.prefix, delimiter=self.delimiter, @@ -513,7 +533,7 @@ def poke(self, context): # run content check on response return self.response_check(response) except AirflowException as ae: - if ae.message.startswith("404"): + if str(ae).startswith("404"): return False raise ae diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py index f92eff178bed9..2e6d4269fa71c 100644 --- a/airflow/operators/slack_operator.py +++ b/airflow/operators/slack_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ from slackclient import SlackClient from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py index 700019d9ead8b..52b3b4b860ccc 100644 --- a/airflow/operators/sqlite_operator.py +++ b/airflow/operators/sqlite_operator.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging from airflow.hooks import SqliteHook diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 25c65cdbb4bc4..b6108f4be2459 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -89,10 +89,26 @@ def validate(cls): logging.exception(e) logging.error('Failed to import plugin ' + filepath) -operators = merge([p.operators for p in plugins]) -hooks = merge([p.hooks for p in plugins]) -executors = merge([p.executors for p in plugins]) -macros = merge([p.macros for p in plugins]) -admin_views = merge([p.admin_views for p in plugins]) -flask_blueprints = merge([p.flask_blueprints for p in plugins]) -menu_links = merge([p.menu_links for p in plugins]) +def make_module(name, objects): + name = name.lower() + module = imp.new_module(name) + module._name = name.split('.')[-1] + module._objects = objects + module.__dict__.update((o.__name__, o) for o in objects) + return module + +operators, hooks, executors, macros, admin_views = [], [], [], [], [] +flask_blueprints, menu_links = [], [] + +for p in plugins: + operators.append(make_module('airflow.operators.' + p.name, p.operators)) + hooks.append(make_module('airflow.hooks.' + p.name, p.hooks)) + executors.append(make_module('airflow.executors.' + p.name, p.executors)) + macros.append(make_module('airflow.macros.' + p.name, p.macros)) + admin_views.append( + make_module('airflow.www.admin_views' + p.name, p.admin_views)) + flask_blueprints.append( + make_module( + 'airflow.www.flask_blueprints' + p.name, p.flask_blueprints)) + menu_links.append( + make_module('airflow.www.menu_links' + p.name, p.menu_links)) diff --git a/airflow/utils/email.py b/airflow/utils/email.py index 6877e4721cc1b..c19bb89f8bad5 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -1,11 +1,3 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from builtins import str -from past.builtins import basestring - # -*- coding: utf-8 -*- # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,7 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-# + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from builtins import str +from past.builtins import basestring + import importlib import logging import os diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index 117587f0b9c73..8f5fc511c98a1 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -47,7 +47,7 @@ class S3Log(object): def __init__(self): remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') try: - from airflow.hooks import S3Hook + from airflow.hooks.S3_hook import S3Hook self.hook = S3Hook(remote_conn_id) except: self.hook = None diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py new file mode 100644 index 0000000000000..50490d3df0e94 --- /dev/null +++ b/airflow/utils/tests.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +def skipUnlessImported(module, obj): + import importlib + m = importlib.import_module(module) + return unittest.skipUnless( + obj in dir(m), + "Skipping test because {} could not be imported from {}".format( + obj, module)) diff --git a/dags/testdruid.py b/dags/testdruid.py index ebfc6073bc27b..c356fc94cf987 100644 --- a/dags/testdruid.py +++ b/dags/testdruid.py @@ -1,4 +1,4 @@ -from airflow.operators import HiveToDruidTransfer +from airflow.operators.hive_to_druid import HiveToDruidTransfer from airflow import DAG from datetime import datetime diff --git a/run_unit_tests.sh b/run_unit_tests.sh index 71df44cbeb541..8cb916de89789 100755 --- a/run_unit_tests.sh +++ b/run_unit_tests.sh @@ -7,6 +7,9 @@ export AIRFLOW_CONFIG=$AIRFLOW_HOME/unittests.cfg # configuration test export AIRFLOW__TESTSECTION__TESTKEY=testvalue +# use Airflow 2.0-style imports +export AIRFLOW_USE_NEW_IMPORTS=1 + # any argument received is overriding the default nose execution arguments: nose_args=$@ diff --git a/setup.py b/setup.py index 4741f2070af4a..411a699044d6b 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from setuptools import setup, find_packages, Command from setuptools.command.test import test as TestCommand diff --git a/tests/__init__.py b/tests/__init__.py index ca8150b097cd3..4a79d0fb82455 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,9 +1,9 @@ from __future__ import absolute_import from .configuration import * +from .contrib import * from .core import * from .jobs import * from .models import * from .operators import * -from .contrib import * from .utils import * diff --git a/tests/core.py b/tests/core.py index af791e3574b7d..5e6a4fd65ee64 100644 --- a/tests/core.py +++ b/tests/core.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from __future__ import print_function import doctest @@ -289,7 +290,7 @@ class DAGsubclass(DAG): assert hash(self.dag) != hash(dag_subclass) def test_time_sensor(self): - t = operators.TimeSensor( + t = operators.sensors.TimeSensor( task_id='time_sensor_check', target_time=time(0), dag=self.dag) @@ -380,21 +381,22 @@ def test_dryrun(self): t.dry_run() def test_sqlite(self): - t = operators.SqliteOperator( + import airflow.operators.sqlite_operator + t = airflow.operators.sqlite_operator.SqliteOperator( task_id='time_sqlite', sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))", dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_timedelta_sensor(self): - t = operators.TimeDeltaSensor( + t = operators.sensors.TimeDeltaSensor( task_id='timedelta_sensor_check', delta=timedelta(seconds=2), dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor(self): - t = operators.ExternalTaskSensor( + t = operators.sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -402,7 +404,7 @@ def test_external_task_sensor(self): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor_delta(self): - t = operators.ExternalTaskSensor( + t = operators.sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check_delta', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -1077,97 +1079,6 @@ def tearDown(self): session.close() configuration.conf.set("webserver", "authenticate", "False") - -if 'MySqlOperator' in dir(operators): - # Only testing if the operator is installed - class MySqlTest(unittest.TestCase): - def setUp(self): - configuration.test_mode() - args = { - 'owner': 'airflow', - 'mysql_conn_id': 'airflow_db', - 'start_date': DEFAULT_DATE - } - dag = DAG(TEST_DAG_ID, default_args=args) - self.dag = dag - - def mysql_operator_test(self): - sql = """ - CREATE TABLE IF NOT EXISTS test_airflow ( - dummy VARCHAR(50) - ); - """ - t = operators.MySqlOperator( - task_id='basic_mysql', - sql=sql, - mysql_conn_id='airflow_db', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def mysql_operator_test_multi(self): - sql = [ - "TRUNCATE TABLE test_airflow", - "INSERT INTO test_airflow VALUES ('X')", - ] - t = operators.MySqlOperator( - task_id='mysql_operator_test_multi', - mysql_conn_id='airflow_db', - sql=sql, dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_mysql_to_mysql(self): - sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" - t = operators.GenericTransfer( - 
task_id='test_m2m', - preoperator=[ - "DROP TABLE IF EXISTS test_mysql_to_mysql", - "CREATE TABLE IF NOT EXISTS " - "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES" - ], - source_conn_id='airflow_db', - destination_conn_id='airflow_db', - destination_table="test_mysql_to_mysql", - sql=sql, - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_sql_sensor(self): - t = operators.SqlSensor( - task_id='sql_sensor_check', - conn_id='mysql_default', - sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES", - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - -if 'PostgresOperator' in dir(operators): - # Only testing if the operator is installed - class PostgresTest(unittest.TestCase): - def setUp(self): - configuration.test_mode() - args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} - dag = DAG(TEST_DAG_ID, default_args=args) - self.dag = dag - - def postgres_operator_test(self): - sql = """ - CREATE TABLE IF NOT EXISTS test_airflow ( - dummy VARCHAR(50) - ); - """ - t = operators.PostgresOperator( - task_id='basic_postgres', sql=sql, dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - autocommitTask = operators.PostgresOperator( - task_id='basic_postgres_with_autocommit', - sql=sql, - dag=self.dag, - autocommit=True) - autocommitTask.run( - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, - force=True) - class FakeSession(object): def __init__(self): from requests import Response @@ -1213,7 +1124,7 @@ def test_get_response_check(self): @mock.patch('requests.Session', FakeSession) def test_sensor(self): - sensor = operators.HttpSensor( + sensor = operators.sensors.HttpSensor( task_id='http_sensor_check', conn_id='http_default', endpoint='/search', @@ -1235,15 +1146,6 @@ def get_conn(self): def check_for_path(self, hdfs_path): return hdfs_path -class WebHdfsSensorTest(unittest.TestCase): - - @mock.patch('airflow.hooks.WebHDFSHook', FakeWebHDFSHook) - def test_poke(self): - s = operators.WebHdfsSensor(filepath='fakepath', - task_id='webhdfs_sensor_check', - owner='webhdfs') - assert s.poke({}) == 'fakepath' - class ConnectionTest(unittest.TestCase): def setUp(self): configuration.test_mode() @@ -1462,231 +1364,5 @@ def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl): assert not mock_smtp.called assert not mock_smtp_ssl.called - -if 'HiveOperator' in dir(operators): - class HiveServer2Test(unittest.TestCase): - def setUp(self): - configuration.test_mode() - - def test_select_conn(self): - from airflow.hooks.hive_hooks import HiveServer2Hook - sql = "select 1" - hook = HiveServer2Hook() - hook.get_records(sql) - - def test_multi_statements(self): - from airflow.hooks.hive_hooks import HiveServer2Hook - sqls = [ - "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)", - "DROP TABLE test_multi_statements", - ] - hook = HiveServer2Hook() - hook.get_records(sqls) - - def test_get_metastore_databases(self): - if six.PY2: - from airflow.hooks.hive_hooks import HiveMetastoreHook - hook = HiveMetastoreHook() - hook.get_databases() - - def test_to_csv(self): - from airflow.hooks.hive_hooks import HiveServer2Hook - sql = "select 1" - hook = HiveServer2Hook() - hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv") - -if 'MySqlOperator' in dir(operators) and 'HiveOperator' in dir(operators): - class TransferTests(unittest.TestCase): - cluster = None - - def setUp(self): - configuration.test_mode() - args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO} - dag = DAG(TEST_DAG_ID, 
default_args=args) - self.dag = dag - - def test_clear(self): - self.dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now()) - - def test_mysql_to_hive(self): - # import airflow.operators - from airflow.operators.mysql_to_hive import MySqlToHiveTransfer - sql = "SELECT * FROM baby_names LIMIT 1000;" - t = MySqlToHiveTransfer( - task_id='test_m2h', - mysql_conn_id='airflow_ci', - hive_cli_conn_id='beeline_default', - sql=sql, - hive_table='test_mysql_to_hive', - recreate=True, - delimiter=",", - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_mysql_to_hive_partition(self): - from airflow.operators.mysql_to_hive import MySqlToHiveTransfer - sql = "SELECT * FROM baby_names LIMIT 1000;" - t = MySqlToHiveTransfer( - task_id='test_m2h', - mysql_conn_id='airflow_ci', - hive_cli_conn_id='beeline_default', - sql=sql, - hive_table='test_mysql_to_hive_part', - partition={'ds': DEFAULT_DATE_DS}, - recreate=False, - create=True, - delimiter=",", - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - -if 'AIRFLOW_RUNALL_TESTS' in os.environ: - - class HivePrestoTest(unittest.TestCase): - - def setUp(self): - configuration.test_mode() - args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} - dag = DAG(TEST_DAG_ID, default_args=args) - self.dag = dag - self.hql = """ - USE airflow; - DROP TABLE IF EXISTS static_babynames_partitioned; - CREATE TABLE IF NOT EXISTS static_babynames_partitioned ( - state string, - year string, - name string, - gender string, - num int) - PARTITIONED BY (ds string); - INSERT OVERWRITE TABLE static_babynames_partitioned - PARTITION(ds='{{ ds }}') - SELECT state, year, name, gender, num FROM static_babynames; - """ - - def test_hive(self): - t = operators.HiveOperator( - task_id='basic_hql', hql=self.hql, dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive_dryrun(self): - t = operators.HiveOperator( - task_id='basic_hql', hql=self.hql, dag=self.dag) - t.dry_run() - - def test_beeline(self): - t = operators.HiveOperator( - task_id='beeline_hql', hive_cli_conn_id='beeline_default', - hql=self.hql, dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_presto(self): - sql = """ - SELECT count(1) FROM airflow.static_babynames_partitioned; - """ - t = operators.PrestoCheckOperator( - task_id='presto_check', sql=sql, dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_presto_to_mysql(self): - t = operators.PrestoToMySqlTransfer( - task_id='presto_to_mysql_check', - sql=""" - SELECT name, count(*) as ccount - FROM airflow.static_babynames - GROUP BY name - """, - mysql_table='test_static_babynames', - mysql_preoperator='TRUNCATE TABLE test_static_babynames;', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_presto_to_mysql(self): - t = operators.PrestoToMySqlTransfer( - task_id='presto_to_mysql_check', - sql=""" - SELECT name, count(*) as ccount - FROM airflow.static_babynames - GROUP BY name - """, - mysql_table='test_static_babynames', - mysql_preoperator='TRUNCATE TABLE test_static_babynames;', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hdfs_sensor(self): - t = operators.HdfsSensor( - task_id='hdfs_sensor_check', - filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def 
test_webhdfs_sensor(self): - t = operators.WebHdfsSensor( - task_id='webhdfs_sensor_check', - filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames', - timeout=120, - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_sql_sensor(self): - t = operators.SqlSensor( - task_id='hdfs_sensor_check', - conn_id='presto_default', - sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;", - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive_stats(self): - t = operators.HiveStatsCollectionOperator( - task_id='hive_stats_check', - table="airflow.static_babynames_partitioned", - partition={'ds': DEFAULT_DATE_DS}, - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive_partition_sensor(self): - t = operators.HivePartitionSensor( - task_id='hive_partition_check', - table='airflow.static_babynames_partitioned', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive_metastore_sql_sensor(self): - t = operators.MetastorePartitionSensor( - task_id='hive_partition_check', - table='airflow.static_babynames_partitioned', - partition_name='ds={}'.format(DEFAULT_DATE_DS), - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive2samba(self): - if 'Hive2SambaOperator' in dir(operators): - t = operators.Hive2SambaOperator( - task_id='hive2samba_check', - samba_conn_id='tableau_samba', - hql="SELECT * FROM airflow.static_babynames LIMIT 10000", - destination_filepath='test_airflow.csv', - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - - def test_hive_to_mysql(self): - t = operators.HiveToMySqlTransfer( - mysql_conn_id='airflow_db', - task_id='hive_to_mysql_check', - create=True, - sql=""" - SELECT name - FROM airflow.static_babynames - LIMIT 100 - """, - mysql_table='test_static_babynames', - mysql_preoperator=[ - 'DROP TABLE IF EXISTS test_static_babynames;', - 'CREATE TABLE test_static_babynames (name VARCHAR(500))', - ], - dag=self.dag) - t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - if __name__ == '__main__': unittest.main() diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py index 98a17a788a8a9..63ff2a0db3384 100644 --- a/tests/operators/__init__.py +++ b/tests/operators/__init__.py @@ -1,2 +1,19 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ from .docker_operator import * from .subdag_operator import * +from .operators import * +from .sensors import * +from .hive_operator import * diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py new file mode 100644 index 0000000000000..202adcf526d66 --- /dev/null +++ b/tests/operators/hive_operator.py @@ -0,0 +1,209 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import datetime +import os +import unittest +import six + +from airflow import DAG, configuration, operators, utils +configuration.test_mode() + +import os +import unittest + + +DEFAULT_DATE = datetime.datetime(2015, 1, 1) +DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() +DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] + + +if 'AIRFLOW_RUNALL_TESTS' in os.environ: + + import airflow.hooks.hive_hooks + import airflow.operators.presto_to_mysql + + class HiveServer2Test(unittest.TestCase): + def setUp(self): + configuration.test_mode() + + def test_select_conn(self): + from airflow.hooks.hive_hooks import HiveServer2Hook + sql = "select 1" + hook = HiveServer2Hook() + hook.get_records(sql) + + def test_multi_statements(self): + from airflow.hooks.hive_hooks import HiveServer2Hook + sqls = [ + "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)", + "DROP TABLE test_multi_statements", + ] + hook = HiveServer2Hook() + hook.get_records(sqls) + + def test_get_metastore_databases(self): + if six.PY2: + from airflow.hooks.hive_hooks import HiveMetastoreHook + hook = HiveMetastoreHook() + hook.get_databases() + + def test_to_csv(self): + from airflow.hooks.hive_hooks import HiveServer2Hook + sql = "select 1" + hook = HiveServer2Hook() + hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv") + + class HivePrestoTest(unittest.TestCase): + + def setUp(self): + configuration.test_mode() + args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} + dag = DAG('test_dag_id', default_args=args) + self.dag = dag + self.hql = """ + USE airflow; + DROP TABLE IF EXISTS static_babynames_partitioned; + CREATE TABLE IF NOT EXISTS static_babynames_partitioned ( + state string, + year string, + name string, + gender string, + num int) + PARTITIONED BY (ds string); + INSERT OVERWRITE TABLE static_babynames_partitioned + PARTITION(ds='{{ ds }}') + SELECT state, year, name, gender, num FROM static_babynames; + """ + + def test_hive(self): + import airflow.operators.hive_operator + t = operators.hive_operator.HiveOperator( + task_id='basic_hql', hql=self.hql, dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive_dryrun(self): + import airflow.operators.hive_operator + t = operators.hive_operator.HiveOperator( + task_id='basic_hql', hql=self.hql, dag=self.dag) + t.dry_run() + + def test_beeline(self): + import airflow.operators.hive_operator + t = operators.hive_operator.HiveOperator( + task_id='beeline_hql', hive_cli_conn_id='beeline_default', + hql=self.hql, dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
force=True) + + def test_presto(self): + sql = """ + SELECT count(1) FROM airflow.static_babynames_partitioned; + """ + import airflow.operators.presto_check_operator + t = operators.presto_check_operator.PrestoCheckOperator( + task_id='presto_check', sql=sql, dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_presto_to_mysql(self): + import airflow.operators.presto_to_mysql + t = operators.presto_to_mysql.PrestoToMySqlTransfer( + task_id='presto_to_mysql_check', + sql=""" + SELECT name, count(*) as ccount + FROM airflow.static_babynames + GROUP BY name + """, + mysql_table='test_static_babynames', + mysql_preoperator='TRUNCATE TABLE test_static_babynames;', + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hdfs_sensor(self): + t = operators.sensors.HdfsSensor( + task_id='hdfs_sensor_check', + filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames', + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_webhdfs_sensor(self): + t = operators.sensors.WebHdfsSensor( + task_id='webhdfs_sensor_check', + filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames', + timeout=120, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_sql_sensor(self): + t = operators.sensors.SqlSensor( + task_id='hdfs_sensor_check', + conn_id='presto_default', + sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;", + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive_stats(self): + import airflow.operators.hive_stats_operator + t = operators.hive_stats_operator.HiveStatsCollectionOperator( + task_id='hive_stats_check', + table="airflow.static_babynames_partitioned", + partition={'ds': DEFAULT_DATE_DS}, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive_partition_sensor(self): + t = operators.sensors.HivePartitionSensor( + task_id='hive_partition_check', + table='airflow.static_babynames_partitioned', + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive_metastore_sql_sensor(self): + t = operators.sensors.MetastorePartitionSensor( + task_id='hive_partition_check', + table='airflow.static_babynames_partitioned', + partition_name='ds={}'.format(DEFAULT_DATE_DS), + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive2samba(self): + import airflow.operators.hive_to_samba_operator + t = operators.hive_to_samba_operator.Hive2SambaOperator( + task_id='hive2samba_check', + samba_conn_id='tableau_samba', + hql="SELECT * FROM airflow.static_babynames LIMIT 10000", + destination_filepath='test_airflow.csv', + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_hive_to_mysql(self): + import airflow.operators.hive_to_mysql + t = operators.hive_to_mysql.HiveToMySqlTransfer( + mysql_conn_id='airflow_db', + task_id='hive_to_mysql_check', + create=True, + sql=""" + SELECT name + FROM airflow.static_babynames + LIMIT 100 + """, + mysql_table='test_static_babynames', + mysql_preoperator=[ + 'DROP TABLE IF EXISTS test_static_babynames;', + 'CREATE TABLE test_static_babynames (name VARCHAR(500))', + ], + dag=self.dag) + t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) diff --git a/tests/operators/operators.py 
b/tests/operators/operators.py new file mode 100644 index 0000000000000..d9bc0c2242007 --- /dev/null +++ b/tests/operators/operators.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import datetime +import os +import unittest +import six + +from airflow import DAG, configuration, operators, utils +from airflow.utils.tests import skipUnlessImported +configuration.test_mode() + +import os +import unittest + + +DEFAULT_DATE = datetime.datetime(2015, 1, 1) +DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() +DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] +TEST_DAG_ID = 'unit_test_dag' + + +@skipUnlessImported('airflow.operators.mysql_operator', 'MySqlOperator') +class MySqlTest(unittest.TestCase): + def setUp(self): + configuration.test_mode() + args = { + 'owner': 'airflow', + 'mysql_conn_id': 'airflow_db', + 'start_date': DEFAULT_DATE + } + dag = DAG(TEST_DAG_ID, default_args=args) + self.dag = dag + + def mysql_operator_test(self): + sql = """ + CREATE TABLE IF NOT EXISTS test_airflow ( + dummy VARCHAR(50) + ); + """ + import airflow.operators.mysql_operator + t = operators.mysql_operator.MySqlOperator( + task_id='basic_mysql', + sql=sql, + mysql_conn_id='airflow_db', + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def mysql_operator_test_multi(self): + sql = [ + "TRUNCATE TABLE test_airflow", + "INSERT INTO test_airflow VALUES ('X')", + ] + import airflow.operators.mysql_operator + t = operators.mysql_operator.MySqlOperator( + task_id='mysql_operator_test_multi', + mysql_conn_id='airflow_db', + sql=sql, dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_mysql_to_mysql(self): + sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" + import airflow.operators.generic_transfer + t = operators.generic_transfer.GenericTransfer( + task_id='test_m2m', + preoperator=[ + "DROP TABLE IF EXISTS test_mysql_to_mysql", + "CREATE TABLE IF NOT EXISTS " + "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES" + ], + source_conn_id='airflow_db', + destination_conn_id='airflow_db', + destination_table="test_mysql_to_mysql", + sql=sql, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_sql_sensor(self): + t = operators.sensors.SqlSensor( + task_id='sql_sensor_check', + conn_id='mysql_default', + sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES", + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + +@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator') +class PostgresTest(unittest.TestCase): + def setUp(self): + configuration.test_mode() + args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} + dag = DAG(TEST_DAG_ID, default_args=args) + self.dag = dag + + def postgres_operator_test(self): + sql = """ + CREATE TABLE IF NOT EXISTS test_airflow ( + dummy VARCHAR(50) + ); + """ + import airflow.operators.postgres_operator + t = 
operators.postgres_operator.PostgresOperator( + task_id='basic_postgres', sql=sql, dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + autocommitTask = operators.postgres_operator.PostgresOperator( + task_id='basic_postgres_with_autocommit', + sql=sql, + dag=self.dag, + autocommit=True) + autocommitTask.run( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + force=True) + + +@skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator') +@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator') +class TransferTests(unittest.TestCase): + cluster = None + + def setUp(self): + configuration.test_mode() + args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO} + dag = DAG(TEST_DAG_ID, default_args=args) + self.dag = dag + + def test_clear(self): + self.dag.clear( + start_date=DEFAULT_DATE, + end_date=datetime.datetime.now()) + + def test_mysql_to_hive(self): + # import airflow.operators + from airflow.operators.mysql_to_hive import MySqlToHiveTransfer + sql = "SELECT * FROM baby_names LIMIT 1000;" + t = MySqlToHiveTransfer( + task_id='test_m2h', + mysql_conn_id='airflow_ci', + hive_cli_conn_id='beeline_default', + sql=sql, + hive_table='test_mysql_to_hive', + recreate=True, + delimiter=",", + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_mysql_to_hive_partition(self): + from airflow.operators.mysql_to_hive import MySqlToHiveTransfer + sql = "SELECT * FROM baby_names LIMIT 1000;" + t = MySqlToHiveTransfer( + task_id='test_m2h', + mysql_conn_id='airflow_ci', + hive_cli_conn_id='beeline_default', + sql=sql, + hive_table='test_mysql_to_hive_part', + partition={'ds': DEFAULT_DATE_DS}, + recreate=False, + create=True, + delimiter=",", + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) diff --git a/tests/operators/sensor.py b/tests/operators/sensors.py similarity index 96% rename from tests/operators/sensor.py rename to tests/operators/sensors.py index 45d4b81547df5..025790e28fd2f 100644 --- a/tests/operators/sensor.py +++ b/tests/operators/sensors.py @@ -15,7 +15,7 @@ import os import unittest -from airflow.operators import HttpSensor +from airflow.operators.sensors import HttpSensor from airflow.exceptions import AirflowException
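For orientation, here is a minimal, hypothetical DAG-file snippet (not taken from this patch) showing the 2.0-style module-path imports that run_unit_tests.sh now exercises via AIRFLOW_USE_NEW_IMPORTS=1; it assumes an Airflow installation with the Hive and MySQL extras available:

# Hypothetical example: the deprecated package-level form
#     from airflow.operators import HiveOperator
# gives way to explicit module paths, mirroring the imports rewritten above.
from airflow.operators.hive_operator import HiveOperator
from airflow.hooks.mysql_hook import MySqlHook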