Skip to content

Commit

Permalink
Add unit tests for CassandraTableSensor, CassandraRecordSensor and We…
Browse files Browse the repository at this point in the history
…bHdfsSensor (apache#9874)
  • Loading branch information
rootcss authored Jul 21, 2020
1 parent 95632ce commit eb1aedd
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 113 deletions.
34 changes: 0 additions & 34 deletions tests/providers/apache/cassandra/sensors/record.py

This file was deleted.

35 changes: 0 additions & 35 deletions tests/providers/apache/cassandra/sensors/table.py

This file was deleted.

75 changes: 75 additions & 0 deletions tests/providers/apache/cassandra/sensors/test_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import patch

from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor

TEST_CASSANDRA_CONN_ID = 'cassandra_default'
TEST_CASSANDRA_TABLE = 't'
TEST_CASSANDRA_KEY = {'foo': 'bar'}


class TestCassandraRecordSensor(unittest.TestCase):
@patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook")
def test_poke(self, mock_hook):
sensor = CassandraRecordSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE,
keys=TEST_CASSANDRA_KEY,
)
exists = sensor.poke(dict())

self.assertTrue(exists)

mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, TEST_CASSANDRA_KEY)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)

@patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook")
def test_poke_should_not_fail_with_empty_keys(self, mock_hook):
sensor = CassandraRecordSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE,
keys=None,
)
exists = sensor.poke(dict())

self.assertTrue(exists)

mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, None)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)

@patch("airflow.providers.apache.cassandra.sensors.record.CassandraHook")
def test_poke_should_return_false_for_non_existing_table(self, mock_hook):
mock_hook.return_value.record_exists.return_value = False

sensor = CassandraRecordSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE,
keys=TEST_CASSANDRA_KEY,
)
exists = sensor.poke(dict())

self.assertFalse(exists)

mock_hook.return_value.record_exists.assert_called_once_with(TEST_CASSANDRA_TABLE, TEST_CASSANDRA_KEY)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)
72 changes: 72 additions & 0 deletions tests/providers/apache/cassandra/sensors/test_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import patch

from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

TEST_CASSANDRA_CONN_ID = 'cassandra_default'
TEST_CASSANDRA_TABLE = 't'
TEST_CASSANDRA_TABLE_WITH_KEYSPACE = 'keyspacename.tablename'


class TestCassandraTableSensor(unittest.TestCase):
@patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook")
def test_poke(self, mock_hook):
sensor = CassandraTableSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE,
)
exists = sensor.poke(dict())

self.assertTrue(exists)

mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)

@patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook")
def test_poke_should_return_false_for_non_existing_table(self, mock_hook):
mock_hook.return_value.table_exists.return_value = False

sensor = CassandraTableSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE,
)
exists = sensor.poke(dict())

self.assertFalse(exists)

mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)

@patch("airflow.providers.apache.cassandra.sensors.table.CassandraHook")
def test_poke_should_succeed_for_table_with_mentioned_keyspace(self, mock_hook):
sensor = CassandraTableSensor(
task_id='test_task',
cassandra_conn_id=TEST_CASSANDRA_CONN_ID,
table=TEST_CASSANDRA_TABLE_WITH_KEYSPACE,
)
exists = sensor.poke(dict())

self.assertTrue(exists)

mock_hook.return_value.table_exists.assert_called_once_with(TEST_CASSANDRA_TABLE_WITH_KEYSPACE)
mock_hook.assert_called_once_with(TEST_CASSANDRA_CONN_ID)
58 changes: 58 additions & 0 deletions tests/providers/apache/hdfs/sensors/test_web_hdfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from unittest import mock

from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from tests.providers.apache.hive import TestHiveEnvironment

TEST_HDFS_CONN = 'webhdfs_default'
TEST_HDFS_PATH = 'hdfs://user/hive/warehouse/airflow.db/static_babynames'


class TestWebHdfsSensor(TestHiveEnvironment):

@mock.patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook')
def test_poke(self, mock_hook):
sensor = WebHdfsSensor(
task_id='test_task',
webhdfs_conn_id=TEST_HDFS_CONN,
filepath=TEST_HDFS_PATH,
)
exists = sensor.poke(dict())

self.assertTrue(exists)

mock_hook.return_value.check_for_path.assert_called_once_with(hdfs_path=TEST_HDFS_PATH)
mock_hook.assert_called_once_with(TEST_HDFS_CONN)

@mock.patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook')
def test_poke_should_return_false_for_non_existing_table(self, mock_hook):
mock_hook.return_value.check_for_path.return_value = False

sensor = WebHdfsSensor(
task_id='test_task',
webhdfs_conn_id=TEST_HDFS_CONN,
filepath=TEST_HDFS_PATH,
)
exists = sensor.poke(dict())

self.assertFalse(exists)

mock_hook.return_value.check_for_path.assert_called_once_with(hdfs_path=TEST_HDFS_PATH)
mock_hook.assert_called_once_with(TEST_HDFS_CONN)
41 changes: 0 additions & 41 deletions tests/providers/apache/hdfs/sensors/test_webhdfs.py

This file was deleted.

3 changes: 0 additions & 3 deletions tests/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
)

MISSING_TEST_FILES = {
'tests/providers/apache/cassandra/sensors/test_record.py',
'tests/providers/apache/cassandra/sensors/test_table.py',
'tests/providers/apache/hdfs/sensors/test_web_hdfs.py',
'tests/providers/google/cloud/log/test_gcs_task_handler.py',
'tests/providers/google/cloud/operators/test_datastore.py',
'tests/providers/google/cloud/transfers/test_sql_to_gcs.py',
Expand Down

0 comments on commit eb1aedd

Please sign in to comment.