Skip to content

Commit

Permalink
adding template support in qbol operator
Browse files Browse the repository at this point in the history
  • Loading branch information
msumit committed Feb 25, 2016
1 parent 5a2dc8f commit ab80913
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions airflow/contrib/operators/qubole_operator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import six
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.contrib.hooks import QuboleHook


class QuboleOperator(BaseOperator):
"""
Executes commands on Qubole (https://qubole.com).
Expand Down Expand Up @@ -94,6 +94,8 @@ class QuboleOperator(BaseOperator):
"""

template_fields = ('query', 'script_location', 'sub_command', 'script', 'files', 'archives', 'program', 'cmdline', 'sql', 'where_clause', 'extract_query', 'boundary_query', 'macros', 'tags', 'name')
template_ext = ('.hql', '.sql', '.sh', '.bash', '.pig')
ui_color = '#3064A1'
ui_fgcolor = '#fff'

Expand All @@ -102,10 +104,11 @@ def __init__(self, qubole_conn_id="qubole_default", *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.kwargs['qubole_conn_id'] = qubole_conn_id
self.hook = QuboleHook(*self.args, **self.kwargs)
self.hook = None
super(QuboleOperator, self).__init__(*args, **kwargs)

def execute(self, context):
self.hook = QuboleHook(*self.args, **self.kwargs)
return self.hook.execute(context)

def on_kill(self, ti):
Expand All @@ -120,5 +123,20 @@ def get_log(self, ti):
def get_jobs_id(self, ti):
return self.hook.get_jobs_id(ti)

def __getattribute__(self, name):
if name in QuboleOperator.template_fields:
if name in self.kwargs:
return self.kwargs[name]
else:
return ''
else:
return object.__getattribute__(self, name)

def __setattr__(self, name, value):
if name in QuboleOperator.template_fields:
self.kwargs[name] = value
else:
object.__setattr__(self, name, value)



0 comments on commit ab80913

Please sign in to comment.