Skip to content

Commit

Permalink
[AIRFLOW-999] Add support for Redis database
Browse files Browse the repository at this point in the history
This PR includes a redis_hook and a redis_key_sensor to enable
checking for key existence in redis. It also updates the
documentation and add the relevant unit tests.

- [x] Opened a PR on Github

- [x] My PR addresses the following Airflow JIRA
issues:
    -
https://issues.apache.org/jira/browse/AIRFLOW-999
- [x] The PR title references the JIRA issues. For
example, "[AIRFLOW-1] My Airflow PR"

- [x] My PR adds unit tests
- [ ] __OR__ my PR does not need testing for this
extremely good reason:

- [x] Here are some details about my PR:
- [ ] Here are screenshots of any UI changes, if
appropriate:

- [x] Each commit subject references a JIRA issue.
For example, "[AIRFLOW-1] Add new feature"
- [x] Multiple commits addressing the same JIRA
issue have been squashed
- [x] My commits follow the guidelines from "[How
to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
  1. Subject is separated from body by a blank line
  2. Subject is limited to 50 characters
  3. Subject does not end with a period
  4. Subject uses the imperative mood ("add", not
"adding")
  5. Body wraps at 72 characters
  6. Body explains "what" and "why", not "how"

Closes apache#2165 from msempere/AIRFLOW-999/support-
for-redis-database
  • Loading branch information
msempere authored and artwr committed Mar 20, 2017
1 parent 23a16f7 commit 8de8501
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 0 deletions.
92 changes: 92 additions & 0 deletions airflow/contrib/hooks/redis_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
RedisHook module
"""

import logging

from redis import StrictRedis

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class RedisHook(BaseHook):
"""
Hook to interact with Redis database
"""
def __init__(self, redis_conn_id='redis_default'):
"""
Prepares hook to connect to a Redis database.
:param conn_id: the name of the connection that has the parameters
we need to connect to Redis.
"""
self.redis_conn_id = redis_conn_id
self.client = None
conn = self.get_connection(self.redis_conn_id)
self.host = conn.host
self.port = int(conn.port)
self.password = conn.password
self.db = int(conn.extra_dejson.get('db', 0))
self.logger = logging.getLogger(__name__)
self.logger.debug(
'''Connection "{conn}":
\thost: {host}
\tport: {port}
\textra: {extra}
'''.format(
conn=self.redis_conn_id,
host=self.host,
port=self.port,
extra=conn.extra_dejson
)
)

def get_conn(self):
"""
Returns a Redis connection.
"""
if not self.client:
self.logger.debug(
'generating Redis client for conn_id "{conn}" on '
'{host}:{port}:{db}'.format(conn=self.redis_conn_id,
host=self.host,
port=self.port,
db=self.db))
try:
self.client = StrictRedis(
host=self.host,
port=self.port,
password=self.password,
db=self.db)
except Exception as general_error:
raise AirflowException(
'Failed to create Redis client, error: {error}'.format(
error=str(general_error)
)
)

return self.client

def key_exists(self, key):
"""
Checks if a key exists in Redis database
:param key: The key to check the existence.
:type key: string
"""
return self.get_conn().exists(key)
46 changes: 46 additions & 0 deletions airflow/contrib/sensors/redis_key_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class RedisKeySensor(BaseSensorOperator):
"""
Checks for the existence of a key in a Redis database
"""
template_fields = ('key',)
ui_color = '#f0eee4'

@apply_defaults
def __init__(self, key, redis_conn_id, *args, **kwargs):
"""
Create a new RedisKeySensor
:param key: The key to be monitored
:type key: string
:param redis_conn_id: The connection ID to use when connecting to Redis DB.
:type redis_conn_id: string
"""
super(RedisKeySensor, self).__init__(*args, **kwargs)
self.logger = logging.getLogger(__name__)
self.redis_conn_id = redis_conn_id
self.key = key

def poke(self, context):
self.logger.info('Sensor check existence of key: %s', self.key)
return RedisHook(self.redis_conn_id).key_exists(self.key)
4 changes: 4 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ class Connection(Base):
('mssql', 'Microsoft SQL Server'),
('mesos_framework-id', 'Mesos Framework ID'),
('jira', 'JIRA',),
('redis', 'Redis',),
]

def __init__(
Expand Down Expand Up @@ -670,6 +671,9 @@ def get_hook(self):
elif self.conn_type == 'jira':
from airflow.contrib.hooks.jira_hook import JiraHook
return JiraHook(jira_conn_id=self.conn_id)
elif self.conn_type == 'redis':
from airflow.contrib.hooks.redis_hook import RedisHook
return RedisHook(redis_conn_id=self.conn_id)
except:
pass

Expand Down
5 changes: 5 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ def initdb():
models.Connection(
conn_id='spark_default', conn_type='spark',
host='yarn', extra='{"queue": "root.default"}'))
merge_conn(
models.Connection(
conn_id='redis_default', conn_type='redis',
host='localhost', port=6379,
extra='{"db": 0}'))
merge_conn(
models.Connection(
conn_id='emr_default', conn_type='emr',
Expand Down
2 changes: 2 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,5 @@ Here's the list of the subpackages and what they enable:
+---------------+-------------------------------------+-------------------------------------------------+
| cloudant | ``pip install airflow[cloudant]`` | Cloudant hook |
+---------------+-------------------------------------+-------------------------------------------------+
| redis | ``pip install airflow[redis]`` | Redis hooks and sensors |
+---------------+-------------------------------------+-------------------------------------------------+
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def write_version(filename=os.path.join(*['airflow',
github_enterprise = ['Flask-OAuthlib>=0.9.1']
qds = ['qds-sdk>=1.9.0']
cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
redis = ['redis>=2.10.5']

all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
devel = [
Expand Down Expand Up @@ -269,6 +270,7 @@ def do_setup():
'vertica': vertica,
'webhdfs': webhdfs,
'jira': jira,
'redis': redis,
},
classifiers=[
'Development Status :: 5 - Production/Stable',
Expand Down
46 changes: 46 additions & 0 deletions tests/contrib/hooks/test_redis_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest
from mock import patch

from airflow import configuration
from airflow.contrib.hooks.redis_hook import RedisHook


class TestRedisHook(unittest.TestCase):
def setUp(self):
configuration.load_test_config()

def test_get_conn(self):
hook = RedisHook(redis_conn_id='redis_default')
self.assertEqual(hook.client, None)
self.assertEqual(
repr(hook.get_conn()),
(
'StrictRedis<ConnectionPool'
'<Connection<host=localhost,port=6379,db=0>>>'
)
)

@patch("airflow.contrib.hooks.redis_hook.RedisHook.get_conn")
def test_first_conn_instantiation(self, get_conn):
hook = RedisHook(redis_conn_id='redis_default')
hook.key_exists('test_key')
self.assertTrue(get_conn.called_once())


if __name__ == '__main__':
unittest.main()
64 changes: 64 additions & 0 deletions tests/contrib/sensors/redis_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest
import datetime

from mock import patch

from airflow import DAG
from airflow import configuration
from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor

DEFAULT_DATE = datetime.datetime(2017, 1, 1)


class TestRedisSensor(unittest.TestCase):

def setUp(self):
configuration.load_test_config()
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}

self.dag = DAG('test_dag_id', default_args=args)
self.sensor = RedisKeySensor(
task_id='test_task',
redis_conn_id='redis_default',
dag=self.dag,
key='test_key'
)

@patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists")
def test_poke(self, key_exists):
key_exists.return_value = True
self.assertTrue(self.sensor.poke(None))

key_exists.return_value = False
self.assertFalse(self.sensor.poke(None))

@patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists")
def test_existing_key_called(self, redis_client_exists):
self.sensor.run(
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE, ignore_ti_state=True
)

self.assertTrue(redis_client_exists.called_with('test_key'))


if __name__ == '__main__':
unittest.main()

0 comments on commit 8de8501

Please sign in to comment.