Commit 4249f66
HiveServer2 hack to run multi-statement in one session by passing a list
mistercrunch committed Jun 24, 2015
1 parent f163819 commit 4249f66
Showing 2 changed files with 25 additions and 17 deletions.
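
For illustration, a minimal usage sketch of the new calling convention. This is not part of the commit; the import path, hook class name, constructor arguments, connection setup, and table names are assumptions. With this change, get_results accepts either a single HQL string or a list of statements that run in one HiveServer2 session, and only the last statement's results are returned.

# Hypothetical usage sketch -- names and connection config are assumptions,
# not part of this commit.
from airflow.hooks.hive_hooks import HiveServer2Hook

hook = HiveServer2Hook()  # assumes a default HiveServer2 connection is configured
results = hook.get_results(
    hql=[
        "SET hive.exec.dynamic.partition=true",  # session-level side effect
        "SELECT * FROM my_table LIMIT 10",       # only this statement's rows come back
    ],
    schema='default',
)
print([c['columnName'] for c in results['header']])  # column names from the last statement
print(results['data'])                               # rows from the last statement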
1 change: 0 additions & 1 deletion TODO.md
@@ -17,7 +17,6 @@ TODO
* Add a run_only_latest flag to BaseOperator, runs only most recent task instance where deps are met
* Pickle all the THINGS!
* Distributed scheduler
* Add decorator to timeout imports on master process [lib](https://github.com/pnpnpn/timeout-decorator)
* Raise errors when setting dependencies on task in foreign DAGs
* Add an is_test flag to the run context
* Add operator to task_instance table
41 changes: 25 additions & 16 deletions airflow/hooks/hive_hooks.py
@@ -298,19 +298,25 @@ def get_results(self, hql, schema='default', arraysize=1000):
                 authMechanism="NOSASL",
                 user='airflow',
                 database=schema) as conn:
-            with conn.cursor() as cur:
-                cur.execute(hql)
-                results = cur.fetchall()
-                if results:
-                    return {
-                        'data': results,
-                        'header': cur.getSchema(),
-                    }
-                else:
-                    return {
-                        'data': [],
-                        'header': [],
-                    }
+
+            # Hack to allow multiple statements to run in the same session;
+            # only the results from the last one are returned.
+            if isinstance(hql, basestring):
+                hql = [hql]
+            results = {
+                'data': [],
+                'header': [],
+            }
+            for statement in hql:
+                with conn.cursor() as cur:
+                    cur.execute(statement)
+                    records = cur.fetchall()
+                    if records:
+                        results = {
+                            'data': records,
+                            'header': cur.getSchema(),
+                        }
+            return results
 
     def to_csv(self, hql, csv_filepath, schema='default'):
         schema = schema or 'default'
@@ -358,9 +364,12 @@ def get_pandas_df(self, hql, schema='default'):
         '''
         import pandas as pd
         res = self.get_results(hql, schema=schema)
-        df = pd.DataFrame(res['data'])
-        df.columns = [c['columnName'] for c in res['header']]
-        return df
+        if res:
+            df = pd.DataFrame(res['data'])
+            df.columns = [c['columnName'] for c in res['header']]
+            return df
+        else:
+            return pd.DataFrame()
 
     def run(self, hql, schema=None):
         self.hive._oprot.trans.open()
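
Similarly, a rough sketch of get_pandas_df after this change (again, the hook class, import path, connection setup, and table names are assumptions): it delegates to get_results, so the list form works here as well, and the DataFrame is built from the last statement's rows (empty when nothing comes back).

# Hypothetical usage sketch -- names assumed, not part of this commit.
from airflow.hooks.hive_hooks import HiveServer2Hook

hook = HiveServer2Hook()
df = hook.get_pandas_df(
    hql=[
        "SET mapreduce.job.queuename=default",  # runs in the same session
        "SELECT col_a, col_b FROM my_table",    # these rows populate the DataFrame
    ],
    schema='default',
)
print(df.head())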
