diff --git a/tests/providers/apache/cassandra/sensors/record.py b/tests/providers/apache/cassandra/sensors/record.py deleted file mode 100644 index b3375f195d15b..0000000000000 --- a/tests/providers/apache/cassandra/sensors/record.py +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import unittest -from unittest.mock import patch - -from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor - - -class TestCassandraTableSensor(unittest.TestCase): - @patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook") - def test_poke(self, mock_hook): - sensor = CassandraTableSensor( - task_id='test_task', - cassandra_conn_id='cassandra_default', - table='t', - ) - sensor.poke(None) - mock_hook.return_value.table_exists.assert_called_once_with('t') diff --git a/tests/providers/apache/cassandra/sensors/table.py b/tests/providers/apache/cassandra/sensors/table.py deleted file mode 100644 index e164781b67ce5..0000000000000 --- a/tests/providers/apache/cassandra/sensors/table.py +++ /dev/null @@ -1,35 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import unittest -from unittest.mock import patch - -from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor - - -class TestCassandraRecordSensor(unittest.TestCase): - @patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook") - def test_poke(self, mock_hook): - sensor = CassandraRecordSensor( - task_id='test_task', - cassandra_conn_id='cassandra_default', - table='t', - keys={'foo': 'bar'} - ) - sensor.poke(None) - mock_hook.return_value.record_exists.assert_called_once_with('t', {'foo': 'bar'}) diff --git a/tests/providers/apache/cassandra/sensors/test_record.py b/tests/providers/apache/cassandra/sensors/test_record.py new file mode 100644 index 0000000000000..35f5aefe4dc29 --- /dev/null +++ b/tests/providers/apache/cassandra/sensors/test_record.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest.mock import patch + +from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor + +TEST_CASSANDRA_CONN_ID = 'cassandra_default' +TEST_CASSANDRA_TABLE = 't' +TEST_CASSANDRA_KEY = {'foo': 'bar'} + + +class TestCassandraRecordSensor(unittest.TestCase): + @patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook") + def test_poke(self, mock_hook): + sensor = CassandraRecordSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE, + keys=TEST_CASSANDRA_KEY, + ) + exists = sensor.poke(dict()) + + self.assertTrue(exists) + + mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, TEST_CASSANDRA_KEY) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) + + @patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook") + def test_poke_should_not_fail_with_empty_keys(self, mock_hook): + sensor = CassandraRecordSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE, + keys=None, + ) + exists = sensor.poke(dict()) + + self.assertTrue(exists) + + mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, None) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) + + @patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook") + def test_poke_should_return_false_for_non_existing_table(self, mock_hook): + mock_hook.return_value.record_exists.return_value = False + + sensor = CassandraRecordSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE, + keys=TEST_CASSANDRA_KEY, + ) + exists = sensor.poke(dict()) + + self.assertFalse(exists) + + mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, TEST_CASSANDRA_KEY) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) diff --git a/tests/providers/apache/cassandra/sensors/test_table.py b/tests/providers/apache/cassandra/sensors/test_table.py new file mode 100644 index 0000000000000..4f35bac4b9dcb --- /dev/null +++ b/tests/providers/apache/cassandra/sensors/test_table.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from unittest.mock import patch + +from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor + +TEST_CASSANDRA_CONN_ID = 'cassandra_default' +TEST_CASSANDRA_TABLE = 't' +TEST_CASSANDRA_TABLE_WITH_KEYSPACE = 'keyspacename.tablename' + + +class TestCassandraTableSensor(unittest.TestCase): + @patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook") + def test_poke(self, mock_hook): + sensor = CassandraTableSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE, + ) + exists = sensor.poke(dict()) + + self.assertTrue(exists) + + mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) + + @patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook") + def test_poke_should_return_false_for_non_existing_table(self, mock_hook): + mock_hook.return_value.table_exists.return_value = False + + sensor = CassandraTableSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE, + ) + exists = sensor.poke(dict()) + + self.assertFalse(exists) + + mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) + + @patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook") + def test_poke_should_succeed_for_table_with_mentioned_keyspace(self, mock_hook): + sensor = CassandraTableSensor( + task_id='test_task', + cassandra_conn_id=TEST_CASSANDRA_CONN_ID, + table=TEST_CASSANDRA_TABLE_WITH_KEYSPACE, + ) + exists = sensor.poke(dict()) + + self.assertTrue(exists) + + mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE_WITH_KEYSPACE) + mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID) diff --git a/tests/providers/apache/hdfs/sensors/test_web_hdfs.py b/tests/providers/apache/hdfs/sensors/test_web_hdfs.py new file mode 100644 index 0000000000000..cad72403ff629 --- /dev/null +++ b/tests/providers/apache/hdfs/sensors/test_web_hdfs.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor +from tests.providers.apache.hive import TestHiveEnvironment + +TEST_HDFS_CONN = 'webhdfs_default' +TEST_HDFS_PATH = 'hdfs://user/hive/warehouse/airflow.db/static_babynames' + + +class TestWebHdfsSensor(TestHiveEnvironment): + + @mock.patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook') + def test_poke(self, mock_hook): + sensor = WebHdfsSensor( + task_id='test_task', + webhdfs_conn_id=TEST_HDFS_CONN, + filepath=TEST_HDFS_PATH, + ) + exists = sensor.poke(dict()) + + self.assertTrue(exists) + + mock_hook.return_value.check_for_path.assert_called_once_with(hdfs_path=TEST_HDFS_PATH) + mock_hook.assert_called_once_with(TEST_HDFS_CONN) + + @mock.patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook') + def test_poke_should_return_false_for_non_existing_table(self, mock_hook): + mock_hook.return_value.check_for_path.return_value = False + + sensor = WebHdfsSensor( + task_id='test_task', + webhdfs_conn_id=TEST_HDFS_CONN, + filepath=TEST_HDFS_PATH, + ) + exists = sensor.poke(dict()) + + self.assertFalse(exists) + + mock_hook.return_value.check_for_path.assert_called_once_with(hdfs_path=TEST_HDFS_PATH) + mock_hook.assert_called_once_with(TEST_HDFS_CONN) diff --git a/tests/providers/apache/hdfs/sensors/test_webhdfs.py b/tests/providers/apache/hdfs/sensors/test_webhdfs.py deleted file mode 100644 index f142a090f653a..0000000000000 --- a/tests/providers/apache/hdfs/sensors/test_webhdfs.py +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import os -import unittest -from unittest import mock - -from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor -from tests.providers.apache.hive import DEFAULT_DATE, TestHiveEnvironment -from tests.test_utils.mock_hooks import MockDBConnection - - -@unittest.skipIf( - 'AIRFLOW_RUNALL_TESTS' not in os.environ, - "Skipped because AIRFLOW_RUNALL_TESTS is not set") -class TestWebHdfsSensor(TestHiveEnvironment): - - @mock.patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook._find_valid_server') - def test_webhdfs_sensor(self, mock_find_valid_server): - mock_find_valid_server.return_value = MockDBConnection() - op = WebHdfsSensor( - task_id='webhdfs_sensor_check', - filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames', - timeout=120, - dag=self.dag) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, - ignore_ti_state=True) diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py index 2cc0da43663fb..b1568c33318f8 100644 --- a/tests/test_project_structure.py +++ b/tests/test_project_structure.py @@ -28,9 +28,6 @@ ) MISSING_TEST_FILES = { - 'tests/providers/apache/cassandra/sensors/test_record.py', - 'tests/providers/apache/cassandra/sensors/test_table.py', - 'tests/providers/apache/hdfs/sensors/test_web_hdfs.py', 'tests/providers/google/cloud/log/test_gcs_task_handler.py', 'tests/providers/google/cloud/operators/test_datastore.py', 'tests/providers/google/cloud/transfers/test_sql_to_gcs.py',