Skip to content

Commit

Permalink
[AIRFLOW-1189] Fix get a DataFrame using BigQueryHook failing
Browse files Browse the repository at this point in the history
Closes apache#2287 from mremes/master
  • Loading branch information
mremes authored and criccomini committed May 12, 2017
1 parent 3f546e2 commit 93666f9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
11 changes: 8 additions & 3 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
"""
raise NotImplementedError()

def get_pandas_df(self, bql, parameters=None):
def get_pandas_df(self, bql, parameters=None, dialect='legacy'):
"""
Returns a Pandas DataFrame for the results produced by a BigQuery
query. The DbApiHook method must be overridden because Pandas
Expand All @@ -85,10 +85,14 @@ def get_pandas_df(self, bql, parameters=None):
:param bql: The BigQuery SQL to execute.
:type bql: string
:param parameters: The parameters to render the SQL query with (not used, leave to override superclass method)
:type parameters: mapping or iterable
:param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
:type dialect: string in {'legacy', 'standard'}, default 'legacy'
"""
service = self.get_service()
project = self._get_field('project')
connector = BigQueryPandasConnector(project, service)
connector = BigQueryPandasConnector(project, service, dialect=dialect)
schema, pages = connector.run_query(bql)
dataframe_list = []

Expand Down Expand Up @@ -136,13 +140,14 @@ class BigQueryPandasConnector(GbqConnector):
without forcing a three legged OAuth connection. Instead, we can inject
service account credentials into the binding.
"""
def __init__(self, project_id, service, reauth=False, verbose=False):
def __init__(self, project_id, service, reauth=False, verbose=False, dialect='legacy'):
gbq_check_google_client_version()
gbq_test_google_api_imports()
self.project_id = project_id
self.reauth = reauth
self.service = service
self.verbose = verbose
self.dialect = dialect


class BigQueryConnection(object):
Expand Down
29 changes: 29 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,35 @@
from airflow.contrib.hooks import bigquery_hook as hook


class TestBigQueryDataframeResults(unittest.TestCase):
    """Tests for ``BigQueryHook.get_pandas_df`` and its ``dialect`` argument.

    Every test runs a query through a ``BigQueryHook`` created with default
    arguments.
    NOTE(review): these presumably need a reachable BigQuery connection to
    pass -- confirm how they are expected to run in CI.
    """

    # Message fragment expected in the exception raised for a rejected query.
    INVALID_QUERY_MESSAGE = (
        'pandas_gbq.gbq.GenericGBQException: Reason: invalidQuery'
    )

    def setUp(self):
        # A fresh hook for every test case.
        self.instance = hook.BigQueryHook()

    def test_output_is_dataframe_with_valid_query(self):
        """A valid query with the default dialect returns a DataFrame."""
        import pandas as pd
        df = self.instance.get_pandas_df('select 1')
        self.assertIsInstance(df, pd.DataFrame)

    def test_throws_exception_with_invalid_query(self):
        """A malformed query raises an error mentioning ``invalidQuery``."""
        with self.assertRaises(Exception) as context:
            self.instance.get_pandas_df('from `1`')
        # Checked after the ``with`` block: statements placed after the
        # raising call inside the block would never execute.
        self.assertIn(self.INVALID_QUERY_MESSAGE, str(context.exception))

    def test_suceeds_with_explicit_legacy_query(self):
        """Passing ``dialect='legacy'`` explicitly runs a legacy query."""
        df = self.instance.get_pandas_df('select 1', dialect='legacy')
        # Read the first cell purely by position. The earlier
        # ``df.iloc(0)[0][0]`` form ended with an integer lookup on a Series
        # indexed by column labels, which depends on pandas' deprecated
        # positional fallback.
        self.assertEqual(df.iloc[0, 0], 1)

    def test_suceeds_with_explicit_std_query(self):
        """``dialect='standard'`` accepts the ``select * except(...)`` form."""
        df = self.instance.get_pandas_df(
            'select * except(b) from (select 1 a, 2 b)',
            dialect='standard')
        # Positional access to the first cell; see the legacy-dialect test.
        self.assertEqual(df.iloc[0, 0], 1)

    def test_throws_exception_with_incompatible_syntax(self):
        """The ``except(...)`` query is rejected under ``dialect='legacy'``."""
        with self.assertRaises(Exception) as context:
            self.instance.get_pandas_df(
                'select * except(b) from (select 1 a, 2 b)',
                dialect='legacy')
        self.assertIn(self.INVALID_QUERY_MESSAGE, str(context.exception))

class TestBigQueryTableSplitter(unittest.TestCase):
def test_internal_need_default_project(self):
with self.assertRaises(Exception) as context:
Expand Down

0 comments on commit 93666f9

Please sign in to comment.