Skip to content

Commit

Permalink
typos and xcom changes
Browse files Browse the repository at this point in the history
  • Loading branch information
msumit committed Jan 18, 2016
1 parent 841d47c commit 5779c18
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
12 changes: 6 additions & 6 deletions airflow/contrib/hooks/qubole_hook.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ def __init__(self, *args, **kwargs):
self.cls = COMMAND_CLASSES[self.kwargs['command_type']]
self.cmd = None

def execute(self):
def execute(self, context):
args = self.cls.parse(self.args)
self.cmd = self.cls.create(**args)
context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
logging.info("Qubole command created with Id: {0} and Status: {1}".format(str(self.cmd.id), self.cmd.status))

while not Command.is_done(self.cmd.status):
Expand All @@ -61,12 +62,11 @@ def execute(self):
logging.info("Command Id: {0} and Status: {1}".format(str(self.cmd.id), self.cmd.status))

if self.kwargs.has_key('fetch_logs') and self.kwargs['fetch_logs'] == True:
logging.info("Logs for Command Id: {0}, {1}".format(str(self.cmd.id), self.cmd.get_log()))
logging.info("Logs for Command Id: {0} \n{1}".format(str(self.cmd.id), self.cmd.get_log()))

if self.cmd.status != 'done':
raise AirflowException('Command Id: {0} failed with Status: {1}'.format(self.cmd.id, self.cmd.status))

return self.cmd.id

def kill(self, ti):
"""
Expand Down Expand Up @@ -99,7 +99,7 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
fp = open(resultpath + '/' + iso, 'wb')

if self.cmd is None:
cmd_id = ti.xcom_pull(key="return_value", task_ids=self.task_id)
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
self.cmd = self.cls.find(cmd_id)

self.cmd.get_results(fp, inline, delim, fetch)
Expand All @@ -114,7 +114,7 @@ def get_log(self, ti):
:return: command log as text
"""
if self.cmd is None:
cmd_id = ti.xcom_pull(key="return_value", task_ids=self.task_id)
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
Command.get_log_id(self.cls, cmd_id)

def get_jobs_id(self, ti):
Expand All @@ -124,7 +124,7 @@ def get_jobs_id(self, ti):
:return: Job informations assoiciated with command
"""
if self.cmd is None:
cmd_id = ti.xcom_pull(key="return_value", task_ids=self.task_id)
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
Command.get_jobs_id(self.cls, cmd_id)

def create_cmd_args(self):
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/operators/qubole_operator.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self, qubole_conn_id="qubole_default", *args, **kwargs):

def execute(self, context):
self.hook = QuboleHook(*self.args, **self.kwargs)
return self.hook.execute()
return self.hook.execute(context)

def on_kill(self, ti):
self.hook.kill(ti)
Expand All @@ -115,10 +115,10 @@ def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
return self.hook.get_results(ti, fp, inline, delim, fetch)

def get_log(self, ti):
return self.hook(ti)
return self.hook.get_log(ti)

def get_jobs_id(self, ti):
return self.get_jobs_id(ti)
return self.hook.get_jobs_id(ti)



2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ def run(self):
'webhdfs': webhdfs,
'kerberos': kerberos,
'password': password,
'qds': qds
'github_enterprise': github_enterprise,
'qds': qds
},
author='Maxime Beauchemin',
author_email='[email protected]',
Expand Down

0 comments on commit 5779c18

Please sign in to comment.