Skip to content

Commit

Permalink
Merge branch 'master' into v1-8-test
Browse files Browse the repository at this point in the history
  • Loading branch information
bolkedebruin committed Jan 16, 2017
2 parents 36b16a5 + 44798e0 commit df9464b
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 0 deletions.
82 changes: 82 additions & 0 deletions airflow/contrib/hooks/jira_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from jira import JIRA
from jira.exceptions import JIRAError

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class JiraHook(BaseHook):
"""
Jira interaction hook, a Wrapper around JIRA Python SDK.
:param jira_conn_id: reference to a pre-defined Jira Connection
:type jira_conn_id: string
"""

def __init__(self,
jira_conn_id='jira_default'):
super(JiraHook, self).__init__(jira_conn_id)
self.jira_conn_id = jira_conn_id
self.client = None
self.get_conn()

def get_conn(self):
if not self.client:
logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id))

get_server_info = True
validate = True
extra_options = {}
conn = None

if self.jira_conn_id is not None:
conn = self.get_connection(self.jira_conn_id)
if conn.extra is not None:
extra_options = conn.extra_dejson
# only required attributes are taken for now,
# more can be added ex: async, logging, max_retries

# verify
if 'verify' in extra_options \
and extra_options['verify'].lower() == 'false':
extra_options['verify'] = False

# validate
if 'validate' in extra_options \
and extra_options['validate'].lower() == 'false':
validate = False

if 'get_server_info' in extra_options \
and extra_options['get_server_info'].lower() == 'false':
get_server_info = False

try:
self.client = JIRA(conn.host,
options=extra_options,
basic_auth=(conn.login, conn.password),
get_server_info=get_server_info,
validate=validate)
except JIRAError as jira_error:
raise AirflowException('Failed to create jira client, jira error: %s'
% str(jira_error))
except Exception as e:
raise AirflowException('Failed to create jira client, error: %s'
% str(e))

return self.client
89 changes: 89 additions & 0 deletions airflow/contrib/operators/jira_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow.contrib.hooks.jira_hook import JIRAError
from airflow.contrib.hooks.jira_hook import JiraHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class JiraOperator(BaseOperator):
"""
JiraOperator to interact and perform action on Jira issue tracking system.
This operator is designed to use Jira Python SDK: http://jira.readthedocs.io
:param jira_conn_id: reference to a pre-defined Jira Connection
:type jira_conn_id: str
:param jira_method: method name from Jira Python SDK to be called
:type jira_method: str
:param jira_method_args: required method parameters for the jira_method
:type jira_method_args: dict
:param result_processor: function to further process the response from Jira
:type result_processor: function
:param get_jira_resource_method: function or operator to get jira resource
on which the provided jira_method will be executed
:type get_jira_resource_method: function
"""

template_fields = ("jira_method_args",)

@apply_defaults
def __init__(self,
jira_conn_id='jira_default',
jira_method=None,
jira_method_args=None,
result_processor=None,
get_jira_resource_method=None,
*args,
**kwargs):
super(JiraOperator, self).__init__(*args, **kwargs)
self.jira_conn_id = jira_conn_id
self.method_name = jira_method
self.jira_method_args = jira_method_args
self.result_processor = result_processor
self.get_jira_resource_method = get_jira_resource_method

def execute(self, context):
try:
if self.get_jira_resource_method is not None:
# if get_jira_resource_method is provided, jira_method will be executed on
# resource returned by executing the get_jira_resource_method.
# This makes all the provided methods of JIRA sdk accessible and usable
# directly at the JiraOperator without additional wrappers.
# ref: http://jira.readthedocs.io/en/latest/api.html
if isinstance(self.get_jira_resource_method, JiraOperator):
resource = self.get_jira_resource_method.execute(**context)
else:
resource = self.get_jira_resource_method(**context)
else:
# Default method execution is on the top level jira client resource
hook = JiraHook(jira_conn_id=self.jira_conn_id)
resource = hook.client

# Current Jira-Python SDK (1.0.7) has issue with pickling the jira response.
# ex: self.xcom_push(context, key='operator_response', value=jira_response)
# This could potentially throw error if jira_result is not picklable
jira_result = getattr(resource, self.method_name)(**self.jira_method_args)
if self.result_processor:
return self.result_processor(context, jira_result)

return jira_result

except JIRAError as jira_error:
raise AirflowException("Failed to execute jiraOperator, error: %s"
% str(jira_error))
except Exception as e:
raise AirflowException("Jira operator error: %s" % str(e))
146 changes: 146 additions & 0 deletions airflow/contrib/sensors/jira_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from jira.resources import Resource

from airflow.contrib.operators.jira_operator import JIRAError
from airflow.contrib.operators.jira_operator import JiraOperator
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class JiraSensor(BaseSensorOperator):
"""
Monitors a jira ticket for any change.
:param jira_conn_id: reference to a pre-defined Jira Connection
:type jira_conn_id: str
:param method_name: method name from jira-python-sdk to be execute
:type method_name: str
:param method_params: parameters for the method method_name
:type method_params: dict
:param result_processor: function that return boolean and act as a sensor response
:type result_processor: function
"""

@apply_defaults
def __init__(self,
jira_conn_id='jira_default',
method_name=None,
method_params=None,
result_processor=None,
*args,
**kwargs):
super(JiraSensor, self).__init__(*args, **kwargs)
self.jira_conn_id = jira_conn_id
self.result_processor = None
if result_processor is not None:
self.result_processor = result_processor
self.method_name = method_name
self.method_params = method_params
self.jira_operator = JiraOperator(task_id=self.task_id,
jira_conn_id=self.jira_conn_id,
jira_method=self.method_name,
jira_method_args=self.method_params,
result_processor=self.result_processor)

def poke(self, context):
return self.jira_operator.execute(context=context)


class JiraTicketSensor(JiraSensor):
"""
Monitors a jira ticket for given change in terms of function.
:param jira_conn_id: reference to a pre-defined Jira Connection
:type jira_conn_id: str
:param ticket_id: id of the ticket to be monitored
:type ticket_id: str
:param field: field of the ticket to be monitored
:type field: str
:param expected_value: expected value of the field
:type expected_value: str
:param result_processor: function that return boolean and act as a sensor response
:type result_processor: function
"""

template_fields = ("ticket_id",)

@apply_defaults
def __init__(self,
jira_conn_id='jira_default',
ticket_id=None,
field=None,
expected_value=None,
field_checker_func=None,
*args, **kwargs):

self.jira_conn_id = jira_conn_id
self.ticket_id = ticket_id
self.field = field
self.expected_value = expected_value
if field_checker_func is None:
field_checker_func = self.issue_field_checker

super(JiraTicketSensor, self).__init__(jira_conn_id=jira_conn_id,
result_processor=field_checker_func,
*args, **kwargs)

def poke(self, context):
logging.info('Jira Sensor checking for change in ticket : {0}'
.format(self.ticket_id))

self.jira_operator.method_name = "issue"
self.jira_operator.jira_method_args = {
'id': self.ticket_id,
'fields': self.field
}
return JiraSensor.poke(self, context=context)

def issue_field_checker(self, context, issue):
result = None
try:
if issue is not None \
and self.field is not None \
and self.expected_value is not None:

field_value = getattr(issue.fields, self.field)
if field_value is not None:
if isinstance(field_value, list):
result = self.expected_value in field_value
elif isinstance(field_value, str):
result = self.expected_value.lower() == field_value.lower()
elif isinstance(field_value, Resource) \
and getattr(field_value, 'name'):
result = self.expected_value.lower() == field_value.name.lower()
else:
logging.warning("not implemented checker for issue field {0} "
"which is neither string nor list nor "
"jira Resource".format(self.field))

except JIRAError as jira_error:
logging.error("jira error while checking with expected value: {0}"
.format(jira_error))
except Exception as e:
logging.error("error while checking with expected value {0}, error: {1}"
.format(self.expected_value, e))
if result is True:
logging.info("issue field {0} has expected value {1}, returning success"
.format(self.field, self.expected_value))
else:
logging.info("issue field {0} dont have expected value {1} yet."
.format(self.field, self.expected_value))
return result
4 changes: 4 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ class Connection(Base):
('cloudant', 'IBM Cloudant',),
('mssql', 'Microsoft SQL Server'),
('mesos_framework-id', 'Mesos Framework ID'),
('jira', 'JIRA',),
]

def __init__(
Expand Down Expand Up @@ -655,6 +656,9 @@ def get_hook(self):
elif self.conn_type == 'cloudant':
from airflow.contrib.hooks.cloudant_hook import CloudantHook
return CloudantHook(cloudant_conn_id=self.conn_id)
elif self.conn_type == 'jira':
from airflow.contrib.hooks.jira_hook import JiraHook
return JiraHook(jira_conn_id=self.conn_id)
except:
pass

Expand Down
1 change: 1 addition & 0 deletions scripts/ci/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impyla
ipython
jaydebeapi
jinja2<2.9.0
jira
ldap3
lxml
markdown
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def write_version(filename=os.path.join(*['airflow',
]
hdfs = ['snakebite>=2.7.8']
webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
jira = ['JIRA>1.0.7']
hive = [
'hive-thrift-py>=0.0.1',
'pyhive>=0.1.3',
Expand Down Expand Up @@ -256,6 +257,7 @@ def do_setup():
'statsd': statsd,
'vertica': vertica,
'webhdfs': webhdfs,
'jira': jira,
},
classifiers=[
'Development Status :: 5 - Production/Stable',
Expand Down
Loading

0 comments on commit df9464b

Please sign in to comment.